// uni_db/api/appender.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Streaming appender — row-by-row data loading for a single label.
//!
//! Wraps `BulkWriter` to provide an ergonomic, buffered append API for
//! loading large volumes of vertices into a single label.
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use uni_common::{Result, UniError, Value};
14
15use crate::api::UniInner;
16use crate::api::bulk::{BulkStats, BulkWriter, BulkWriterBuilder};
17
/// Builder for creating a [`StreamingAppender`].
pub struct AppenderBuilder {
    // Shared database internals, handed to the underlying BulkWriterBuilder.
    db: Arc<UniInner>,
    // Session write-exclusion flag; a dummy (already-true) guard when created
    // from a Transaction, which owns the real guard.
    write_guard: Arc<AtomicBool>,
    // Session identifier, reported in guard-contention errors.
    session_id: String,
    // Pinned (read-only) sessions refuse to build an appender.
    is_pinned: bool,
    /// Whether the write guard was already acquired by the caller (Transaction).
    guard_pre_acquired: bool,
    // Vertex label that every appended row is inserted under.
    label: String,
    // Rows to buffer before auto-flushing (5000 in the Transaction constructor).
    batch_size: usize,
    // Whether vector index building is deferred until commit.
    defer_vector_indexes: bool,
    // Optional buffer-size checkpoint threshold; None keeps the BulkWriter default.
    max_buffer_size_bytes: Option<usize>,
}
31
32impl AppenderBuilder {
33    /// Create an appender builder for use within a Transaction.
34    ///
35    /// The Transaction already holds the session write guard, so the appender
36    /// skips guard acquisition and does not release it on finish/drop.
37    pub(crate) fn new_from_tx(db: Arc<UniInner>, label: &str) -> Self {
38        // Dummy guard — never acquired/released. Transaction owns the real guard.
39        let dummy_guard = Arc::new(AtomicBool::new(true));
40        Self {
41            db,
42            write_guard: dummy_guard,
43            session_id: String::new(),
44            is_pinned: false,
45            guard_pre_acquired: true,
46            label: label.to_string(),
47            batch_size: 5000,
48            defer_vector_indexes: true,
49            max_buffer_size_bytes: None,
50        }
51    }
52
53    /// Set the number of rows to buffer before auto-flushing to the bulk writer.
54    ///
55    /// Default: 5000.
56    pub fn batch_size(mut self, size: usize) -> Self {
57        self.batch_size = size;
58        self
59    }
60
61    /// Set whether to defer vector index building until commit.
62    ///
63    /// Default: `true`.
64    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
65        self.defer_vector_indexes = defer;
66        self
67    }
68
69    /// Set the maximum buffer size in bytes before triggering a checkpoint.
70    ///
71    /// Default: 1 GB (from BulkWriter defaults).
72    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
73        self.max_buffer_size_bytes = Some(size);
74        self
75    }
76
77    /// Build the streaming appender.
78    ///
79    /// Acquires the session's write guard (mutual exclusion with transactions
80    /// and other bulk writers) unless the guard was pre-acquired by a Transaction.
81    pub fn build(self) -> Result<StreamingAppender> {
82        if self.is_pinned {
83            return Err(UniError::ReadOnly {
84                operation: "appender".to_string(),
85            });
86        }
87
88        // Determine guard ownership and create the appropriate BulkWriterBuilder.
89        let (bulk_builder_base, session_write_guard) = if self.guard_pre_acquired {
90            // Transaction path: guard already held, use unguarded BulkWriter.
91            (BulkWriterBuilder::new_unguarded(self.db), None)
92        } else {
93            // Session path: acquire the guard.
94            let guard = self.write_guard.clone();
95            if guard
96                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
97                .is_err()
98            {
99                return Err(UniError::WriteContextAlreadyActive {
100                    session_id: self.session_id,
101                    hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
102                });
103            }
104            (
105                BulkWriterBuilder::new_with_guard(self.db, guard.clone()),
106                Some(guard),
107            )
108        };
109
110        // Apply shared configuration.
111        let mut bulk_builder = bulk_builder_base
112            .batch_size(self.batch_size)
113            .defer_vector_indexes(self.defer_vector_indexes);
114        if let Some(max_buf) = self.max_buffer_size_bytes {
115            bulk_builder = bulk_builder.max_buffer_size_bytes(max_buf);
116        }
117        let writer = bulk_builder.build()?;
118
119        Ok(StreamingAppender {
120            writer: Some(writer),
121            label: self.label,
122            batch_size: self.batch_size,
123            buffer: Vec::with_capacity(self.batch_size),
124            session_write_guard,
125            finished: false,
126        })
127    }
128}
129
/// A streaming appender for buffered, single-label data loading.
///
/// Rows are buffered internally and flushed to the underlying `BulkWriter`
/// when the buffer reaches `batch_size`. Call [`finish()`](Self::finish) to
/// flush remaining rows and commit.
///
/// # Write Guard
///
/// The appender holds the session's write guard for its entire lifetime
/// (unless created from a Transaction, where the Transaction manages the guard).
/// Only one write context (transaction, bulk writer, or appender) can be
/// active per session at a time. The guard is released on `finish()`,
/// `abort()`, or `drop()`.
pub struct StreamingAppender {
    // Underlying bulk writer; `None` once finished/aborted so later calls
    // surface an "already finished" error instead of writing.
    writer: Option<BulkWriter>,
    // Vertex label that all buffered rows are inserted under.
    label: String,
    // Buffer capacity that triggers an auto-flush.
    batch_size: usize,
    // Rows accepted but not yet handed to the bulk writer.
    buffer: Vec<HashMap<String, Value>>,
    // `Some` on the session path (appender owns guard release);
    // `None` on the Transaction path (Transaction owns the guard).
    session_write_guard: Option<Arc<AtomicBool>>,
    // Set once the appender has been consumed, so Drop skips guard release.
    finished: bool,
}
151
152impl StreamingAppender {
153    /// Append a single row of properties.
154    ///
155    /// The row is buffered internally. When the buffer reaches `batch_size`,
156    /// it is automatically flushed to the underlying bulk writer.
157    pub async fn append(&mut self, properties: impl Into<HashMap<String, Value>>) -> Result<()> {
158        self.buffer.push(properties.into());
159        if self.buffer.len() >= self.batch_size {
160            self.flush_buffer().await?;
161        }
162        Ok(())
163    }
164
165    /// Append an Arrow `RecordBatch` of rows.
166    ///
167    /// Each row in the batch is converted to a property map and buffered.
168    /// Columns in the batch become property keys; values are converted from
169    /// Arrow types to Uni [`Value`]s via `arrow_to_value`.
170    pub async fn write_batch(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
171        let schema = batch.schema();
172        let num_rows = batch.num_rows();
173        for row_idx in 0..num_rows {
174            let mut props = HashMap::with_capacity(schema.fields().len());
175            for (col_idx, field) in schema.fields().iter().enumerate() {
176                let col = batch.column(col_idx);
177                let value =
178                    uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
179                if !value.is_null() {
180                    props.insert(field.name().clone(), value);
181                }
182            }
183            self.buffer.push(props);
184            if self.buffer.len() >= self.batch_size {
185                self.flush_buffer().await?;
186            }
187        }
188        Ok(())
189    }
190
191    /// Flush all buffered rows and commit the bulk writer.
192    ///
193    /// Consumes the appender. Returns statistics about the loading operation.
194    /// The write guard is released when this method returns (or on error via Drop).
195    pub async fn finish(mut self) -> Result<BulkStats> {
196        self.flush_buffer().await?;
197        let writer = self
198            .writer
199            .take()
200            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
201        let stats = writer.commit().await.map_err(UniError::Internal)?;
202        self.finished = true;
203        Ok(stats)
204    }
205
206    /// Abort the appender without committing.
207    ///
208    /// Consumes the appender. Discards all buffered and previously flushed rows.
209    /// Releases the write guard.
210    pub fn abort(mut self) {
211        self.buffer.clear();
212        self.writer.take(); // Drop the writer
213        self.finished = true;
214    }
215
216    /// Get the number of rows currently buffered (not yet flushed).
217    pub fn buffered_count(&self) -> usize {
218        self.buffer.len()
219    }
220
221    async fn flush_buffer(&mut self) -> Result<()> {
222        if self.buffer.is_empty() {
223            return Ok(());
224        }
225        let rows = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.batch_size));
226        let writer = self
227            .writer
228            .as_mut()
229            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
230        writer
231            .insert_vertices(&self.label, rows)
232            .await
233            .map_err(UniError::Internal)?;
234        Ok(())
235    }
236}
237
238impl Drop for StreamingAppender {
239    fn drop(&mut self) {
240        if !self.finished {
241            // Release write guard — buffered data is lost
242            if let Some(guard) = &self.session_write_guard {
243                guard.store(false, Ordering::SeqCst);
244            }
245        }
246    }
247}