Skip to main content

uni_bulk/
appender.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Streaming appender — row-by-row data loading for a single label.
5//!
6//! Wraps `BulkWriter` to provide an ergonomic, buffered append API for
7//! loading large volumes of vertices into a single label.
8
9use std::collections::HashMap;
10
11use uni_common::{Result, UniError, Value};
12
13use crate::bulk::{BulkBackend, BulkStats, BulkWriter, BulkWriterBuilder};
14
15/// Builder for creating a [`StreamingAppender`].
16pub struct AppenderBuilder {
17    backend: BulkBackend,
18    label: String,
19    batch_size: usize,
20    defer_vector_indexes: bool,
21    max_buffer_size_bytes: Option<usize>,
22}
23
24impl AppenderBuilder {
25    /// Create an appender builder for use within a Transaction.
26    ///
27    /// The Transaction already holds the session write guard, so the appender
28    /// uses the unguarded bulk-writer path and does not acquire or release a
29    /// guard of its own.
30    pub fn new_from_tx(backend: BulkBackend, label: &str) -> Self {
31        Self {
32            backend,
33            label: label.to_string(),
34            batch_size: 5000,
35            defer_vector_indexes: true,
36            max_buffer_size_bytes: None,
37        }
38    }
39
40    /// Set the number of rows to buffer before auto-flushing to the bulk writer.
41    ///
42    /// Default: 5000.
43    pub fn batch_size(mut self, size: usize) -> Self {
44        self.batch_size = size;
45        self
46    }
47
48    /// Set whether to defer vector index building until commit.
49    ///
50    /// Default: `true`.
51    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
52        self.defer_vector_indexes = defer;
53        self
54    }
55
56    /// Set the maximum buffer size in bytes before triggering a checkpoint.
57    ///
58    /// Default: 1 GB (from BulkWriter defaults).
59    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
60        self.max_buffer_size_bytes = Some(size);
61        self
62    }
63
64    /// Build the streaming appender.
65    ///
66    /// The owning Transaction already holds the session write guard, so the
67    /// appender layers over an unguarded [`BulkWriter`].
68    pub fn build(self) -> Result<StreamingAppender> {
69        let mut bulk_builder = BulkWriterBuilder::new_unguarded(self.backend)
70            .batch_size(self.batch_size)
71            .defer_vector_indexes(self.defer_vector_indexes);
72        if let Some(max_buf) = self.max_buffer_size_bytes {
73            bulk_builder = bulk_builder.max_buffer_size_bytes(max_buf);
74        }
75        let writer = bulk_builder.build()?;
76
77        Ok(StreamingAppender {
78            writer: Some(writer),
79            label: self.label,
80            batch_size: self.batch_size,
81            buffer: Vec::with_capacity(self.batch_size),
82        })
83    }
84}
85
86/// A streaming appender for buffered, single-label data loading.
87///
88/// Rows are buffered internally and flushed to the underlying `BulkWriter`
89/// when the buffer reaches `batch_size`. Call [`finish()`](Self::finish) to
90/// flush remaining rows and commit.
91///
92/// The appender is always created from a Transaction, which owns the session
93/// write guard for the appender's lifetime; the appender itself acquires no
94/// guard.
95pub struct StreamingAppender {
96    writer: Option<BulkWriter>,
97    label: String,
98    batch_size: usize,
99    buffer: Vec<HashMap<String, Value>>,
100}
101
102impl StreamingAppender {
103    /// Append a single row of properties.
104    ///
105    /// The row is buffered internally. When the buffer reaches `batch_size`,
106    /// it is automatically flushed to the underlying bulk writer.
107    pub async fn append(&mut self, properties: impl Into<HashMap<String, Value>>) -> Result<()> {
108        self.buffer.push(properties.into());
109        if self.buffer.len() >= self.batch_size {
110            self.flush_buffer().await?;
111        }
112        Ok(())
113    }
114
115    /// Append an Arrow `RecordBatch` of rows.
116    ///
117    /// Each row in the batch is converted to a property map and buffered.
118    /// Columns in the batch become property keys; values are converted from
119    /// Arrow types to Uni [`Value`]s via `arrow_to_value`.
120    pub async fn write_batch(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
121        for props in crate::bulk::record_batch_to_property_maps(batch) {
122            self.buffer.push(props);
123            if self.buffer.len() >= self.batch_size {
124                self.flush_buffer().await?;
125            }
126        }
127        Ok(())
128    }
129
130    /// Flush all buffered rows and commit the bulk writer.
131    ///
132    /// Consumes the appender. Returns statistics about the loading operation.
133    pub async fn finish(mut self) -> Result<BulkStats> {
134        self.flush_buffer().await?;
135        let writer = self
136            .writer
137            .take()
138            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
139        let stats = writer.commit().await.map_err(UniError::Internal)?;
140        Ok(stats)
141    }
142
143    /// Abort the appender without committing.
144    ///
145    /// Consumes the appender. Discards all buffered and previously flushed rows.
146    pub fn abort(mut self) {
147        self.buffer.clear();
148        self.writer.take(); // Drop the writer
149    }
150
151    /// Get the number of rows currently buffered (not yet flushed).
152    pub fn buffered_count(&self) -> usize {
153        self.buffer.len()
154    }
155
156    async fn flush_buffer(&mut self) -> Result<()> {
157        if self.buffer.is_empty() {
158            return Ok(());
159        }
160        let rows = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.batch_size));
161        let writer = self
162            .writer
163            .as_mut()
164            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
165        writer
166            .insert_vertices(&self.label, rows)
167            .await
168            .map_err(UniError::Internal)?;
169        Ok(())
170    }
171}