1mod accumulator;
19mod batch;
20mod idempotence;
21
22use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver};
23use crate::error::Error;
24use crate::metadata::{PhysicalTablePath, TableInfo};
25
26use crate::row::InternalRow;
27pub use accumulator::*;
28use arrow::array::RecordBatch;
29use bytes::Bytes;
30use std::future::Future;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34
35pub(crate) mod broadcast;
36mod bucket_assigner;
37
38mod sender;
39mod write_format;
40mod writer_client;
41
42pub(crate) use idempotence::IdempotenceManager;
43pub use write_format::WriteFormat;
44pub(crate) use writer_client::WriterClient;
45
/// A single record queued for writing, together with the routing and
/// encoding metadata needed to place it into a batch.
// NOTE(review): `dead_code` is allowed presumably because some of these private
// fields are not read on every build/code path yet; confirm which ones before
// removing the attribute.
#[allow(dead_code)]
pub struct WriteRecord<'a> {
    /// The payload: a log (append) record or a key-value record.
    record: Record<'a>,
    /// Physical table path the record is routed to.
    physical_table_path: Arc<PhysicalTablePath>,
    /// Optional bucket key bytes; the append constructors always set `None`.
    bucket_key: Option<Bytes>,
    /// Schema id supplied by the caller when the record was built.
    schema_id: i32,
    /// Batch write format; the append constructors always use `WriteFormat::ArrowLog`.
    write_format: WriteFormat,
    /// Metadata of the target table.
    table_info: Arc<TableInfo>,
}
55
56impl<'a> WriteRecord<'a> {
57 pub fn record(&self) -> &Record<'a> {
58 &self.record
59 }
60
61 pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
62 &self.physical_table_path
63 }
64
65 pub fn estimated_record_size(&self) -> usize {
69 match &self.record {
70 Record::Kv(kv) => {
71 let record_size = crate::record::kv::KvRecord::size_of(
72 &kv.key,
73 kv.row_bytes.as_ref().map(|rb| rb.as_slice()),
74 );
75 crate::record::kv::RECORD_BATCH_HEADER_SIZE + record_size
76 }
77 Record::Log(_) => 0, }
79 }
80}
81
/// The payload of a [`WriteRecord`].
pub enum Record<'a> {
    /// An append-only log record, as built by the `for_append*` constructors.
    Log(LogWriteRecord<'a>),
    /// A key-value record, as built by `WriteRecord::for_upsert`.
    Kv(KvWriteRecord<'a>),
}
86
/// The data carried by a log (append) record.
pub enum LogWriteRecord<'a> {
    /// A single row borrowed from the caller for the lifetime `'a`.
    InternalRow(&'a dyn InternalRow),
    /// A whole Arrow record batch, shared through an `Arc`.
    RecordBatch(Arc<RecordBatch>),
}
91
/// Encoded row bytes for a KV record, either borrowed from the caller or owned.
#[derive(Clone)]
pub enum RowBytes<'a> {
    /// Bytes borrowed for the lifetime `'a`.
    Borrowed(&'a [u8]),
    /// Owned bytes (`bytes::Bytes`).
    Owned(Bytes),
}
97
98impl<'a> RowBytes<'a> {
99 pub fn as_slice(&self) -> &[u8] {
100 match self {
101 RowBytes::Borrowed(slice) => slice,
102 RowBytes::Owned(bytes) => bytes.as_ref(),
103 }
104 }
105}
106
/// A key-value write: the encoded key plus optional target columns and row bytes.
pub struct KvWriteRecord<'a> {
    /// Encoded key bytes.
    key: Bytes,
    /// Optional column indices targeted by the write.
    // NOTE(review): presumably `None` means "all columns" and `Some` a partial
    // update; confirm against the writer that consumes this.
    target_columns: Option<Arc<Vec<usize>>>,
    /// Encoded row bytes, if any.
    // NOTE(review): `None` presumably denotes a delete; confirm with callers.
    row_bytes: Option<RowBytes<'a>>,
}
112
113impl<'a> KvWriteRecord<'a> {
114 fn new(
115 key: Bytes,
116 target_columns: Option<Arc<Vec<usize>>>,
117 row_bytes: Option<RowBytes<'a>>,
118 ) -> Self {
119 KvWriteRecord {
120 key,
121 target_columns,
122 row_bytes,
123 }
124 }
125
126 pub fn row_bytes(&self) -> Option<&[u8]> {
127 self.row_bytes.as_ref().map(|rb| rb.as_slice())
128 }
129}
130
131impl<'a> WriteRecord<'a> {
132 pub fn for_append(
133 table_info: Arc<TableInfo>,
134 physical_table_path: Arc<PhysicalTablePath>,
135 schema_id: i32,
136 row: &'a dyn InternalRow,
137 ) -> Self {
138 Self {
139 table_info,
140 record: Record::Log(LogWriteRecord::InternalRow(row)),
141 physical_table_path,
142 bucket_key: None,
143 schema_id,
144 write_format: WriteFormat::ArrowLog,
145 }
146 }
147
148 pub fn for_append_record_batch(
149 table_info: Arc<TableInfo>,
150 physical_table_path: Arc<PhysicalTablePath>,
151 schema_id: i32,
152 row: RecordBatch,
153 ) -> Self {
154 Self {
155 table_info,
156 record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
157 physical_table_path,
158 bucket_key: None,
159 schema_id,
160 write_format: WriteFormat::ArrowLog,
161 }
162 }
163
164 #[allow(clippy::too_many_arguments)]
165 pub fn for_upsert(
166 table_info: Arc<TableInfo>,
167 physical_table_path: Arc<PhysicalTablePath>,
168 schema_id: i32,
169 key: Bytes,
170 bucket_key: Option<Bytes>,
171 write_format: WriteFormat,
172 target_columns: Option<Arc<Vec<usize>>>,
173 row_bytes: Option<RowBytes<'a>>,
174 ) -> Self {
175 Self {
176 table_info,
177 record: Record::Kv(KvWriteRecord::new(key, target_columns, row_bytes)),
178 physical_table_path,
179 bucket_key,
180 schema_id,
181 write_format,
182 }
183 }
184}
185
/// A cloneable handle for waiting on (or failing) the result of a batch write.
#[derive(Debug, Clone)]
pub struct ResultHandle {
    /// Receiving side of the one-shot broadcast that carries the batch result.
    receiver: BroadcastOnceReceiver<BatchWriteResult>,
}
190
191impl ResultHandle {
192 pub fn new(receiver: BroadcastOnceReceiver<BatchWriteResult>) -> Self {
193 ResultHandle { receiver }
194 }
195
196 pub(crate) fn fail(&self, error: client_broadcast::Error) {
198 self.receiver.fail(error);
199 }
200
201 pub async fn wait(&self) -> Result<BatchWriteResult, Error> {
202 self.receiver
203 .receive()
204 .await
205 .map_err(|e| Error::UnexpectedError {
206 message: format!("Fail to wait write result {e:?}"),
207 source: None,
208 })
209 }
210
211 pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
212 batch_result.map_err(|e| match e {
213 client_broadcast::Error::WriteFailed { code, message } => Error::FlussAPIError {
214 api_error: crate::rpc::ApiError { code, message },
215 },
216 client_broadcast::Error::Client { message } => Error::UnexpectedError {
217 message,
218 source: None,
219 },
220 client_broadcast::Error::Dropped => Error::UnexpectedError {
221 message: "Fail to get write result because broadcast was dropped.".to_string(),
222 source: None,
223 },
224 })
225 }
226}
227
/// A future that resolves to `Ok(())` when the batch write succeeds.
///
/// It resolves to an `Error` if waiting for the result fails or if the write
/// reports an error.
pub struct WriteResultFuture {
    /// The pinned, boxed future that does the waiting. It is type-erased
    /// (`dyn Future`) so this struct does not expose a concrete async-block type.
    inner: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
}
239
240impl std::fmt::Debug for WriteResultFuture {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 f.debug_struct("WriteResultFuture").finish_non_exhaustive()
243 }
244}
245
246impl WriteResultFuture {
247 pub fn new(result_handle: ResultHandle) -> Self {
249 Self {
250 inner: Box::pin(async move {
251 let result = result_handle.wait().await?;
252 result_handle.result(result)
253 }),
254 }
255 }
256}
257
258impl Future for WriteResultFuture {
259 type Output = Result<(), Error>;
260
261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 self.inner.as_mut().poll(cx)
263 }
264}