Skip to main content

fluss/client/write/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod accumulator;
19mod batch;
20mod idempotence;
21
22use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver};
23use crate::error::Error;
24use crate::metadata::{PhysicalTablePath, TableInfo};
25
26use crate::row::InternalRow;
27pub use accumulator::*;
28use arrow::array::RecordBatch;
29use bytes::Bytes;
30use std::future::Future;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34
35pub(crate) mod broadcast;
36mod bucket_assigner;
37
38mod sender;
39mod write_format;
40mod writer_client;
41
42pub(crate) use idempotence::IdempotenceManager;
43pub use write_format::WriteFormat;
44pub(crate) use writer_client::WriterClient;
45
46#[allow(dead_code)]
47pub struct WriteRecord<'a> {
48    record: Record<'a>,
49    physical_table_path: Arc<PhysicalTablePath>,
50    bucket_key: Option<Bytes>,
51    schema_id: i32,
52    write_format: WriteFormat,
53    table_info: Arc<TableInfo>,
54}
55
56impl<'a> WriteRecord<'a> {
57    pub fn record(&self) -> &Record<'a> {
58        &self.record
59    }
60
61    pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
62        &self.physical_table_path
63    }
64
65    /// Minimum batch capacity needed to fit this record, including batch header
66    /// overhead. Used to size memory reservations and KV write limits so that
67    /// oversized records don't panic on append.
68    pub fn estimated_record_size(&self) -> usize {
69        match &self.record {
70            Record::Kv(kv) => {
71                let record_size = crate::record::kv::KvRecord::size_of(
72                    &kv.key,
73                    kv.row_bytes.as_ref().map(|rb| rb.as_slice()),
74                );
75                crate::record::kv::RECORD_BATCH_HEADER_SIZE + record_size
76            }
77            Record::Log(_) => 0, // Arrow batches use record count, not byte size
78        }
79    }
80}
81
82pub enum Record<'a> {
83    Log(LogWriteRecord<'a>),
84    Kv(KvWriteRecord<'a>),
85}
86
87pub enum LogWriteRecord<'a> {
88    InternalRow(&'a dyn InternalRow),
89    RecordBatch(Arc<RecordBatch>),
90}
91
92#[derive(Clone)]
93pub enum RowBytes<'a> {
94    Borrowed(&'a [u8]),
95    Owned(Bytes),
96}
97
98impl<'a> RowBytes<'a> {
99    pub fn as_slice(&self) -> &[u8] {
100        match self {
101            RowBytes::Borrowed(slice) => slice,
102            RowBytes::Owned(bytes) => bytes.as_ref(),
103        }
104    }
105}
106
107pub struct KvWriteRecord<'a> {
108    key: Bytes,
109    target_columns: Option<Arc<Vec<usize>>>,
110    row_bytes: Option<RowBytes<'a>>,
111}
112
113impl<'a> KvWriteRecord<'a> {
114    fn new(
115        key: Bytes,
116        target_columns: Option<Arc<Vec<usize>>>,
117        row_bytes: Option<RowBytes<'a>>,
118    ) -> Self {
119        KvWriteRecord {
120            key,
121            target_columns,
122            row_bytes,
123        }
124    }
125
126    pub fn row_bytes(&self) -> Option<&[u8]> {
127        self.row_bytes.as_ref().map(|rb| rb.as_slice())
128    }
129}
130
131impl<'a> WriteRecord<'a> {
132    pub fn for_append(
133        table_info: Arc<TableInfo>,
134        physical_table_path: Arc<PhysicalTablePath>,
135        schema_id: i32,
136        row: &'a dyn InternalRow,
137    ) -> Self {
138        Self {
139            table_info,
140            record: Record::Log(LogWriteRecord::InternalRow(row)),
141            physical_table_path,
142            bucket_key: None,
143            schema_id,
144            write_format: WriteFormat::ArrowLog,
145        }
146    }
147
148    pub fn for_append_record_batch(
149        table_info: Arc<TableInfo>,
150        physical_table_path: Arc<PhysicalTablePath>,
151        schema_id: i32,
152        row: RecordBatch,
153    ) -> Self {
154        Self {
155            table_info,
156            record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
157            physical_table_path,
158            bucket_key: None,
159            schema_id,
160            write_format: WriteFormat::ArrowLog,
161        }
162    }
163
164    #[allow(clippy::too_many_arguments)]
165    pub fn for_upsert(
166        table_info: Arc<TableInfo>,
167        physical_table_path: Arc<PhysicalTablePath>,
168        schema_id: i32,
169        key: Bytes,
170        bucket_key: Option<Bytes>,
171        write_format: WriteFormat,
172        target_columns: Option<Arc<Vec<usize>>>,
173        row_bytes: Option<RowBytes<'a>>,
174    ) -> Self {
175        Self {
176            table_info,
177            record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)),
178            physical_table_path,
179            bucket_key,
180            schema_id,
181            write_format,
182        }
183    }
184}
185
186#[derive(Debug, Clone)]
187pub struct ResultHandle {
188    receiver: BroadcastOnceReceiver<BatchWriteResult>,
189}
190
191impl ResultHandle {
192    pub fn new(receiver: BroadcastOnceReceiver<BatchWriteResult>) -> Self {
193        ResultHandle { receiver }
194    }
195
196    /// Force-complete with an error if not already completed.
197    pub(crate) fn fail(&self, error: client_broadcast::Error) {
198        self.receiver.fail(error);
199    }
200
201    pub async fn wait(&self) -> Result<BatchWriteResult, Error> {
202        self.receiver
203            .receive()
204            .await
205            .map_err(|e| Error::UnexpectedError {
206                message: format!("Fail to wait write result {e:?}"),
207                source: None,
208            })
209    }
210
211    pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
212        batch_result.map_err(|e| match e {
213            client_broadcast::Error::WriteFailed { code, message } => Error::FlussAPIError {
214                api_error: crate::rpc::ApiError { code, message },
215            },
216            client_broadcast::Error::Client { message } => Error::UnexpectedError {
217                message,
218                source: None,
219            },
220            client_broadcast::Error::Dropped => Error::UnexpectedError {
221                message: "Fail to get write result because broadcast was dropped.".to_string(),
222                source: None,
223            },
224        })
225    }
226}
227
228/// A future that represents a pending write operation.
229///
230/// This type implements [`Future`], allowing users to either:
231/// 1. Await immediately to block on acknowledgment: `writer.upsert(&row)?.await?`
232/// 2. Fire-and-forget with later flush: `writer.upsert(&row)?; writer.flush().await?`
233///
234/// This pattern is similar to rdkafka's `DeliveryFuture` and allows for efficient batching
235/// when users don't need immediate per-record acknowledgment.
236pub struct WriteResultFuture {
237    inner: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
238}
239
240impl std::fmt::Debug for WriteResultFuture {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        f.debug_struct("WriteResultFuture").finish_non_exhaustive()
243    }
244}
245
246impl WriteResultFuture {
247    /// Create a new WriteResultFuture from a ResultHandle.
248    pub fn new(result_handle: ResultHandle) -> Self {
249        Self {
250            inner: Box::pin(async move {
251                let result = result_handle.wait().await?;
252                result_handle.result(result)
253            }),
254        }
255    }
256}
257
258impl Future for WriteResultFuture {
259    type Output = Result<(), Error>;
260
261    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262        self.inner.as_mut().poll(cx)
263    }
264}