// fluss/client/table/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use crate::client::connection::FlussConnection;
19use crate::client::metadata::Metadata;
20use crate::error::{Error, Result};
21use crate::metadata::{TableInfo, TablePath};
22use std::sync::Arc;
23
/// Sentinel offset value (`-2`).
///
/// NOTE(review): judging by the name, this presumably asks a scanner to start
/// from the earliest available offset. Nothing in this module reads it, so
/// confirm the exact meaning against the code that consumes it.
pub const EARLIEST_OFFSET: i64 = -2;

26mod append;
27mod lookup;
28
29mod log_fetch_buffer;
30mod partition_getter;
31mod remote_log;
32mod scanner;
33mod upsert;
34
35pub use append::{AppendWriter, TableAppend};
36pub use lookup::{LookupResult, Lookuper, TableLookup};
37pub use remote_log::{
38    DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
39};
40pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
41pub use upsert::{TableUpsert, UpsertWriter};
42
43#[allow(dead_code)]
44pub struct FlussTable<'a> {
45    conn: &'a FlussConnection,
46    metadata: Arc<Metadata>,
47    table_info: TableInfo,
48    table_path: TablePath,
49    has_primary_key: bool,
50}
51
52impl<'a> FlussTable<'a> {
53    pub fn new(conn: &'a FlussConnection, metadata: Arc<Metadata>, table_info: TableInfo) -> Self {
54        FlussTable {
55            conn,
56            table_path: table_info.table_path.clone(),
57            has_primary_key: table_info.has_primary_key(),
58            table_info,
59            metadata,
60        }
61    }
62
63    pub fn new_append(&self) -> Result<TableAppend> {
64        if self.has_primary_key {
65            return Err(Error::UnsupportedOperation {
66                message: "Append is only supported for log tables (without primary key)"
67                    .to_string(),
68            });
69        }
70        Ok(TableAppend::new(
71            self.table_path.clone(),
72            Arc::new(self.table_info.clone()),
73            self.conn.get_or_create_writer_client()?,
74        ))
75    }
76
77    pub fn new_scan(&self) -> TableScan<'_> {
78        TableScan::new(self.conn, self.table_info.clone(), self.metadata.clone())
79    }
80
81    pub fn metadata(&self) -> &Arc<Metadata> {
82        &self.metadata
83    }
84
85    pub fn get_table_info(&self) -> &TableInfo {
86        &self.table_info
87    }
88
89    pub fn table_path(&self) -> &TablePath {
90        &self.table_path
91    }
92
93    pub fn has_primary_key(&self) -> bool {
94        self.has_primary_key
95    }
96
97    /// Creates a new `TableLookup` for configuring lookup operations.
98    ///
99    /// This follows the same pattern as `new_scan()` and `new_append()`,
100    /// returning a configuration object that can be used to create a `Lookuper`.
101    ///
102    /// The table must have a primary key (be a primary key table).
103    ///
104    /// # Returns
105    /// * `Ok(TableLookup)` - A lookup configuration object
106    /// * `Err(Error)` - If the table doesn't have a primary key
107    ///
108    /// # Example
109    /// ```ignore
110    /// let table = conn.get_table(&table_path).await?;
111    /// let lookuper = table.new_lookup()?.create_lookuper()?;
112    /// let key = vec![1, 2, 3]; // encoded primary key bytes
113    /// if let Some(value) = lookuper.lookup(key).await? {
114    ///     println!("Found value: {:?}", value);
115    /// }
116    /// ```
117    pub fn new_lookup(&self) -> Result<TableLookup> {
118        if !self.has_primary_key {
119            return Err(Error::UnsupportedOperation {
120                message: "Lookup is only supported for primary key tables".to_string(),
121            });
122        }
123        Ok(TableLookup::new(
124            self.conn.get_connections(),
125            self.table_info.clone(),
126            self.metadata.clone(),
127        ))
128    }
129
130    pub fn new_upsert(&self) -> Result<TableUpsert> {
131        if !self.has_primary_key {
132            return Err(Error::UnsupportedOperation {
133                message: "Upsert is only supported for primary key tables".to_string(),
134            });
135        }
136
137        Ok(TableUpsert::new(
138            self.table_path.clone(),
139            self.table_info.clone(),
140            self.conn.get_or_create_writer_client()?,
141        ))
142    }
143}
144
145impl<'a> Drop for FlussTable<'a> {
146    fn drop(&mut self) {
147        // do-nothing now
148    }
149}