Skip to main content

fluss/client/table/
append.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::table::partition_getter::{PartitionGetter, get_physical_path};
19use crate::client::{WriteRecord, WriteResultFuture, WriterClient};
20use crate::error::Error::IllegalArgument;
21use crate::error::Result;
22use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
23use crate::row::{ColumnarRow, InternalRow};
24use arrow::array::RecordBatch;
25use std::sync::Arc;
26
/// Handle for appending to a single table; hands out [`AppendWriter`]s via
/// `create_writer`.
pub struct TableAppend {
    /// Path of the target table, shared by reference count with every writer created here.
    table_path: Arc<TablePath>,
    /// Table metadata; consulted for the partitioning flag, row type and partition keys
    /// when a writer is created.
    table_info: Arc<TableInfo>,
    /// Client that the created writers send their records through.
    writer_client: Arc<WriterClient>,
}
32
33impl TableAppend {
34    pub(super) fn new(
35        table_path: TablePath,
36        table_info: Arc<TableInfo>,
37        writer_client: Arc<WriterClient>,
38    ) -> Self {
39        Self {
40            table_path: Arc::new(table_path),
41            table_info,
42            writer_client,
43        }
44    }
45
46    pub fn create_writer(&self) -> Result<AppendWriter> {
47        let partition_getter = if self.table_info.is_partitioned() {
48            Some(PartitionGetter::new(
49                self.table_info.row_type(),
50                Arc::clone(self.table_info.get_partition_keys()),
51            )?)
52        } else {
53            None
54        };
55
56        Ok(AppendWriter {
57            table_path: Arc::clone(&self.table_path),
58            partition_getter,
59            writer_client: self.writer_client.clone(),
60            table_info: Arc::clone(&self.table_info),
61        })
62    }
63}
64
/// Writer that queues rows or Arrow record batches for appending to one table.
pub struct AppendWriter {
    /// Path of the target table; the starting point for resolving the physical table path
    /// of each write.
    table_path: Arc<TablePath>,
    /// Present only when the writer was created for a partitioned table; used to resolve
    /// the physical (partition) path from a row.
    partition_getter: Option<PartitionGetter>,
    /// Client that queued records are sent through and that `flush` delegates to.
    writer_client: Arc<WriterClient>,
    /// Table metadata; supplies the schema id attached to each record and the row type
    /// used for field-count validation.
    table_info: Arc<TableInfo>,
}
71
72impl AppendWriter {
73    fn check_field_count<R: InternalRow>(&self, row: &R) -> Result<()> {
74        let expected = self.table_info.get_row_type().fields().len();
75        if row.get_field_count() != expected {
76            return Err(IllegalArgument {
77                message: format!(
78                    "The field count of the row does not match the table schema. \
79                     Expected: {}, Actual: {}",
80                    expected,
81                    row.get_field_count()
82                ),
83            });
84        }
85        Ok(())
86    }
87
88    /// Appends a row to the table.
89    ///
90    /// This method returns a [`WriteResultFuture`] immediately after queueing the write,
91    /// enabling fire-and-forget semantics for efficient batching.
92    ///
93    /// # Arguments
94    /// * row - the row to append.
95    ///
96    /// # Returns
97    /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
98    /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
99    pub fn append<R: InternalRow>(&self, row: &R) -> Result<WriteResultFuture> {
100        self.check_field_count(row)?;
101        let physical_table_path = Arc::new(get_physical_path(
102            &self.table_path,
103            self.partition_getter.as_ref(),
104            row,
105        )?);
106        let record = WriteRecord::for_append(
107            Arc::clone(&self.table_info),
108            physical_table_path,
109            self.table_info.schema_id,
110            row,
111        );
112        let result_handle = self.writer_client.send(&record)?;
113        Ok(WriteResultFuture::new(result_handle))
114    }
115
116    /// Appends an Arrow RecordBatch to the table.
117    ///
118    /// This method returns a [`WriteResultFuture`] immediately after queueing the write,
119    /// enabling fire-and-forget semantics for efficient batching.
120    ///
121    /// For partitioned tables, the partition is derived from the **first row** of the batch.
122    /// Callers must ensure all rows in the batch belong to the same partition.
123    ///
124    /// # Returns
125    /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
126    /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
127    pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
128        let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
129            let first_row = ColumnarRow::new(Arc::new(batch.clone()));
130            Arc::new(get_physical_path(
131                &self.table_path,
132                self.partition_getter.as_ref(),
133                &first_row,
134            )?)
135        } else {
136            Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path)))
137        };
138
139        let record = WriteRecord::for_append_record_batch(
140            Arc::clone(&self.table_info),
141            physical_table_path,
142            self.table_info.schema_id,
143            batch,
144        );
145        let result_handle = self.writer_client.send(&record)?;
146        Ok(WriteResultFuture::new(result_handle))
147    }
148
149    pub async fn flush(&self) -> Result<()> {
150        self.writer_client.flush().await
151    }
152}