Skip to main content

uni_bulk/
bulk.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Bulk loading API for high-throughput data ingestion.
5//!
6//! This module provides `BulkWriter` for efficiently loading large amounts of
7//! vertices and edges while deferring index updates until commit time.
8//!
9//! ## Async Index Building
10//!
11//! By default, `commit()` blocks until all indexes are rebuilt. For large datasets,
12//! you can enable async index building to return immediately while indexes are
13//! built in the background:
14//!
//! ```ignore
//! let mut writer = db.bulk_writer()
//!     .async_indexes(true)
//!     .build()?;
//! writer.insert_vertices("Person", rows).await?;
//! let stats = writer.commit().await?;
23//!
24//! // Data is queryable immediately (may use full scans)
25//! // Check index status later:
26//! let status = db.index_rebuild_status().await?;
27//! ```
28
29use anyhow::{Result, anyhow};
30use chrono::Utc;
31use std::cmp::Ordering;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::UniConfig;
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::schema::SchemaManager;
39use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
40use uni_common::{Properties, UniError};
41use uni_plugin_host::shutdown::ShutdownHandle;
42use uni_store::runtime::writer::Writer;
43use uni_store::storage::delta::{L1Entry, Op};
44use uni_store::storage::main_edge::MainEdgeDataset;
45use uni_store::storage::main_vertex::MainVertexDataset;
46use uni_store::storage::manager::StorageManager;
47use uni_store::storage::{IndexManager, IndexRebuildManager};
48use uuid::Uuid;
49
/// Concrete handle bundle injected into the bulk write path.
///
/// `BulkWriter`/`BulkWriterBuilder`/`StreamingAppender` need only field access
/// to the database's storage, writer, schema, shutdown coordinator, and config —
/// they call no methods on the owning `uni_db::Uni` inner state. The uni-db
/// driver constructs a `BulkBackend` from its `UniInner` fields and hands it in,
/// avoiding a dependency cycle.
///
/// Cloning bumps the reference counts of the `Arc` handles; `config` is cloned
/// through `UniConfig`'s own `Clone` impl.
#[derive(Clone)]
pub struct BulkBackend {
    /// Storage manager (datasets, backend, snapshot manager).
    pub storage: Arc<StorageManager>,
    /// Writer used for VID/EID allocation. `None` on a read-only database;
    /// `BulkWriterBuilder::build` refuses to build a writer from such a backend.
    pub writer: Option<Arc<Writer>>,
    /// Schema manager (labels, edge types, constraints, index metadata).
    pub schema: Arc<SchemaManager>,
    /// Shutdown coordinator — tracks background index-rebuild tasks.
    pub shutdown: Arc<ShutdownHandle>,
    /// Database configuration (e.g. index-rebuild tuning).
    pub config: UniConfig,
}
70
71/// Trait for types that can be converted to property maps for bulk insertion.
72///
73/// Enables `insert_vertices` to accept both `Vec<HashMap<String, Value>>`
74/// and `RecordBatch` (Arrow columnar data).
75pub trait IntoArrow {
76    /// Convert to a vector of property maps.
77    fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
78}
79
80impl IntoArrow for Vec<HashMap<String, Value>> {
81    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
82        self
83    }
84}
85
86impl IntoArrow for arrow_array::RecordBatch {
87    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
88        record_batch_to_property_maps(&self)
89    }
90}
91
92/// Convert each row of an Arrow `RecordBatch` to a property map.
93///
94/// Columns become property keys; values are converted from Arrow types to Uni
95/// [`Value`]s via `arrow_to_value`. Null values are omitted from the map.
96pub fn record_batch_to_property_maps(
97    batch: &arrow_array::RecordBatch,
98) -> Vec<HashMap<String, Value>> {
99    let schema = batch.schema();
100    let num_rows = batch.num_rows();
101    let mut rows = Vec::with_capacity(num_rows);
102    for row_idx in 0..num_rows {
103        let mut props = HashMap::with_capacity(schema.fields().len());
104        for (col_idx, field) in schema.fields().iter().enumerate() {
105            let col = batch.column(col_idx);
106            let value =
107                uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
108            if !value.is_null() {
109                props.insert(field.name().clone(), value);
110            }
111        }
112        rows.push(props);
113    }
114    rows
115}
116
/// Builder for configuring a bulk writer.
///
/// Create one (e.g. with [`BulkWriterBuilder::new_unguarded`]), adjust it with
/// the chained setters, then call [`BulkWriterBuilder::build`].
pub struct BulkWriterBuilder {
    // Database handles handed over to the `BulkWriter` by `build()`.
    backend: BulkBackend,
    // Options accumulated by the setters; starts at `BulkConfig::default()`.
    config: BulkConfig,
    // Optional observer registered via `on_progress`; moved into the writer by `build()`.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
}
123
124impl BulkWriterBuilder {
125    /// Create a bulk writer builder.
126    ///
127    /// Used by `Transaction::bulk_writer()` and `AppenderBuilder` — the
128    /// Transaction already holds the session write guard, so the BulkWriter
129    /// neither acquires nor releases one.
130    pub fn new_unguarded(backend: BulkBackend) -> Self {
131        Self {
132            backend,
133            config: BulkConfig::default(),
134            progress_callback: None,
135        }
136    }
137
138    /// Set whether to defer vector index building until commit.
139    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
140        self.config.defer_vector_indexes = defer;
141        self
142    }
143
144    /// Set whether to defer scalar index building until commit.
145    pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
146        self.config.defer_scalar_indexes = defer;
147        self
148    }
149
150    /// Set the batch size for buffering before flush.
151    pub fn batch_size(mut self, size: usize) -> Self {
152        self.config.batch_size = size;
153        self
154    }
155
156    /// Set a progress callback for monitoring bulk load progress.
157    pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
158        self.progress_callback = Some(Box::new(f));
159        self
160    }
161
162    /// Build indexes asynchronously after commit.
163    ///
164    /// When enabled, `commit()` returns immediately after data is written,
165    /// and indexes are rebuilt in the background. The data is queryable
166    /// immediately but queries may use full scans until indexes are ready.
167    ///
168    /// Use `Uni::index_rebuild_status()` to check progress.
169    ///
170    /// Default: `false` (blocking index rebuild)
171    pub fn async_indexes(mut self, async_: bool) -> Self {
172        self.config.async_indexes = async_;
173        self
174    }
175
176    /// Set whether to validate constraints during bulk load.
177    ///
178    /// When enabled (default), BulkWriter validates NOT NULL, UNIQUE, and CHECK
179    /// constraints before each flush, matching the behavior of regular Writer.
180    /// Set to `false` for trusted data sources to improve performance.
181    ///
182    /// Default: `true`
183    pub fn validate_constraints(mut self, validate: bool) -> Self {
184        self.config.validate_constraints = validate;
185        self
186    }
187
188    /// Set the maximum buffer size before triggering a checkpoint flush.
189    ///
190    /// When the in-memory buffer exceeds this size, a checkpoint is triggered
191    /// to flush data to storage. This allows bulk loading of arbitrarily large
192    /// datasets while controlling memory usage.
193    ///
194    /// Default: 1 GB (1_073_741_824 bytes)
195    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
196        self.config.max_buffer_size_bytes = size;
197        self
198    }
199
200    /// Build the bulk writer.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the database is not writable.
205    pub fn build(self) -> Result<BulkWriter> {
206        if self.backend.writer.is_none() {
207            return Err(anyhow!("BulkWriter requires a writable database instance"));
208        }
209
210        Ok(BulkWriter {
211            backend: self.backend,
212            config: self.config,
213            progress_callback: self.progress_callback,
214            stats: BulkStats::default(),
215            start_time: Instant::now(),
216            pending_vertices: HashMap::new(),
217            pending_edges: HashMap::new(),
218            touched_labels: HashSet::new(),
219            touched_edge_types: HashSet::new(),
220            initial_table_versions: HashMap::new(),
221            buffer_size_bytes: 0,
222            committed: false,
223        })
224    }
225}
226
/// Configuration for bulk loading operations.
///
/// Usually assembled through the [`BulkWriterBuilder`] setters; see
/// [`BulkConfig::default`] for the starting values. Derives `Debug` and `Clone`
/// so the effective settings can be logged and copied.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// Whether to defer vector index building until commit.
    pub defer_vector_indexes: bool,
    /// Whether to defer scalar index building until commit.
    pub defer_scalar_indexes: bool,
    /// Number of rows to buffer per label before flushing to storage.
    ///
    /// A vertex label's buffer is flushed as soon as it holds at least this
    /// many rows, so `0` makes every non-empty `insert_vertices` call flush.
    pub batch_size: usize,
    /// Whether to build indexes asynchronously after commit.
    pub async_indexes: bool,
    /// Whether to validate constraints (NOT NULL, UNIQUE, CHECK) during bulk load.
    ///
    /// Default: `true`. Set to `false` to skip validation for trusted data sources.
    pub validate_constraints: bool,
    /// Maximum buffer size in bytes before triggering a checkpoint flush.
    ///
    /// Default: 1 GiB (1_073_741_824 bytes). When the estimated buffer size
    /// reaches this limit, a checkpoint flushes all buffered data to storage
    /// and loading continues.
    pub max_buffer_size_bytes: usize,
}

impl Default for BulkConfig {
    /// Defer both vector and scalar indexes, 10 000-row batches, blocking
    /// index rebuild, constraint validation on, and a 1 GiB buffer limit.
    fn default() -> Self {
        Self {
            defer_vector_indexes: true,
            defer_scalar_indexes: true,
            batch_size: 10_000,
            async_indexes: false,
            validate_constraints: true,
            max_buffer_size_bytes: 1_073_741_824, // 1 GiB (2^30 bytes)
        }
    }
}
261
/// Progress report delivered to the callback registered with
/// `BulkWriterBuilder::on_progress`.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Stage of the bulk load this report describes.
    pub phase: BulkPhase,
    /// Rows processed so far. `insert_vertices` reports the running total of
    /// inserted vertices — NOTE(review): assumes `report_progress` stores its
    /// count argument here; confirm.
    pub rows_processed: usize,
    /// Total rows expected, presumably `None` when the reporter does not know
    /// it — confirm in `report_progress`.
    pub total_rows: Option<usize>,
    /// Label being processed (`insert_vertices` passes its label); presumably
    /// the edge type during edge inserts — confirm.
    pub current_label: Option<String>,
    /// Elapsed time — presumably measured from the writer's `start_time`;
    /// confirm in `report_progress`.
    pub elapsed: Duration,
}
270
/// Stage of a bulk load, carried in [`BulkProgress::phase`].
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Rows are being inserted (reported by `insert_vertices`).
    Inserting,
    /// Indexes for `label` are being rebuilt — presumably during `commit()`;
    /// confirm.
    RebuildingIndexes { label: String },
    /// Final stage of the load — NOTE(review): not reported in the visible code;
    /// presumably emitted at the end of `commit()`.
    Finalizing,
}
277
/// Counters describing a bulk load, exposed through `BulkWriter::stats()`.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Vertices accepted by `insert_vertices` (incremented by the number of
    /// VIDs allocated in each call).
    pub vertices_inserted: usize,
    /// Edges accepted by `insert_edges` — presumably counted like
    /// `vertices_inserted`; confirm.
    pub edges_inserted: usize,
    /// Number of indexes rebuilt — NOTE(review): presumably filled in by
    /// `commit()`; confirm.
    pub indexes_rebuilt: usize,
    /// Overall duration of the load — presumably set at commit; confirm.
    pub duration: Duration,
    /// Time spent rebuilding indexes — presumably set at commit; confirm.
    pub index_build_duration: Duration,
    /// Task IDs for async index rebuilds (populated when `async_indexes` is true).
    pub index_task_ids: Vec<String>,
    /// True if index building was deferred to background (async mode).
    pub indexes_pending: bool,
}
290
/// Edge data for bulk insertion.
///
/// Contains source/destination vertex IDs and properties. The edge type is not
/// stored here; it is the `edge_type` argument of `BulkWriter::insert_edges`.
#[derive(Debug, Clone)]
pub struct EdgeData {
    /// Source vertex ID.
    pub src_vid: Vid,
    /// Destination vertex ID.
    pub dst_vid: Vid,
    /// Edge properties.
    pub properties: Properties,
}
303
304impl EdgeData {
305    /// Create new edge data.
306    pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
307        Self {
308            src_vid,
309            dst_vid,
310            properties,
311        }
312    }
313}
314
/// Bulk writer for high-throughput data ingestion.
///
/// Buffers vertices and edges, deferring index updates until commit.
/// Supports constraint validation, automatic checkpointing when buffer limits
/// are exceeded, and proper rollback via LanceDB version tracking.
///
/// Use `abort()` to discard uncommitted changes and roll back storage to its
/// pre-bulk-load state.
///
/// Created by [`BulkWriterBuilder::build`], which only succeeds when
/// `backend.writer` is `Some`.
pub struct BulkWriter {
    // Database handles (storage, writer, schema, shutdown coordinator, config).
    backend: BulkBackend,
    // Options fixed when the writer was built.
    config: BulkConfig,
    // Optional observer for progress reports.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    // Running counters exposed through `stats()`.
    stats: BulkStats,
    // Captured when the writer is built.
    start_time: Instant,
    // Buffered data per label/type
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    pending_edges: HashMap<String, Vec<L1Entry>>,
    // Track what was written (for index rebuild)
    touched_labels: HashSet<String>,
    touched_edge_types: HashSet<String>,
    // Track LanceDB table versions before bulk load started (for abort rollback)
    // Key: table name, Value: version before first write (None = table created during bulk load)
    initial_table_versions: HashMap<String, Option<u64>>,
    // Current buffer size in bytes (approximate); compared against
    // `config.max_buffer_size_bytes` and reset to 0 by `checkpoint()`.
    buffer_size_bytes: usize,
    // NOTE(review): presumably set once `commit()` succeeds (commit is not visible here).
    committed: bool,
}
342
343impl BulkWriter {
344    /// Returns a snapshot of the current bulk load statistics.
345    /// Updated after each batch flush.
346    pub fn stats(&self) -> &BulkStats {
347        &self.stats
348    }
349
350    /// Returns the set of vertex labels that have been written to.
351    pub fn touched_labels(&self) -> Vec<String> {
352        self.touched_labels.iter().cloned().collect()
353    }
354
355    /// Returns the set of edge types that have been written to.
356    pub fn touched_edge_types(&self) -> Vec<String> {
357        self.touched_edge_types.iter().cloned().collect()
358    }
359
360    /// Returns the current timestamp in microseconds since Unix epoch.
361    fn get_current_timestamp_micros() -> i64 {
362        use std::time::{SystemTime, UNIX_EPOCH};
363        SystemTime::now()
364            .duration_since(UNIX_EPOCH)
365            .map(|d| d.as_micros() as i64)
366            .unwrap_or(0)
367    }
368
369    /// Insert vertices in bulk.
370    ///
371    /// The vertices are buffered until `batch_size` is reached, then written to storage.
372    /// When constraint validation is enabled, constraints are checked before each flush.
373    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if:
378    /// - The label is not found in the schema
379    /// - Constraint validation fails (when enabled)
380    /// - Storage write fails
381    pub async fn insert_vertices(
382        &mut self,
383        label: &str,
384        vertices: impl IntoArrow,
385    ) -> Result<Vec<Vid>> {
386        let vertices = vertices.into_property_maps();
387        let schema = self.backend.schema.schema();
388        // Validate label exists in schema
389        schema
390            .labels
391            .get(label)
392            .ok_or_else(|| UniError::LabelNotFound {
393                label: label.to_string(),
394            })?;
395        // Validate constraints before buffering (if enabled)
396        if self.config.validate_constraints {
397            self.validate_vertex_batch_constraints(label, &vertices)
398                .await?;
399        }
400
401        // Allocate VIDs (batched for performance)
402        let vids = {
403            let writer = self.backend.writer.as_ref().unwrap();
404            writer
405                .allocate_vids(vertices.len())
406                .await
407                .map_err(UniError::Internal)?
408        };
409
410        // Track buffer size and add to buffer
411        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
412        for (i, props) in vertices.into_iter().enumerate() {
413            self.buffer_size_bytes += Self::estimate_properties_size(&props);
414            buffer.push((vids[i], props));
415        }
416
417        self.touched_labels.insert(label.to_string());
418
419        // Check if we need to checkpoint based on buffer size
420        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
421            self.checkpoint().await?;
422        } else {
423            // Otherwise, check batch size threshold for this label only
424            self.check_flush_vertices(label).await?;
425        }
426
427        self.stats.vertices_inserted += vids.len();
428        self.report_progress(
429            BulkPhase::Inserting,
430            self.stats.vertices_inserted,
431            Some(label.to_string()),
432        );
433
434        Ok(vids)
435    }
436
437    /// Estimate the size of a properties map in bytes.
438    fn estimate_properties_size(props: &Properties) -> usize {
439        let mut size = 0;
440        for (key, value) in props {
441            size += key.len();
442            size += Self::estimate_value_size(value);
443        }
444        size
445    }
446
447    /// Estimate the size of a value in bytes.
448    fn estimate_value_size(value: &Value) -> usize {
449        match value {
450            Value::Null => 1,
451            Value::Bool(_) => 1,
452            Value::Int(_) | Value::Float(_) => 8,
453            Value::String(s) => s.len(),
454            Value::Bytes(b) => b.len(),
455            Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
456            Value::Map(obj) => {
457                obj.iter()
458                    .map(|(k, v)| k.len() + Self::estimate_value_size(v))
459                    .sum::<usize>()
460                    + 8
461            }
462            Value::Vector(v) => v.len() * 4,
463            _ => 16, // Node, Edge, Path
464        }
465    }
466
467    /// Validate constraints for a batch of vertices before insertion.
468    ///
469    /// Checks NOT NULL, UNIQUE, and CHECK constraints. For UNIQUE constraints,
470    /// validates both within the batch and against already-buffered data.
471    async fn validate_vertex_batch_constraints(
472        &self,
473        label: &str,
474        vertices: &[Properties],
475    ) -> Result<()> {
476        let schema = self.backend.schema.schema();
477
478        // Check NOT NULL and CHECK constraints for each vertex
479        if let Some(props_meta) = schema.properties.get(label) {
480            for (idx, props) in vertices.iter().enumerate() {
481                // NOT NULL constraints
482                for (prop_name, meta) in props_meta {
483                    if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
484                        return Err(anyhow!(
485                            "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
486                            idx,
487                            prop_name,
488                            label
489                        ));
490                    }
491                }
492            }
493        }
494
495        // Check explicit constraints (UNIQUE, CHECK)
496        for constraint in &schema.constraints {
497            if !constraint.enabled {
498                continue;
499            }
500            match &constraint.target {
501                uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
502                _ => continue,
503            }
504
505            match &constraint.constraint_type {
506                uni_common::core::schema::ConstraintType::Unique {
507                    properties: unique_props,
508                } => {
509                    // Check for duplicates within the batch
510                    let mut seen_keys: HashSet<String> = HashSet::new();
511                    for (idx, props) in vertices.iter().enumerate() {
512                        let key = self.compute_unique_key(unique_props, props);
513                        if let Some(k) = key
514                            && !seen_keys.insert(k.clone())
515                        {
516                            return Err(anyhow!(
517                                "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
518                                idx,
519                                k
520                            ));
521                        }
522                    }
523
524                    // Check against already-buffered data
525                    if let Some(buffered) = self.pending_vertices.get(label) {
526                        for (idx, props) in vertices.iter().enumerate() {
527                            let key = self.compute_unique_key(unique_props, props);
528                            if let Some(k) = key {
529                                for (_, buffered_props) in buffered {
530                                    let buffered_key =
531                                        self.compute_unique_key(unique_props, buffered_props);
532                                    if buffered_key.as_ref() == Some(&k) {
533                                        return Err(anyhow!(
534                                            "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
535                                            idx,
536                                            k
537                                        ));
538                                    }
539                                }
540                            }
541                        }
542                    }
543                }
544                uni_common::core::schema::ConstraintType::Exists { property } => {
545                    for (idx, props) in vertices.iter().enumerate() {
546                        if props.get(property).is_none_or(|v| v.is_null()) {
547                            return Err(anyhow!(
548                                "EXISTS constraint violation at row {}: property '{}' must exist",
549                                idx,
550                                property
551                            ));
552                        }
553                    }
554                }
555                uni_common::core::schema::ConstraintType::Check { expression } => {
556                    for (idx, props) in vertices.iter().enumerate() {
557                        if !self.evaluate_check_expression(expression, props)? {
558                            return Err(anyhow!(
559                                "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
560                                constraint.name,
561                                idx,
562                                expression
563                            ));
564                        }
565                    }
566                }
567                _ => {}
568            }
569        }
570
571        Ok(())
572    }
573
574    /// Compute a unique key string from properties for UNIQUE constraint checking.
575    fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
576        let mut parts = Vec::new();
577        for prop in unique_props {
578            match props.get(prop) {
579                Some(v) if !v.is_null() => parts.push(v.to_string()),
580                _ => return None, // Missing property means can't enforce uniqueness
581            }
582        }
583        Some(parts.join(":"))
584    }
585
586    /// Evaluate a simple CHECK constraint expression.
587    fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
588        let parts: Vec<&str> = expression.split_whitespace().collect();
589        if parts.len() != 3 {
590            // Complex expression - allow for now
591            return Ok(true);
592        }
593
594        let prop_part = parts[0].trim_start_matches('(');
595        let prop_name = if let Some(idx) = prop_part.find('.') {
596            &prop_part[idx + 1..]
597        } else {
598            prop_part
599        };
600
601        let op = parts[1];
602        let val_str = parts[2].trim_end_matches(')');
603
604        let prop_val = match properties.get(prop_name) {
605            Some(v) => v,
606            None => return Ok(true), // Missing property passes CHECK
607        };
608
609        // Parse target value
610        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
611            || (val_str.starts_with('"') && val_str.ends_with('"'))
612        {
613            Value::String(val_str[1..val_str.len() - 1].to_string())
614        } else if let Ok(n) = val_str.parse::<i64>() {
615            Value::Int(n)
616        } else if let Ok(n) = val_str.parse::<f64>() {
617            Value::Float(n)
618        } else if let Ok(b) = val_str.parse::<bool>() {
619            Value::Bool(b)
620        } else {
621            Value::String(val_str.to_string())
622        };
623
624        match op {
625            "=" | "==" => Ok(prop_val == &target_val),
626            "!=" | "<>" => Ok(prop_val != &target_val),
627            ">" => self.compare_values(prop_val, &target_val).map(|c| c > 0),
628            "<" => self.compare_values(prop_val, &target_val).map(|c| c < 0),
629            ">=" => self.compare_values(prop_val, &target_val).map(|c| c >= 0),
630            "<=" => self.compare_values(prop_val, &target_val).map(|c| c <= 0),
631            _ => Ok(true), // Unknown operator - allow
632        }
633    }
634
635    /// Compare two values, returning -1, 0, or 1.
636    ///
637    /// Incomparable floats (NaN) compare as equal (0), matching the prior
638    /// branch-based implementation.
639    fn compare_values(&self, a: &Value, b: &Value) -> Result<i8> {
640        let ordering = match (a, b) {
641            (Value::Int(n1), Value::Int(n2)) => n1.cmp(n2),
642            (Value::Float(f1), Value::Float(f2)) => f1.partial_cmp(f2).unwrap_or(Ordering::Equal),
643            (Value::Int(n), Value::Float(f)) => {
644                (*n as f64).partial_cmp(f).unwrap_or(Ordering::Equal)
645            }
646            (Value::Float(f), Value::Int(n)) => {
647                f.partial_cmp(&(*n as f64)).unwrap_or(Ordering::Equal)
648            }
649            (Value::String(s1), Value::String(s2)) => s1.cmp(s2),
650            _ => {
651                return Err(anyhow!(
652                    "Cannot compare incompatible types: {:?} vs {:?}",
653                    a,
654                    b
655                ));
656            }
657        };
658        Ok(ordering as i8)
659    }
660
661    /// Checkpoint: flush all pending data to storage.
662    ///
663    /// Called automatically when buffer size exceeds `max_buffer_size_bytes`.
664    /// Flushes all buffered vertices and edges, then resets the buffer size counter.
665    async fn checkpoint(&mut self) -> Result<()> {
666        log::debug!(
667            "Checkpoint triggered at {} bytes (limit: {})",
668            self.buffer_size_bytes,
669            self.config.max_buffer_size_bytes
670        );
671
672        // Flush all pending vertices
673        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
674        for label in labels {
675            self.flush_vertices_buffer(&label).await?;
676        }
677
678        // Flush all pending edges
679        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
680        for edge_type in edge_types {
681            self.flush_edges_buffer(&edge_type).await?;
682        }
683
684        // Reset buffer size
685        self.buffer_size_bytes = 0;
686
687        Ok(())
688    }
689
690    // Helper to flush vertex buffer if full
691    async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
692        let should_flush = self
693            .pending_vertices
694            .get(label)
695            .is_some_and(|buf| buf.len() >= self.config.batch_size);
696
697        if should_flush {
698            self.flush_vertices_buffer(label).await?;
699        }
700        Ok(())
701    }
702
703    /// Flush vertex buffer to LanceDB storage.
704    ///
705    /// Records the initial table version before first write for rollback support.
706    /// Writes to both per-label table and main vertices table.
707    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
708        if let Some(vertices) = self.pending_vertices.remove(label) {
709            if vertices.is_empty() {
710                return Ok(());
711            }
712
713            // Record initial version for abort rollback (only once per table)
714            let table_name = uni_store::backend::table_names::vertex_table_name(label);
715            if !self.initial_table_versions.contains_key(&table_name) {
716                let backend = self.backend.storage.backend();
717                let version = backend
718                    .get_table_version(&table_name)
719                    .await
720                    .map_err(UniError::Internal)?;
721                self.initial_table_versions.insert(table_name, version);
722            }
723
724            // Record main vertices table version for rollback
725            let main_table_name =
726                uni_store::backend::table_names::main_vertex_table_name().to_string();
727            if !self.initial_table_versions.contains_key(&main_table_name) {
728                let backend = self.backend.storage.backend();
729                let version = backend
730                    .get_table_version(&main_table_name)
731                    .await
732                    .map_err(UniError::Internal)?;
733                self.initial_table_versions
734                    .insert(main_table_name.clone(), version);
735            }
736
737            let ds = self
738                .backend
739                .storage
740                .vertex_dataset(label)
741                .map_err(UniError::Internal)?;
742            let schema = self.backend.schema.schema();
743
744            let deleted = vec![false; vertices.len()];
745            let versions = vec![1; vertices.len()]; // Version 1 for bulk load
746
747            // Generate timestamps for this batch
748            let now = Self::get_current_timestamp_micros();
749            let mut created_at: HashMap<Vid, i64> = HashMap::new();
750            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
751            for (vid, _) in &vertices {
752                created_at.insert(*vid, now);
753                updated_at.insert(*vid, now);
754            }
755
756            // Build per-label and main-vertex entries from the 2-tuple input.
757            // Both tables need labels attached; compute once per vertex.
758            let labels = vec![label.to_string()];
759            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
760                .iter()
761                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
762                .collect();
763
764            let batch = ds
765                .build_record_batch_with_timestamps(
766                    &vertices_with_labels,
767                    &deleted,
768                    &versions,
769                    &schema,
770                    Some(&created_at),
771                    Some(&updated_at),
772                )
773                .map_err(UniError::Internal)?;
774
775            // Write to per-label table via backend
776            let backend = self.backend.storage.backend();
777            ds.write_batch(backend, batch, &schema)
778                .await
779                .map_err(UniError::Internal)?;
780
781            // Create default scalar indexes (_vid, _uid) which are critical for basic function
782            ds.ensure_default_indexes(backend)
783                .await
784                .map_err(UniError::Internal)?;
785
786            // Dual-write to main vertices table
787            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
788                vertices_with_labels
789                    .into_iter()
790                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
791                    .collect();
792
793            if !main_vertices.is_empty() {
794                let main_batch = MainVertexDataset::build_record_batch(
795                    &main_vertices,
796                    Some(&created_at),
797                    Some(&updated_at),
798                )
799                .map_err(UniError::Internal)?;
800
801                MainVertexDataset::write_batch(backend, main_batch)
802                    .await
803                    .map_err(UniError::Internal)?;
804
805                MainVertexDataset::ensure_default_indexes(backend)
806                    .await
807                    .map_err(UniError::Internal)?;
808            }
809        }
810        Ok(())
811    }
812
813    /// Insert edges in bulk.
814    ///
815    /// Edges are buffered until `batch_size` is reached, then written to storage.
816    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
817    /// Indexes are NOT updated during these writes.
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the edge type is not found in the schema or if
822    /// storage write fails.
823    pub async fn insert_edges(
824        &mut self,
825        edge_type: &str,
826        edges: Vec<EdgeData>,
827    ) -> Result<Vec<Eid>> {
828        let schema = self.backend.schema.schema();
829        // Validate the edge type exists, but its id is no longer needed:
830        // `allocate_eids` does not partition by type.
831        schema
832            .edge_types
833            .get(edge_type)
834            .ok_or_else(|| UniError::EdgeTypeNotFound {
835                edge_type: edge_type.to_string(),
836            })?;
837
838        // Allocate EIDs in one IdAllocator mutex acquisition.
839        let eids = {
840            let writer = self.backend.writer.as_ref().unwrap();
841            writer
842                .allocate_eids(edges.len())
843                .await
844                .map_err(UniError::Internal)?
845        };
846
847        // Convert to L1Entry format and track buffer size
848        let now = Self::get_current_timestamp_micros();
849        let mut added_size = 0usize;
850        let entries: Vec<L1Entry> = edges
851            .into_iter()
852            .enumerate()
853            .map(|(i, edge)| {
854                // Estimate size for buffer tracking (16 bytes for VIDs + 8 for EID + properties)
855                added_size += 32 + Self::estimate_properties_size(&edge.properties);
856                L1Entry {
857                    src_vid: edge.src_vid,
858                    dst_vid: edge.dst_vid,
859                    eid: eids[i],
860                    op: Op::Insert,
861                    version: 1,
862                    properties: edge.properties,
863                    created_at: Some(now),
864                    updated_at: Some(now),
865                }
866            })
867            .collect();
868        self.buffer_size_bytes += added_size;
869        self.pending_edges
870            .entry(edge_type.to_string())
871            .or_default()
872            .extend(entries);
873
874        self.touched_edge_types.insert(edge_type.to_string());
875
876        // Check if we need to checkpoint based on buffer size
877        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
878            self.checkpoint().await?;
879        } else {
880            self.check_flush_edges(edge_type).await?;
881        }
882
883        self.stats.edges_inserted += eids.len();
884        self.report_progress(
885            BulkPhase::Inserting,
886            self.stats.vertices_inserted + self.stats.edges_inserted,
887            Some(edge_type.to_string()),
888        );
889
890        Ok(eids)
891    }
892
893    /// Check and flush edge buffer if full.
894    async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
895        let should_flush = self
896            .pending_edges
897            .get(edge_type)
898            .is_some_and(|buf| buf.len() >= self.config.batch_size);
899
900        if should_flush {
901            self.flush_edges_buffer(edge_type).await?;
902        }
903        Ok(())
904    }
905
906    /// Flush edge buffer to delta datasets.
907    ///
908    /// Records initial table versions before first write for rollback support.
909    /// Writes to both per-type delta tables and main edges table.
910    #[expect(
911        clippy::map_entry,
912        reason = "async code between contains_key and insert"
913    )]
914    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
915        if let Some(entries) = self.pending_edges.remove(edge_type) {
916            if entries.is_empty() {
917                return Ok(());
918            }
919
920            let schema = self.backend.schema.schema();
921            let backend = self.backend.storage.backend();
922
923            // Record initial versions for abort rollback (FWD and BWD tables)
924            let fwd_table_name =
925                uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
926            if !self.initial_table_versions.contains_key(&fwd_table_name) {
927                let version = backend
928                    .get_table_version(&fwd_table_name)
929                    .await
930                    .map_err(UniError::Internal)?;
931                self.initial_table_versions.insert(fwd_table_name, version);
932            }
933            let bwd_table_name =
934                uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
935            if !self.initial_table_versions.contains_key(&bwd_table_name) {
936                let version = backend
937                    .get_table_version(&bwd_table_name)
938                    .await
939                    .map_err(UniError::Internal)?;
940                self.initial_table_versions.insert(bwd_table_name, version);
941            }
942
943            // Record main edges table version for rollback
944            let main_edge_table_name =
945                uni_store::backend::table_names::main_edge_table_name().to_string();
946            if !self
947                .initial_table_versions
948                .contains_key(&main_edge_table_name)
949            {
950                let version = backend
951                    .get_table_version(&main_edge_table_name)
952                    .await
953                    .map_err(UniError::Internal)?;
954                self.initial_table_versions
955                    .insert(main_edge_table_name.clone(), version);
956            }
957
958            // Write to FWD delta (sorted by src_vid)
959            let mut fwd_entries = entries.clone();
960            fwd_entries.sort_by_key(|e| e.src_vid);
961            let fwd_ds = self
962                .backend
963                .storage
964                .delta_dataset(edge_type, "fwd")
965                .map_err(UniError::Internal)?;
966            let fwd_batch = fwd_ds
967                .build_record_batch(&fwd_entries, &schema)
968                .map_err(UniError::Internal)?;
969            let backend = self.backend.storage.backend();
970            fwd_ds
971                .write_run(backend, fwd_batch)
972                .await
973                .map_err(UniError::Internal)?;
974            fwd_ds
975                .ensure_eid_index(backend)
976                .await
977                .map_err(UniError::Internal)?;
978
979            // Write to BWD delta (sorted by dst_vid)
980            let mut bwd_entries = entries.clone();
981            bwd_entries.sort_by_key(|e| e.dst_vid);
982            let bwd_ds = self
983                .backend
984                .storage
985                .delta_dataset(edge_type, "bwd")
986                .map_err(UniError::Internal)?;
987            let bwd_batch = bwd_ds
988                .build_record_batch(&bwd_entries, &schema)
989                .map_err(UniError::Internal)?;
990            bwd_ds
991                .write_run(backend, bwd_batch)
992                .await
993                .map_err(UniError::Internal)?;
994            bwd_ds
995                .ensure_eid_index(backend)
996                .await
997                .map_err(UniError::Internal)?;
998
999            // Dual-write to main edges table
1000            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
1001            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
1002            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
1003                .iter()
1004                .map(|e| {
1005                    let deleted = matches!(e.op, Op::Delete);
1006                    if let Some(ts) = e.created_at {
1007                        edge_created_at.insert(e.eid, ts);
1008                    }
1009                    if let Some(ts) = e.updated_at {
1010                        edge_updated_at.insert(e.eid, ts);
1011                    }
1012                    (
1013                        e.eid,
1014                        e.src_vid,
1015                        e.dst_vid,
1016                        edge_type.to_string(),
1017                        e.properties.clone(),
1018                        deleted,
1019                        e.version,
1020                    )
1021                })
1022                .collect();
1023
1024            if !main_edges.is_empty() {
1025                let main_batch = MainEdgeDataset::build_record_batch(
1026                    &main_edges,
1027                    Some(&edge_created_at),
1028                    Some(&edge_updated_at),
1029                )
1030                .map_err(UniError::Internal)?;
1031
1032                MainEdgeDataset::write_batch(self.backend.storage.backend(), main_batch)
1033                    .await
1034                    .map_err(UniError::Internal)?;
1035
1036                MainEdgeDataset::ensure_default_indexes(self.backend.storage.backend())
1037                    .await
1038                    .map_err(UniError::Internal)?;
1039            }
1040        }
1041        Ok(())
1042    }
1043
1044    /// Commit all pending data and rebuild indexes.
1045    ///
1046    /// Flushes remaining buffered data, rebuilds deferred indexes, and updates
1047    /// the snapshot manifest.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if flushing, index rebuilding, or snapshot update fails.
1052    pub async fn commit(mut self) -> Result<BulkStats> {
1053        // 1. Flush remaining vertex buffers
1054        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
1055        for label in labels {
1056            self.flush_vertices_buffer(&label).await?;
1057        }
1058
1059        // 2. Flush remaining edge buffers
1060        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
1061        for edge_type in edge_types {
1062            self.flush_edges_buffer(&edge_type).await?;
1063        }
1064
1065        let index_start = Instant::now();
1066
1067        // 3. Rebuild indexes for vertices
1068        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
1069            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
1070
1071            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
1072                // Async mode: mark affected indexes as Stale before scheduling
1073                let schema = self.backend.schema.schema();
1074                for label in &labels_to_rebuild {
1075                    for idx in &schema.indexes {
1076                        if idx.label() == label.as_str() {
1077                            let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1078                                m.status = uni_common::core::schema::IndexStatus::Stale;
1079                            });
1080                        }
1081                    }
1082                }
1083
1084                let rebuild_manager = IndexRebuildManager::new(
1085                    self.backend.storage.clone(),
1086                    self.backend.schema.clone(),
1087                    self.backend.config.index_rebuild.clone(),
1088                )
1089                .await
1090                .map_err(UniError::Internal)?;
1091
1092                let task_ids = rebuild_manager
1093                    .schedule(labels_to_rebuild)
1094                    .await
1095                    .map_err(UniError::Internal)?;
1096
1097                self.stats.index_task_ids = task_ids;
1098                self.stats.indexes_pending = true;
1099
1100                let manager = Arc::new(rebuild_manager);
1101                let handle = manager.start_background_worker(self.backend.shutdown.subscribe());
1102                self.backend.shutdown.track_task(handle);
1103            } else {
1104                // Sync mode: rebuild indexes blocking
1105                for label in &labels_to_rebuild {
1106                    self.report_progress(
1107                        BulkPhase::RebuildingIndexes {
1108                            label: label.clone(),
1109                        },
1110                        self.stats.vertices_inserted + self.stats.edges_inserted,
1111                        Some(label.clone()),
1112                    );
1113                    let idx_mgr = IndexManager::new(
1114                        self.backend.storage.base_path(),
1115                        self.backend.storage.schema_manager_arc(),
1116                    );
1117                    idx_mgr
1118                        .rebuild_indexes_for_label(label)
1119                        .await
1120                        .map_err(UniError::Internal)?;
1121                    self.stats.indexes_rebuilt += 1;
1122
1123                    // Update index metadata after successful sync rebuild
1124                    let now = Utc::now();
1125                    let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1126                    let row_count = self
1127                        .backend
1128                        .storage
1129                        .backend()
1130                        .count_rows(&vtable_name, None)
1131                        .await
1132                        .ok()
1133                        .map(|c| c as u64);
1134
1135                    let schema = self.backend.schema.schema();
1136                    for idx in &schema.indexes {
1137                        if idx.label() == label.as_str() {
1138                            let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1139                                m.status = uni_common::core::schema::IndexStatus::Online;
1140                                m.last_built_at = Some(now);
1141                                if let Some(count) = row_count {
1142                                    m.row_count_at_build = Some(count);
1143                                }
1144                            });
1145                        }
1146                    }
1147                }
1148            }
1149        }
1150
1151        self.stats.index_build_duration = index_start.elapsed();
1152
1153        // 4. Update Snapshot
1154        self.report_progress(
1155            BulkPhase::Finalizing,
1156            self.stats.vertices_inserted + self.stats.edges_inserted,
1157            None,
1158        );
1159
1160        // Load latest snapshot or create new
1161        let mut manifest = self
1162            .backend
1163            .storage
1164            .snapshot_manager()
1165            .load_latest_snapshot()
1166            .await
1167            .map_err(UniError::Internal)?
1168            .unwrap_or_else(|| {
1169                SnapshotManifest::new(
1170                    Uuid::new_v4().to_string(),
1171                    self.backend.schema.schema().schema_version,
1172                )
1173            });
1174
1175        // Update Manifest
1176        let parent_id = manifest.snapshot_id.clone();
1177        manifest.parent_snapshot = Some(parent_id);
1178        manifest.snapshot_id = Uuid::new_v4().to_string();
1179        manifest.created_at = Utc::now();
1180
1181        // Update counts and versions for touched labels (vertices)
1182        let backend = self.backend.storage.backend();
1183        for label in &self.touched_labels {
1184            let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1185            let count = backend
1186                .count_rows(&vtable_name, None)
1187                .await
1188                .map_err(UniError::Internal)?;
1189
1190            let current_snap =
1191                manifest
1192                    .vertices
1193                    .entry(label.to_string())
1194                    .or_insert(LabelSnapshot {
1195                        version: 0,
1196                        count: 0,
1197                        lance_version: 0,
1198                    });
1199            current_snap.count = count as u64;
1200            // LanceDB tables don't expose Lance version directly
1201            current_snap.lance_version = 0;
1202        }
1203
1204        // Update counts and versions for touched edge types
1205        for edge_type in &self.touched_edge_types {
1206            let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
1207            if let Ok(count) = backend.count_rows(&delta_name, None).await {
1208                let current_snap =
1209                    manifest
1210                        .edges
1211                        .entry(edge_type.to_string())
1212                        .or_insert(EdgeSnapshot {
1213                            version: 0,
1214                            count: 0,
1215                            lance_version: 0,
1216                        });
1217                current_snap.count = count as u64;
1218                // LanceDB tables don't expose Lance version directly
1219                current_snap.lance_version = 0;
1220            }
1221        }
1222
1223        // Save Snapshot
1224        self.backend
1225            .storage
1226            .snapshot_manager()
1227            .save_snapshot(&manifest)
1228            .await
1229            .map_err(UniError::Internal)?;
1230        self.backend
1231            .storage
1232            .snapshot_manager()
1233            .set_latest_snapshot(&manifest.snapshot_id)
1234            .await
1235            .map_err(UniError::Internal)?;
1236
1237        // Save schema with updated index metadata
1238        self.backend
1239            .schema
1240            .save()
1241            .await
1242            .map_err(UniError::Internal)?;
1243
1244        // Warm adjacency CSRs for all edge types written during bulk import
1245        // so that subsequent traversal queries can find the edges.
1246        let schema = self.backend.storage.schema_manager().schema();
1247        for edge_type_name in &self.touched_edge_types {
1248            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
1249                let type_id = meta.id;
1250                for &dir in uni_store::storage::direction::Direction::Both.expand() {
1251                    let _ = self
1252                        .backend
1253                        .storage
1254                        .warm_adjacency(type_id, dir, None)
1255                        .await;
1256                }
1257            }
1258        }
1259
1260        self.committed = true;
1261        self.stats.duration = self.start_time.elapsed();
1262        Ok(self.stats.clone())
1263    }
1264
1265    /// Abort bulk loading and roll back all changes.
1266    ///
1267    /// Rolls back LanceDB tables to their pre-bulk-load versions using LanceDB's
1268    /// version API. Tables created during the bulk load are dropped. Buffered
1269    /// data that hasn't been flushed is discarded.
1270    ///
1271    /// # Errors
1272    ///
1273    /// Returns an error if rollback fails. The error message includes details
1274    /// about which tables failed to roll back.
1275    pub async fn abort(mut self) -> Result<()> {
1276        if self.committed {
1277            return Err(anyhow!("Cannot abort: bulk load already committed"));
1278        }
1279
1280        // 1. Clear pending buffers (not yet flushed to storage)
1281        self.pending_vertices.clear();
1282        self.pending_edges.clear();
1283        self.buffer_size_bytes = 0;
1284
1285        // 2. Roll back each modified table to its initial version
1286        let backend = self.backend.storage.backend();
1287        let mut rollback_errors = Vec::new();
1288        let mut rolled_back_count = 0;
1289        let mut dropped_count = 0;
1290
1291        for (table_name, initial_version) in &self.initial_table_versions {
1292            match initial_version {
1293                Some(version) => {
1294                    // Table existed before - rollback to initial version
1295                    match backend.rollback_table(table_name, *version).await {
1296                        Ok(()) => {
1297                            log::info!("Rolled back table '{}' to version {}", table_name, version);
1298                            rolled_back_count += 1;
1299                        }
1300                        Err(e) => {
1301                            rollback_errors.push(format!("{}: {}", table_name, e));
1302                        }
1303                    }
1304                }
1305                None => {
1306                    // Table was created during bulk load - drop it
1307                    match backend.drop_table(table_name).await {
1308                        Ok(()) => {
1309                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
1310                            dropped_count += 1;
1311                        }
1312                        Err(e) => {
1313                            rollback_errors.push(format!("{}: {}", table_name, e));
1314                        }
1315                    }
1316                }
1317            }
1318        }
1319
1320        // 3. Clear backend cache to ensure next read picks up rolled-back state
1321        self.backend.storage.backend().clear_cache();
1322
1323        if rollback_errors.is_empty() {
1324            log::info!(
1325                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1326                rolled_back_count,
1327                dropped_count
1328            );
1329            Ok(())
1330        } else {
1331            Err(anyhow!(
1332                "Bulk load abort had {} rollback errors: {}",
1333                rollback_errors.len(),
1334                rollback_errors.join("; ")
1335            ))
1336        }
1337    }
1338
1339    fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1340        if let Some(cb) = &self.progress_callback {
1341            cb(BulkProgress {
1342                phase,
1343                rows_processed: rows,
1344                total_rows: None,
1345                current_label: label,
1346                elapsed: self.start_time.elapsed(),
1347            });
1348        }
1349    }
1350}