uni_bulk/appender.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Streaming appender — row-by-row data loading for a single label.
5//!
6//! Wraps `BulkWriter` to provide an ergonomic, buffered append API for
7//! loading large volumes of vertices into a single label.
8
9use std::collections::HashMap;
10
11use uni_common::{Result, UniError, Value};
12
13use crate::bulk::{BulkBackend, BulkStats, BulkWriter, BulkWriterBuilder};
14
15/// Builder for creating a [`StreamingAppender`].
16pub struct AppenderBuilder {
17 backend: BulkBackend,
18 label: String,
19 batch_size: usize,
20 defer_vector_indexes: bool,
21 max_buffer_size_bytes: Option<usize>,
22}
23
24impl AppenderBuilder {
25 /// Create an appender builder for use within a Transaction.
26 ///
27 /// The Transaction already holds the session write guard, so the appender
28 /// uses the unguarded bulk-writer path and does not acquire or release a
29 /// guard of its own.
30 pub fn new_from_tx(backend: BulkBackend, label: &str) -> Self {
31 Self {
32 backend,
33 label: label.to_string(),
34 batch_size: 5000,
35 defer_vector_indexes: true,
36 max_buffer_size_bytes: None,
37 }
38 }
39
40 /// Set the number of rows to buffer before auto-flushing to the bulk writer.
41 ///
42 /// Default: 5000.
43 pub fn batch_size(mut self, size: usize) -> Self {
44 self.batch_size = size;
45 self
46 }
47
48 /// Set whether to defer vector index building until commit.
49 ///
50 /// Default: `true`.
51 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
52 self.defer_vector_indexes = defer;
53 self
54 }
55
56 /// Set the maximum buffer size in bytes before triggering a checkpoint.
57 ///
58 /// Default: 1 GB (from BulkWriter defaults).
59 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
60 self.max_buffer_size_bytes = Some(size);
61 self
62 }
63
64 /// Build the streaming appender.
65 ///
66 /// The owning Transaction already holds the session write guard, so the
67 /// appender layers over an unguarded [`BulkWriter`].
68 pub fn build(self) -> Result<StreamingAppender> {
69 let mut bulk_builder = BulkWriterBuilder::new_unguarded(self.backend)
70 .batch_size(self.batch_size)
71 .defer_vector_indexes(self.defer_vector_indexes);
72 if let Some(max_buf) = self.max_buffer_size_bytes {
73 bulk_builder = bulk_builder.max_buffer_size_bytes(max_buf);
74 }
75 let writer = bulk_builder.build()?;
76
77 Ok(StreamingAppender {
78 writer: Some(writer),
79 label: self.label,
80 batch_size: self.batch_size,
81 buffer: Vec::with_capacity(self.batch_size),
82 })
83 }
84}
85
86/// A streaming appender for buffered, single-label data loading.
87///
88/// Rows are buffered internally and flushed to the underlying `BulkWriter`
89/// when the buffer reaches `batch_size`. Call [`finish()`](Self::finish) to
90/// flush remaining rows and commit.
91///
92/// The appender is always created from a Transaction, which owns the session
93/// write guard for the appender's lifetime; the appender itself acquires no
94/// guard.
95pub struct StreamingAppender {
96 writer: Option<BulkWriter>,
97 label: String,
98 batch_size: usize,
99 buffer: Vec<HashMap<String, Value>>,
100}
101
102impl StreamingAppender {
103 /// Append a single row of properties.
104 ///
105 /// The row is buffered internally. When the buffer reaches `batch_size`,
106 /// it is automatically flushed to the underlying bulk writer.
107 pub async fn append(&mut self, properties: impl Into<HashMap<String, Value>>) -> Result<()> {
108 self.buffer.push(properties.into());
109 if self.buffer.len() >= self.batch_size {
110 self.flush_buffer().await?;
111 }
112 Ok(())
113 }
114
115 /// Append an Arrow `RecordBatch` of rows.
116 ///
117 /// Each row in the batch is converted to a property map and buffered.
118 /// Columns in the batch become property keys; values are converted from
119 /// Arrow types to Uni [`Value`]s via `arrow_to_value`.
120 pub async fn write_batch(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
121 for props in crate::bulk::record_batch_to_property_maps(batch) {
122 self.buffer.push(props);
123 if self.buffer.len() >= self.batch_size {
124 self.flush_buffer().await?;
125 }
126 }
127 Ok(())
128 }
129
130 /// Flush all buffered rows and commit the bulk writer.
131 ///
132 /// Consumes the appender. Returns statistics about the loading operation.
133 pub async fn finish(mut self) -> Result<BulkStats> {
134 self.flush_buffer().await?;
135 let writer = self
136 .writer
137 .take()
138 .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
139 let stats = writer.commit().await.map_err(UniError::Internal)?;
140 Ok(stats)
141 }
142
143 /// Abort the appender without committing.
144 ///
145 /// Consumes the appender. Discards all buffered and previously flushed rows.
146 pub fn abort(mut self) {
147 self.buffer.clear();
148 self.writer.take(); // Drop the writer
149 }
150
151 /// Get the number of rows currently buffered (not yet flushed).
152 pub fn buffered_count(&self) -> usize {
153 self.buffer.len()
154 }
155
156 async fn flush_buffer(&mut self) -> Result<()> {
157 if self.buffer.is_empty() {
158 return Ok(());
159 }
160 let rows = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.batch_size));
161 let writer = self
162 .writer
163 .as_mut()
164 .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
165 writer
166 .insert_vertices(&self.label, rows)
167 .await
168 .map_err(UniError::Internal)?;
169 Ok(())
170 }
171}