fluss/client/table/
append.rs1use crate::client::table::partition_getter::{PartitionGetter, get_physical_path};
19use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
20use crate::error::Error::IllegalArgument;
21use crate::error::Result;
22use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
23use crate::row::{ColumnarRow, InternalRow};
24use arrow::array::RecordBatch;
25use std::sync::Arc;
26
27pub struct TableAppend {
28 table_path: Arc<TablePath>,
29 table_info: Arc<TableInfo>,
30 writer_client: Arc<WriterClient>,
31}
32
33impl TableAppend {
34 pub(super) fn new(
35 table_path: TablePath,
36 table_info: Arc<TableInfo>,
37 writer_client: Arc<WriterClient>,
38 ) -> Self {
39 Self {
40 table_path: Arc::new(table_path),
41 table_info,
42 writer_client,
43 }
44 }
45
46 pub fn create_writer(&self) -> Result<AppendWriter> {
47 let partition_getter = if self.table_info.is_partitioned() {
48 Some(PartitionGetter::new(
49 self.table_info.row_type(),
50 Arc::clone(self.table_info.get_partition_keys()),
51 )?)
52 } else {
53 None
54 };
55
56 Ok(AppendWriter {
57 table_path: Arc::clone(&self.table_path),
58 partition_getter,
59 writer_client: self.writer_client.clone(),
60 table_info: Arc::clone(&self.table_info),
61 })
62 }
63}
64
65pub struct AppendWriter {
66 table_path: Arc<TablePath>,
67 partition_getter: Option<PartitionGetter>,
68 writer_client: Arc<WriterClient>,
69 table_info: Arc<TableInfo>,
70}
71
72impl AppendWriter {
73 fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
74 let expected = self.table_info.get_row_type().fields().len();
75 if row.get_field_count() != expected {
76 return Err(IllegalArgument {
77 message: format!(
78 "The field count of the row does not match the table schema. \
79 Expected: {}, Actual: {}",
80 expected,
81 row.get_field_count()
82 ),
83 });
84 }
85 Ok(())
86 }
87
88 pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
100 self.check_field_count(row)?;
101 let physical_table_path = Arc::new(get_physical_path(
102 &self.table_path,
103 self.partition_getter.as_ref(),
104 row,
105 )?);
106 let record = WriteRecord::for_append(
107 Arc::clone(&self.table_info),
108 physical_table_path,
109 self.table_info.schema_id,
110 row,
111 );
112 let result_handle = self.writer_client.send(&record)?;
113 Ok(WriteResultFuture::new(result_handle))
114 }
115
116 pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
128 let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
129 let first_row = ColumnarRow::new(Arc::new(batch.clone()));
130 Arc::new(get_physical_path(
131 &self.table_path,
132 self.partition_getter.as_ref(),
133 &first_row,
134 )?)
135 } else {
136 Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path)))
137 };
138
139 let record = WriteRecord::for_append_record_batch(
140 Arc::clone(&self.table_info),
141 physical_table_path,
142 self.table_info.schema_id,
143 batch,
144 );
145 let result_handle = self.writer_client.send(&record)?;
146 Ok(WriteResultFuture::new(result_handle))
147 }
148
149 pub async fn flush(&self) -> Result<()> {
150 self.writer_client.flush().await
151 }
152}