use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
use crate::error::{Error, Result};
use crate::metadata::{TableInfo, TablePath};
use std::sync::Arc;
pub const EARLIEST_OFFSET: i64 = -2;
mod append;
mod lookup;
mod log_fetch_buffer;
mod partition_getter;
mod remote_log;
mod scanner;
mod upsert;
pub use append::{AppendWriter, TableAppend};
pub use lookup::{LookupResult, Lookuper, TableLookup};
pub use remote_log::{
DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
pub use upsert::{TableUpsert, UpsertWriter};
#[allow(dead_code)]
pub struct FlussTable<'a> {
conn: &'a FlussConnection,
metadata: Arc<Metadata>,
table_info: TableInfo,
table_path: TablePath,
has_primary_key: bool,
}
impl<'a> FlussTable<'a> {
pub fn new(conn: &'a FlussConnection, metadata: Arc<Metadata>, table_info: TableInfo) -> Self {
FlussTable {
conn,
table_path: table_info.table_path.clone(),
has_primary_key: table_info.has_primary_key(),
table_info,
metadata,
}
}
pub fn new_append(&self) -> Result<TableAppend> {
if self.has_primary_key {
return Err(Error::UnsupportedOperation {
message: "Append is only supported for log tables (without primary key)"
.to_string(),
});
}
Ok(TableAppend::new(
self.table_path.clone(),
Arc::new(self.table_info.clone()),
self.conn.get_or_create_writer_client()?,
))
}
pub fn new_scan(&self) -> TableScan<'_> {
TableScan::new(self.conn, self.table_info.clone(), self.metadata.clone())
}
pub fn metadata(&self) -> &Arc<Metadata> {
&self.metadata
}
pub fn get_table_info(&self) -> &TableInfo {
&self.table_info
}
pub fn table_path(&self) -> &TablePath {
&self.table_path
}
pub fn has_primary_key(&self) -> bool {
self.has_primary_key
}
pub fn new_lookup(&self) -> Result<TableLookup> {
if !self.has_primary_key {
return Err(Error::UnsupportedOperation {
message: "Lookup is only supported for primary key tables".to_string(),
});
}
Ok(TableLookup::new(
self.conn.get_connections(),
self.table_info.clone(),
self.metadata.clone(),
))
}
pub fn new_upsert(&self) -> Result<TableUpsert> {
if !self.has_primary_key {
return Err(Error::UnsupportedOperation {
message: "Upsert is only supported for primary key tables".to_string(),
});
}
Ok(TableUpsert::new(
self.table_path.clone(),
self.table_info.clone(),
self.conn.get_or_create_writer_client()?,
))
}
}
impl<'a> Drop for FlussTable<'a> {
fn drop(&mut self) {
}
}