fluss/client/table/
mod.rs1use crate::client::connection::FlussConnection;
19use crate::client::metadata::Metadata;
20use crate::error::{Error, Result};
21use crate::metadata::{TableInfo, TablePath};
22use std::sync::Arc;
23
/// Sentinel offset value (`-2`) named for the earliest position of a log.
///
/// NOTE(review): presumably passed as a start offset to request reading from the
/// earliest available record — confirm against the scanner code that consumes it.
pub const EARLIEST_OFFSET: i64 = -2;
25
26mod append;
27mod lookup;
28
29mod log_fetch_buffer;
30mod partition_getter;
31mod remote_log;
32mod scanner;
33mod upsert;
34
35pub use append::{AppendWriter, TableAppend};
36pub use lookup::{LookupResult, Lookuper, TableLookup};
37pub use remote_log::{
38 DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
39};
40pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
41pub use upsert::{TableUpsert, UpsertWriter};
42
43#[allow(dead_code)]
44pub struct FlussTable<'a> {
45 conn: &'a FlussConnection,
46 metadata: Arc<Metadata>,
47 table_info: TableInfo,
48 table_path: TablePath,
49 has_primary_key: bool,
50}
51
52impl<'a> FlussTable<'a> {
53 pub fn new(conn: &'a FlussConnection, metadata: Arc<Metadata>, table_info: TableInfo) -> Self {
54 FlussTable {
55 conn,
56 table_path: table_info.table_path.clone(),
57 has_primary_key: table_info.has_primary_key(),
58 table_info,
59 metadata,
60 }
61 }
62
63 pub fn new_append(&self) -> Result<TableAppend> {
64 if self.has_primary_key {
65 return Err(Error::UnsupportedOperation {
66 message: "Append is only supported for log tables (without primary key)"
67 .to_string(),
68 });
69 }
70 Ok(TableAppend::new(
71 self.table_path.clone(),
72 Arc::new(self.table_info.clone()),
73 self.conn.get_or_create_writer_client()?,
74 ))
75 }
76
77 pub fn new_scan(&self) -> TableScan<'_> {
78 TableScan::new(self.conn, self.table_info.clone(), self.metadata.clone())
79 }
80
81 pub fn metadata(&self) -> &Arc<Metadata> {
82 &self.metadata
83 }
84
85 pub fn get_table_info(&self) -> &TableInfo {
86 &self.table_info
87 }
88
89 pub fn table_path(&self) -> &TablePath {
90 &self.table_path
91 }
92
93 pub fn has_primary_key(&self) -> bool {
94 self.has_primary_key
95 }
96
97 pub fn new_lookup(&self) -> Result<TableLookup> {
118 if !self.has_primary_key {
119 return Err(Error::UnsupportedOperation {
120 message: "Lookup is only supported for primary key tables".to_string(),
121 });
122 }
123 Ok(TableLookup::new(
124 self.conn.get_connections(),
125 self.table_info.clone(),
126 self.metadata.clone(),
127 ))
128 }
129
130 pub fn new_upsert(&self) -> Result<TableUpsert> {
131 if !self.has_primary_key {
132 return Err(Error::UnsupportedOperation {
133 message: "Upsert is only supported for primary key tables".to_string(),
134 });
135 }
136
137 Ok(TableUpsert::new(
138 self.table_path.clone(),
139 self.table_info.clone(),
140 self.conn.get_or_create_writer_client()?,
141 ))
142 }
143}
144
145impl<'a> Drop for FlussTable<'a> {
146 fn drop(&mut self) {
147 }
149}