1use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use uni_common::{Result, UniError, Value};
14
15use crate::api::UniInner;
16use crate::api::bulk::{BulkStats, BulkWriter, BulkWriterBuilder};
17
/// Builder for a [`StreamingAppender`].
///
/// Configures batching and buffering behavior before `build()` constructs
/// the appender and (when needed) claims the session's write guard.
pub struct AppenderBuilder {
    // Shared handle to the database internals the appender writes to.
    db: Arc<UniInner>,
    // Session-level write guard; `true` while a write context is active.
    // Holds a dummy (already-true) guard when the caller pre-acquired it.
    write_guard: Arc<AtomicBool>,
    // Session identifier reported in `WriteContextAlreadyActive` errors.
    session_id: String,
    // NOTE(review): when `true`, `build()` fails with `UniError::ReadOnly` —
    // presumably a pinned/read-only session; confirm against session docs.
    is_pinned: bool,
    // When `true`, the caller already holds the session write guard, so
    // `build()` skips acquisition and uses an unguarded bulk writer.
    guard_pre_acquired: bool,
    // Vertex label every appended row is inserted under.
    label: String,
    // Buffered-row count that triggers an automatic flush (default 5000).
    batch_size: usize,
    // Defer vector-index maintenance to the bulk writer (default true).
    defer_vector_indexes: bool,
    // Optional cap on the underlying bulk writer's buffer, in bytes.
    max_buffer_size_bytes: Option<usize>,
}
31
32impl AppenderBuilder {
33 pub(crate) fn new_from_tx(db: Arc<UniInner>, label: &str) -> Self {
38 let dummy_guard = Arc::new(AtomicBool::new(true));
40 Self {
41 db,
42 write_guard: dummy_guard,
43 session_id: String::new(),
44 is_pinned: false,
45 guard_pre_acquired: true,
46 label: label.to_string(),
47 batch_size: 5000,
48 defer_vector_indexes: true,
49 max_buffer_size_bytes: None,
50 }
51 }
52
53 pub fn batch_size(mut self, size: usize) -> Self {
57 self.batch_size = size;
58 self
59 }
60
61 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
65 self.defer_vector_indexes = defer;
66 self
67 }
68
69 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
73 self.max_buffer_size_bytes = Some(size);
74 self
75 }
76
77 pub fn build(self) -> Result<StreamingAppender> {
82 if self.is_pinned {
83 return Err(UniError::ReadOnly {
84 operation: "appender".to_string(),
85 });
86 }
87
88 let (bulk_builder_base, session_write_guard) = if self.guard_pre_acquired {
90 (BulkWriterBuilder::new_unguarded(self.db), None)
92 } else {
93 let guard = self.write_guard.clone();
95 if guard
96 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
97 .is_err()
98 {
99 return Err(UniError::WriteContextAlreadyActive {
100 session_id: self.session_id,
101 hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
102 });
103 }
104 (
105 BulkWriterBuilder::new_with_guard(self.db, guard.clone()),
106 Some(guard),
107 )
108 };
109
110 let mut bulk_builder = bulk_builder_base
112 .batch_size(self.batch_size)
113 .defer_vector_indexes(self.defer_vector_indexes);
114 if let Some(max_buf) = self.max_buffer_size_bytes {
115 bulk_builder = bulk_builder.max_buffer_size_bytes(max_buf);
116 }
117 let writer = bulk_builder.build()?;
118
119 Ok(StreamingAppender {
120 writer: Some(writer),
121 label: self.label,
122 batch_size: self.batch_size,
123 buffer: Vec::with_capacity(self.batch_size),
124 session_write_guard,
125 finished: false,
126 })
127 }
128}
129
/// Row-at-a-time vertex appender backed by a [`BulkWriter`].
///
/// Rows accumulate in an in-memory buffer and are handed to the writer in
/// batches of `batch_size`; call [`StreamingAppender::finish`] to commit or
/// [`StreamingAppender::abort`] to discard pending work.
pub struct StreamingAppender {
    // Underlying bulk writer; `None` once `finish()`/`abort()` consumed it.
    writer: Option<BulkWriter>,
    // Vertex label every appended row is inserted under.
    label: String,
    // Buffered-row count that triggers an automatic flush.
    batch_size: usize,
    // Pending rows not yet handed to the bulk writer.
    buffer: Vec<HashMap<String, Value>>,
    // Session write guard claimed in `build()` (`None` when the caller had
    // pre-acquired it); released by `Drop` only if `finished` is false.
    finished: bool,
    session_write_guard: Option<Arc<AtomicBool>>,
}
151
152impl StreamingAppender {
153 pub async fn append(&mut self, properties: impl Into<HashMap<String, Value>>) -> Result<()> {
158 self.buffer.push(properties.into());
159 if self.buffer.len() >= self.batch_size {
160 self.flush_buffer().await?;
161 }
162 Ok(())
163 }
164
165 pub async fn write_batch(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
171 let schema = batch.schema();
172 let num_rows = batch.num_rows();
173 for row_idx in 0..num_rows {
174 let mut props = HashMap::with_capacity(schema.fields().len());
175 for (col_idx, field) in schema.fields().iter().enumerate() {
176 let col = batch.column(col_idx);
177 let value =
178 uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
179 if !value.is_null() {
180 props.insert(field.name().clone(), value);
181 }
182 }
183 self.buffer.push(props);
184 if self.buffer.len() >= self.batch_size {
185 self.flush_buffer().await?;
186 }
187 }
188 Ok(())
189 }
190
191 pub async fn finish(mut self) -> Result<BulkStats> {
196 self.flush_buffer().await?;
197 let writer = self
198 .writer
199 .take()
200 .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
201 let stats = writer.commit().await.map_err(UniError::Internal)?;
202 self.finished = true;
203 Ok(stats)
204 }
205
206 pub fn abort(mut self) {
211 self.buffer.clear();
212 self.writer.take(); self.finished = true;
214 }
215
216 pub fn buffered_count(&self) -> usize {
218 self.buffer.len()
219 }
220
221 async fn flush_buffer(&mut self) -> Result<()> {
222 if self.buffer.is_empty() {
223 return Ok(());
224 }
225 let rows = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.batch_size));
226 let writer = self
227 .writer
228 .as_mut()
229 .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
230 writer
231 .insert_vertices(&self.label, rows)
232 .await
233 .map_err(UniError::Internal)?;
234 Ok(())
235 }
236}
237
238impl Drop for StreamingAppender {
239 fn drop(&mut self) {
240 if !self.finished {
241 if let Some(guard) = &self.session_write_guard {
243 guard.store(false, Ordering::SeqCst);
244 }
245 }
246 }
247}