Skip to main content

fluss/client/table/
lookup.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::bucketing::BucketingFunction;
19use crate::client::metadata::Metadata;
20use crate::client::table::partition_getter::PartitionGetter;
21use crate::error::{Error, Result};
22use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
23use crate::record::RowAppendRecordBatchBuilder;
24use crate::record::kv::SCHEMA_ID_LENGTH;
25use crate::row::InternalRow;
26use crate::row::compacted::CompactedRow;
27use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
28use crate::rpc::ApiError;
29use crate::rpc::RpcClient;
30use crate::rpc::message::LookupRequest;
31use arrow::array::RecordBatch;
32use std::sync::Arc;
33
34/// The result of a lookup operation.
35///
36/// Contains the rows returned from a lookup. For primary key lookups,
37/// this will contain at most one row. For prefix key lookups (future),
38/// this may contain multiple rows.
39pub struct LookupResult {
40    rows: Vec<Vec<u8>>,
41    row_type: Arc<RowType>,
42}
43
44impl LookupResult {
45    /// Creates a new LookupResult from a list of row bytes.
46    fn new(rows: Vec<Vec<u8>>, row_type: Arc<RowType>) -> Self {
47        Self { rows, row_type }
48    }
49
50    /// Creates an empty LookupResult.
51    fn empty(row_type: Arc<RowType>) -> Self {
52        Self {
53            rows: Vec::new(),
54            row_type,
55        }
56    }
57
58    /// Extracts the row payload by stripping the schema id prefix.
59    fn extract_payload(bytes: &[u8]) -> Result<&[u8]> {
60        bytes
61            .get(SCHEMA_ID_LENGTH..)
62            .ok_or_else(|| Error::RowConvertError {
63                message: format!(
64                    "Row payload too short: {} bytes, need at least {} for schema id",
65                    bytes.len(),
66                    SCHEMA_ID_LENGTH
67                ),
68            })
69    }
70
71    /// Returns the only row in the result set as a [`CompactedRow`].
72    ///
73    /// This method provides a zero-copy view of the row data, which means the returned
74    /// `CompactedRow` borrows from this result set and cannot outlive it.
75    ///
76    /// # Returns
77    /// - `Ok(Some(row))`: If exactly one row exists.
78    /// - `Ok(None)`: If the result set is empty.
79    /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row.
80    /// - `Err(Error)`: If the row payload is too short to contain a schema id.
81    pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
82        match self.rows.len() {
83            0 => Ok(None),
84            1 => {
85                let payload = Self::extract_payload(&self.rows[0])?;
86                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
87            }
88            _ => Err(Error::UnexpectedError {
89                message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
90                source: None,
91            }),
92        }
93    }
94
95    /// Returns all rows in the result set as [`CompactedRow`]s.
96    ///
97    /// # Returns
98    /// - `Ok(rows)` - All rows in the result set.
99    /// - `Err(Error)` - If any row payload is too short to contain a schema id.
100    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
101        self.rows
102            .iter()
103            // TODO Add schema id check and fetch when implementing prefix lookup
104            .map(|bytes| {
105                let payload = Self::extract_payload(bytes)?;
106                Ok(CompactedRow::from_bytes(&self.row_type, payload))
107            })
108            .collect()
109    }
110
111    /// Converts all rows in this result into an Arrow [`RecordBatch`].
112    ///
113    /// This is useful for integration with DataFusion or other Arrow-based tools.
114    ///
115    /// # Returns
116    /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
117    ///   batch (with the correct schema) if the result set is empty.
118    /// - `Err(Error)` - If the conversion fails.
119    pub fn to_record_batch(&self) -> Result<RecordBatch> {
120        let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
121
122        for bytes in &self.rows {
123            let payload = Self::extract_payload(bytes)?;
124
125            let row = CompactedRow::from_bytes(&self.row_type, payload);
126            builder.append(&row)?;
127        }
128
129        builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
130    }
131}
132
133/// Configuration and factory struct for creating lookup operations.
134///
135/// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`,
136/// providing a builder-style API for configuring lookup operations before
137/// creating the actual `Lookuper`.
138///
139/// # Example
140/// ```ignore
141/// let table = conn.get_table(&table_path).await?;
142/// let lookuper = table.new_lookup()?.create_lookuper()?;
143/// let result = lookuper.lookup(&row).await?;
144/// if let Some(value) = result.get_single_row() {
145///     println!("Found: {:?}", value);
146/// }
147/// ```
148// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
149// TODO: Add create_typed_lookuper<T>() for typed lookups with POJO mapping
150pub struct TableLookup {
151    rpc_client: Arc<RpcClient>,
152    table_info: TableInfo,
153    metadata: Arc<Metadata>,
154}
155
156impl TableLookup {
157    pub(super) fn new(
158        rpc_client: Arc<RpcClient>,
159        table_info: TableInfo,
160        metadata: Arc<Metadata>,
161    ) -> Self {
162        Self {
163            rpc_client,
164            table_info,
165            metadata,
166        }
167    }
168
169    /// Creates a `Lookuper` for performing key-based lookups.
170    ///
171    /// The lookuper will automatically encode the key and compute the bucket
172    /// for each lookup using the appropriate bucketing function.
173    pub fn create_lookuper(self) -> Result<Lookuper> {
174        let num_buckets = self.table_info.get_num_buckets();
175
176        // Get data lake format from table config for bucketing function
177        let data_lake_format = self.table_info.get_table_config().get_datalake_format()?;
178        let bucketing_function = <dyn BucketingFunction>::of(data_lake_format.as_ref());
179
180        let row_type = self.table_info.row_type();
181        let primary_keys = self.table_info.get_primary_keys();
182        let lookup_row_type = row_type.project_with_field_names(primary_keys)?;
183
184        let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec();
185        let primary_key_encoder =
186            KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?;
187
188        let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
189            None
190        } else {
191            let bucket_keys = self.table_info.get_bucket_keys().to_vec();
192            Some(KeyEncoderFactory::of(
193                &lookup_row_type,
194                &bucket_keys,
195                &data_lake_format,
196            )?)
197        };
198
199        let partition_getter = if self.table_info.is_partitioned() {
200            Some(PartitionGetter::new(
201                &lookup_row_type,
202                Arc::clone(self.table_info.get_partition_keys()),
203            )?)
204        } else {
205            None
206        };
207
208        let row_type = Arc::new(self.table_info.row_type().clone());
209        Ok(Lookuper {
210            rpc_client: self.rpc_client,
211            table_path: Arc::new(self.table_info.table_path.clone()),
212            row_type,
213            table_info: self.table_info,
214            metadata: self.metadata,
215            bucketing_function,
216            primary_key_encoder,
217            bucket_key_encoder,
218            partition_getter,
219            num_buckets,
220        })
221    }
222}
223
224/// Performs key-based lookups against a primary key table.
225///
226/// The `Lookuper` automatically encodes the lookup key, computes the target
227/// bucket, finds the appropriate tablet server, and retrieves the value.
228///
229/// # Example
230/// ```ignore
231/// let lookuper = table.new_lookup()?.create_lookuper()?;
232/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
233/// let result = lookuper.lookup(&row).await?;
234/// ```
235pub struct Lookuper {
236    rpc_client: Arc<RpcClient>,
237    table_info: TableInfo,
238    row_type: Arc<RowType>,
239    table_path: Arc<TablePath>,
240    metadata: Arc<Metadata>,
241    bucketing_function: Box<dyn BucketingFunction>,
242    primary_key_encoder: Box<dyn KeyEncoder>,
243    bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
244    partition_getter: Option<PartitionGetter>,
245    num_buckets: i32,
246}
247
248impl Lookuper {
249    /// Looks up a value by its primary key.
250    ///
251    /// The key is encoded and the bucket is automatically computed using
252    /// the table's bucketing function.
253    ///
254    /// # Arguments
255    /// * `row` - The row containing the primary key field values
256    ///
257    /// # Returns
258    /// * `Ok(LookupResult)` - The lookup result (may be empty if key not found)
259    /// * `Err(Error)` - If the lookup fails
260    pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult> {
261        // todo: support batch lookup
262        let pk_bytes = self.primary_key_encoder.encode_key(row)?;
263        let pk_bytes_vec = pk_bytes.to_vec();
264        let bk_bytes = match &mut self.bucket_key_encoder {
265            Some(encoder) => &encoder.encode_key(row)?,
266            None => &pk_bytes,
267        };
268
269        let partition_id = if let Some(ref partition_getter) = self.partition_getter {
270            let partition_name = partition_getter.get_partition(row)?;
271            let physical_table_path = PhysicalTablePath::of_partitioned(
272                Arc::clone(&self.table_path),
273                Some(partition_name),
274            );
275            let cluster = self.metadata.get_cluster();
276            match cluster.get_partition_id(&physical_table_path) {
277                Some(id) => Some(id),
278                None => {
279                    // Partition doesn't exist, return empty result (like Java)
280                    return Ok(LookupResult::empty(Arc::clone(&self.row_type)));
281                }
282            }
283        } else {
284            None
285        };
286
287        let bucket_id = self
288            .bucketing_function
289            .bucketing(bk_bytes, self.num_buckets)?;
290
291        let table_id = self.table_info.get_table_id();
292        let table_bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id);
293
294        // Find the leader for this bucket
295        let cluster = self.metadata.get_cluster();
296        let leader = self
297            .metadata
298            .leader_for(self.table_path.as_ref(), &table_bucket)
299            .await?
300            .ok_or_else(|| {
301                Error::leader_not_available(format!(
302                    "No leader found for table bucket: {table_bucket}"
303                ))
304            })?;
305
306        // Get connection to the tablet server
307        let tablet_server = cluster.get_tablet_server(leader.id()).ok_or_else(|| {
308            Error::leader_not_available(format!(
309                "Tablet server {} is not found in metadata cache",
310                leader.id()
311            ))
312        })?;
313
314        let connection = self.rpc_client.get_connection(tablet_server).await?;
315
316        // Send lookup request
317        let request = LookupRequest::new(table_id, partition_id, bucket_id, vec![pk_bytes_vec]);
318        let response = connection.request(request).await?;
319
320        // Extract the values from response
321        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
322            // Check for errors
323            if let Some(error_code) = bucket_resp.error_code {
324                if error_code != 0 {
325                    return Err(Error::FlussAPIError {
326                        api_error: ApiError {
327                            code: error_code,
328                            message: bucket_resp.error_message.unwrap_or_default(),
329                        },
330                    });
331                }
332            }
333
334            // Collect all values
335            let rows: Vec<Vec<u8>> = bucket_resp
336                .values
337                .into_iter()
338                .filter_map(|pb_value| pb_value.values)
339                .collect();
340
341            return Ok(LookupResult::new(rows, Arc::clone(&self.row_type)));
342        }
343
344        Ok(LookupResult::empty(Arc::clone(&self.row_type)))
345    }
346
347    /// Returns a reference to the table info.
348    pub fn table_info(&self) -> &TableInfo {
349        &self.table_info
350    }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::{DataField, DataTypes};
    use crate::row::binary::BinaryWriter;
    use crate::row::compacted::CompactedRowWriter;
    use arrow::array::Int32Array;

    /// Prefixes `row_data` with a little-endian schema id, mirroring the raw
    /// layout `LookupResult` strips in `extract_payload`.
    fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len());
        bytes.extend_from_slice(&schema_id.to_le_bytes());
        bytes.extend_from_slice(row_data);
        bytes
    }

    /// Single-column (`id INT`) row type shared by the tests below.
    fn int_row_type() -> Arc<RowType> {
        Arc::new(RowType::new(vec![DataField::new(
            "id",
            DataTypes::int(),
            None,
        )]))
    }

    /// Encodes a one-int compacted row, including the schema id prefix.
    fn int_row_bytes(value: i32) -> Vec<u8> {
        let mut writer = CompactedRowWriter::new(1);
        writer.write_int(value);
        make_row_bytes(0, writer.buffer())
    }

    #[test]
    fn test_to_record_batch_empty() {
        let result = LookupResult::empty(int_row_type());
        let batch = result.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 1);
    }

    #[test]
    fn test_to_record_batch_with_row() {
        let result = LookupResult::new(vec![int_row_bytes(42)], int_row_type());
        let batch = result.to_record_batch().unwrap();

        assert_eq!(batch.num_rows(), 1);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(col.value(0), 42);
    }

    #[test]
    fn test_to_record_batch_payload_too_short() {
        // Only 1 byte — shorter than SCHEMA_ID_LENGTH (2)
        let result = LookupResult::new(vec![vec![0u8]], int_row_type());
        assert!(result.to_record_batch().is_err());
    }

    #[test]
    fn test_get_single_row_empty() {
        let result = LookupResult::empty(int_row_type());
        assert!(result.get_single_row().unwrap().is_none());
    }

    #[test]
    fn test_get_single_row_one_row() {
        let result = LookupResult::new(vec![int_row_bytes(7)], int_row_type());
        assert!(result.get_single_row().unwrap().is_some());
    }

    #[test]
    fn test_get_single_row_multiple_rows_is_error() {
        let result = LookupResult::new(vec![int_row_bytes(1), int_row_bytes(2)], int_row_type());
        assert!(result.get_single_row().is_err());
    }

    #[test]
    fn test_get_single_row_payload_too_short() {
        let result = LookupResult::new(vec![vec![0u8]], int_row_type());
        assert!(result.get_single_row().is_err());
    }

    #[test]
    fn test_get_rows() {
        let empty = LookupResult::empty(int_row_type());
        assert_eq!(empty.get_rows().unwrap().len(), 0);

        let two = LookupResult::new(vec![int_row_bytes(1), int_row_bytes(2)], int_row_type());
        assert_eq!(two.get_rows().unwrap().len(), 2);

        let short = LookupResult::new(vec![int_row_bytes(1), vec![0u8]], int_row_type());
        assert!(short.get_rows().is_err());
    }
}