1use crate::bucketing::BucketingFunction;
19use crate::client::metadata::Metadata;
20use crate::client::table::partition_getter::PartitionGetter;
21use crate::error::{Error, Result};
22use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
23use crate::record::RowAppendRecordBatchBuilder;
24use crate::record::kv::SCHEMA_ID_LENGTH;
25use crate::row::InternalRow;
26use crate::row::compacted::CompactedRow;
27use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
28use crate::rpc::ApiError;
29use crate::rpc::RpcClient;
30use crate::rpc::message::LookupRequest;
31use arrow::array::RecordBatch;
32use std::sync::Arc;
33
34pub struct LookupResult {
40 rows: Vec<Vec<u8>>,
41 row_type: Arc<RowType>,
42}
43
44impl LookupResult {
45 fn new(rows: Vec<Vec<u8>>, row_type: Arc<RowType>) -> Self {
47 Self { rows, row_type }
48 }
49
50 fn empty(row_type: Arc<RowType>) -> Self {
52 Self {
53 rows: Vec::new(),
54 row_type,
55 }
56 }
57
58 fn extract_payload(bytes: &[u8]) -> Result<&[u8]> {
60 bytes
61 .get(SCHEMA_ID_LENGTH..)
62 .ok_or_else(|| Error::RowConvertError {
63 message: format!(
64 "Row payload too short: {} bytes, need at least {} for schema id",
65 bytes.len(),
66 SCHEMA_ID_LENGTH
67 ),
68 })
69 }
70
71 pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
82 match self.rows.len() {
83 0 => Ok(None),
84 1 => {
85 let payload = Self::extract_payload(&self.rows[0])?;
86 Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
87 }
88 _ => Err(Error::UnexpectedError {
89 message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
90 source: None,
91 }),
92 }
93 }
94
95 pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
101 self.rows
102 .iter()
103 .map(|bytes| {
105 let payload = Self::extract_payload(bytes)?;
106 Ok(CompactedRow::from_bytes(&self.row_type, payload))
107 })
108 .collect()
109 }
110
111 pub fn to_record_batch(&self) -> Result<RecordBatch> {
120 let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
121
122 for bytes in &self.rows {
123 let payload = Self::extract_payload(bytes)?;
124
125 let row = CompactedRow::from_bytes(&self.row_type, payload);
126 builder.append(&row)?;
127 }
128
129 builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
130 }
131}
132
133pub struct TableLookup {
151 rpc_client: Arc<RpcClient>,
152 table_info: TableInfo,
153 metadata: Arc<Metadata>,
154}
155
156impl TableLookup {
157 pub(super) fn new(
158 rpc_client: Arc<RpcClient>,
159 table_info: TableInfo,
160 metadata: Arc<Metadata>,
161 ) -> Self {
162 Self {
163 rpc_client,
164 table_info,
165 metadata,
166 }
167 }
168
169 pub fn create_lookuper(self) -> Result<Lookuper> {
174 let num_buckets = self.table_info.get_num_buckets();
175
176 let data_lake_format = self.table_info.get_table_config().get_datalake_format()?;
178 let bucketing_function = <dyn BucketingFunction>::of(data_lake_format.as_ref());
179
180 let row_type = self.table_info.row_type();
181 let primary_keys = self.table_info.get_primary_keys();
182 let lookup_row_type = row_type.project_with_field_names(primary_keys)?;
183
184 let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec();
185 let primary_key_encoder =
186 KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?;
187
188 let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
189 None
190 } else {
191 let bucket_keys = self.table_info.get_bucket_keys().to_vec();
192 Some(KeyEncoderFactory::of(
193 &lookup_row_type,
194 &bucket_keys,
195 &data_lake_format,
196 )?)
197 };
198
199 let partition_getter = if self.table_info.is_partitioned() {
200 Some(PartitionGetter::new(
201 &lookup_row_type,
202 Arc::clone(self.table_info.get_partition_keys()),
203 )?)
204 } else {
205 None
206 };
207
208 let row_type = Arc::new(self.table_info.row_type().clone());
209 Ok(Lookuper {
210 rpc_client: self.rpc_client,
211 table_path: Arc::new(self.table_info.table_path.clone()),
212 row_type,
213 table_info: self.table_info,
214 metadata: self.metadata,
215 bucketing_function,
216 primary_key_encoder,
217 bucket_key_encoder,
218 partition_getter,
219 num_buckets,
220 })
221 }
222}
223
224pub struct Lookuper {
236 rpc_client: Arc<RpcClient>,
237 table_info: TableInfo,
238 row_type: Arc<RowType>,
239 table_path: Arc<TablePath>,
240 metadata: Arc<Metadata>,
241 bucketing_function: Box<dyn BucketingFunction>,
242 primary_key_encoder: Box<dyn KeyEncoder>,
243 bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
244 partition_getter: Option<PartitionGetter>,
245 num_buckets: i32,
246}
247
248impl Lookuper {
249 pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult> {
261 let pk_bytes = self.primary_key_encoder.encode_key(row)?;
263 let pk_bytes_vec = pk_bytes.to_vec();
264 let bk_bytes = match &mut self.bucket_key_encoder {
265 Some(encoder) => &encoder.encode_key(row)?,
266 None => &pk_bytes,
267 };
268
269 let partition_id = if let Some(ref partition_getter) = self.partition_getter {
270 let partition_name = partition_getter.get_partition(row)?;
271 let physical_table_path = PhysicalTablePath::of_partitioned(
272 Arc::clone(&self.table_path),
273 Some(partition_name),
274 );
275 let cluster = self.metadata.get_cluster();
276 match cluster.get_partition_id(&physical_table_path) {
277 Some(id) => Some(id),
278 None => {
279 return Ok(LookupResult::empty(Arc::clone(&self.row_type)));
281 }
282 }
283 } else {
284 None
285 };
286
287 let bucket_id = self
288 .bucketing_function
289 .bucketing(bk_bytes, self.num_buckets)?;
290
291 let table_id = self.table_info.get_table_id();
292 let table_bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id);
293
294 let cluster = self.metadata.get_cluster();
296 let leader = self
297 .metadata
298 .leader_for(self.table_path.as_ref(), &table_bucket)
299 .await?
300 .ok_or_else(|| {
301 Error::leader_not_available(format!(
302 "No leader found for table bucket: {table_bucket}"
303 ))
304 })?;
305
306 let tablet_server = cluster.get_tablet_server(leader.id()).ok_or_else(|| {
308 Error::leader_not_available(format!(
309 "Tablet server {} is not found in metadata cache",
310 leader.id()
311 ))
312 })?;
313
314 let connection = self.rpc_client.get_connection(tablet_server).await?;
315
316 let request = LookupRequest::new(table_id, partition_id, bucket_id, vec![pk_bytes_vec]);
318 let response = connection.request(request).await?;
319
320 if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
322 if let Some(error_code) = bucket_resp.error_code {
324 if error_code != 0 {
325 return Err(Error::FlussAPIError {
326 api_error: ApiError {
327 code: error_code,
328 message: bucket_resp.error_message.unwrap_or_default(),
329 },
330 });
331 }
332 }
333
334 let rows: Vec<Vec<u8>> = bucket_resp
336 .values
337 .into_iter()
338 .filter_map(|pb_value| pb_value.values)
339 .collect();
340
341 return Ok(LookupResult::new(rows, Arc::clone(&self.row_type)));
342 }
343
344 Ok(LookupResult::empty(Arc::clone(&self.row_type)))
345 }
346
347 pub fn table_info(&self) -> &TableInfo {
349 &self.table_info
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::{DataField, DataTypes};
    use crate::row::binary::BinaryWriter;
    use crate::row::compacted::CompactedRowWriter;
    use arrow::array::Int32Array;

    /// Prefixes `row_data` with `schema_id` as little-endian bytes.
    /// Assumes `SCHEMA_ID_LENGTH` is 2, the size of an `i16`.
    fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len());
        bytes.extend_from_slice(&schema_id.to_le_bytes());
        bytes.extend_from_slice(row_data);
        bytes
    }

    /// Single-column `INT` row type shared by the tests below.
    fn int_row_type() -> Arc<RowType> {
        Arc::new(RowType::new(vec![DataField::new(
            "id",
            DataTypes::int(),
            None,
        )]))
    }

    /// Encodes a one-column compacted row holding `value`, prefixed with schema id 0.
    fn int_row_bytes(value: i32) -> Vec<u8> {
        let mut writer = CompactedRowWriter::new(1);
        writer.write_int(value);
        make_row_bytes(0, writer.buffer())
    }

    #[test]
    fn test_to_record_batch_empty() {
        let result = LookupResult::empty(int_row_type());
        let batch = result.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 1);
    }

    #[test]
    fn test_to_record_batch_with_row() {
        let result = LookupResult::new(vec![int_row_bytes(42)], int_row_type());
        let batch = result.to_record_batch().unwrap();

        assert_eq!(batch.num_rows(), 1);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(col.value(0), 42);
    }

    #[test]
    fn test_to_record_batch_with_multiple_rows() {
        let result = LookupResult::new(vec![int_row_bytes(1), int_row_bytes(2)], int_row_type());
        let batch = result.to_record_batch().unwrap();

        assert_eq!(batch.num_rows(), 2);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(col.value(0), 1);
        assert_eq!(col.value(1), 2);
    }

    #[test]
    fn test_to_record_batch_payload_too_short() {
        // One byte cannot hold the schema-id prefix.
        let result = LookupResult::new(vec![vec![0u8]], int_row_type());
        assert!(result.to_record_batch().is_err());
    }

    #[test]
    fn test_extract_payload_exact_schema_id_length() {
        // An entry consisting only of the schema id decodes to an empty payload.
        let bytes = vec![0u8; SCHEMA_ID_LENGTH];
        assert!(LookupResult::extract_payload(&bytes).unwrap().is_empty());
    }

    #[test]
    fn test_get_single_row_empty() {
        let result = LookupResult::empty(int_row_type());
        assert!(result.get_single_row().unwrap().is_none());
    }

    #[test]
    fn test_get_single_row_one_row() {
        let result = LookupResult::new(vec![int_row_bytes(7)], int_row_type());
        assert!(result.get_single_row().unwrap().is_some());
    }

    #[test]
    fn test_get_single_row_multiple_rows_errors() {
        let result = LookupResult::new(vec![int_row_bytes(1), int_row_bytes(2)], int_row_type());
        assert!(matches!(
            result.get_single_row(),
            Err(Error::UnexpectedError { .. })
        ));
    }

    #[test]
    fn test_get_single_row_payload_too_short() {
        let result = LookupResult::new(vec![vec![0u8]], int_row_type());
        assert!(result.get_single_row().is_err());
    }

    #[test]
    fn test_get_rows() {
        let result = LookupResult::new(vec![int_row_bytes(1), int_row_bytes(2)], int_row_type());
        assert_eq!(result.get_rows().unwrap().len(), 2);

        let empty = LookupResult::empty(int_row_type());
        assert!(empty.get_rows().unwrap().is_empty());
    }

    #[test]
    fn test_get_rows_payload_too_short() {
        let result = LookupResult::new(vec![int_row_bytes(1), vec![0u8]], int_row_type());
        assert!(result.get_rows().is_err());
    }
}