// uni_db/api/bulk.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Bulk loading API for high-throughput data ingestion.
5//!
6//! This module provides `BulkWriter` for efficiently loading large amounts of
7//! vertices and edges while deferring index updates until commit time.
8//!
9//! ## Async Index Building
10//!
11//! By default, `commit()` blocks until all indexes are rebuilt. For large datasets,
12//! you can enable async index building to return immediately while indexes are
13//! built in the background:
14//!
15//! ```ignore
16//! let stats = db.bulk_writer()
17//!     .async_indexes(true)
18//!     .build()?
19//!     .insert_vertices(...)
20//!     .await?
21//!     .commit()
22//!     .await?;
23//!
24//! // Data is queryable immediately (may use full scans)
25//! // Check index status later:
26//! let status = db.index_rebuild_status().await?;
27//! ```
28
29use crate::api::Uni;
30use anyhow::{Result, anyhow};
31use chrono::Utc;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::Value;
36use uni_common::core::id::{Eid, Vid};
37use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
38use uni_common::{Properties, UniError};
39use uni_store::storage::delta::{L1Entry, Op};
40use uni_store::storage::main_edge::MainEdgeDataset;
41use uni_store::storage::main_vertex::MainVertexDataset;
42use uni_store::storage::{IndexManager, IndexRebuildManager};
43use uuid::Uuid;
44
/// Builder for configuring a bulk writer.
///
/// Obtained via [`BulkWriterBuilder::new`]; chain the setter methods and call
/// `build()` to get a [`BulkWriter`]. All setters take `self` by value and
/// return it, so configuration reads as a fluent chain.
pub struct BulkWriterBuilder<'a> {
    // Database handle; `build()` fails if it has no writer (read-only instance).
    db: &'a Uni,
    // Accumulated configuration, starting from `BulkConfig::default()`.
    config: BulkConfig,
    // Optional observer invoked with `BulkProgress` snapshots during the load.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
}
51
52impl<'a> BulkWriterBuilder<'a> {
53    /// Create a new bulk writer builder with default configuration.
54    pub fn new(db: &'a Uni) -> Self {
55        Self {
56            db,
57            config: BulkConfig::default(),
58            progress_callback: None,
59        }
60    }
61
62    /// Set whether to defer vector index building until commit.
63    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
64        self.config.defer_vector_indexes = defer;
65        self
66    }
67
68    /// Set whether to defer scalar index building until commit.
69    pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
70        self.config.defer_scalar_indexes = defer;
71        self
72    }
73
74    /// Set the batch size for buffering before flush.
75    pub fn batch_size(mut self, size: usize) -> Self {
76        self.config.batch_size = size;
77        self
78    }
79
80    /// Set a progress callback for monitoring bulk load progress.
81    pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
82        self.progress_callback = Some(Box::new(f));
83        self
84    }
85
86    /// Build indexes asynchronously after commit.
87    ///
88    /// When enabled, `commit()` returns immediately after data is written,
89    /// and indexes are rebuilt in the background. The data is queryable
90    /// immediately but queries may use full scans until indexes are ready.
91    ///
92    /// Use `Uni::index_rebuild_status()` to check progress.
93    ///
94    /// Default: `false` (blocking index rebuild)
95    pub fn async_indexes(mut self, async_: bool) -> Self {
96        self.config.async_indexes = async_;
97        self
98    }
99
100    /// Set whether to validate constraints during bulk load.
101    ///
102    /// When enabled (default), BulkWriter validates NOT NULL, UNIQUE, and CHECK
103    /// constraints before each flush, matching the behavior of regular Writer.
104    /// Set to `false` for trusted data sources to improve performance.
105    ///
106    /// Default: `true`
107    pub fn validate_constraints(mut self, validate: bool) -> Self {
108        self.config.validate_constraints = validate;
109        self
110    }
111
112    /// Set the maximum buffer size before triggering a checkpoint flush.
113    ///
114    /// When the in-memory buffer exceeds this size, a checkpoint is triggered
115    /// to flush data to storage. This allows bulk loading of arbitrarily large
116    /// datasets while controlling memory usage.
117    ///
118    /// Default: 1 GB (1_073_741_824 bytes)
119    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
120        self.config.max_buffer_size_bytes = size;
121        self
122    }
123
124    /// Build the bulk writer.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the database is not writable.
129    pub fn build(self) -> Result<BulkWriter<'a>> {
130        if self.db.writer.is_none() {
131            return Err(anyhow!("BulkWriter requires a writable database instance"));
132        }
133
134        Ok(BulkWriter {
135            db: self.db,
136            config: self.config,
137            progress_callback: self.progress_callback,
138            stats: BulkStats::default(),
139            start_time: Instant::now(),
140            pending_vertices: HashMap::new(),
141            pending_edges: HashMap::new(),
142            touched_labels: HashSet::new(),
143            touched_edge_types: HashSet::new(),
144            initial_table_versions: HashMap::new(),
145            buffer_size_bytes: 0,
146            committed: false,
147        })
148    }
149}
150
/// Configuration for bulk loading operations.
///
/// Construct via [`Default`] (see `BulkWriterBuilder` for a fluent way to
/// set individual options). Derives `Debug` and `Clone` so configurations
/// can be logged and reused across writers.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// Whether to defer vector index building until commit.
    pub defer_vector_indexes: bool,
    /// Whether to defer scalar index building until commit.
    pub defer_scalar_indexes: bool,
    /// Number of rows to buffer before flushing to storage.
    pub batch_size: usize,
    /// Whether to build indexes asynchronously after commit.
    pub async_indexes: bool,
    /// Whether to validate constraints (NOT NULL, UNIQUE, CHECK) during bulk load.
    ///
    /// Default: `true`. Set to `false` to skip validation for trusted data sources.
    pub validate_constraints: bool,
    /// Maximum buffer size in bytes before triggering a checkpoint flush.
    ///
    /// Default: 1 GB (1_073_741_824 bytes). When buffer size exceeds this limit,
    /// a checkpoint is triggered to flush data to storage while continuing to
    /// accept new data.
    pub max_buffer_size_bytes: usize,
}
172
173impl Default for BulkConfig {
174    fn default() -> Self {
175        Self {
176            defer_vector_indexes: true,
177            defer_scalar_indexes: true,
178            batch_size: 10_000,
179            async_indexes: false,
180            validate_constraints: true,
181            max_buffer_size_bytes: 1_073_741_824, // 1 GB
182        }
183    }
184}
185
/// A progress snapshot passed to the `on_progress` callback during a bulk load.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Which stage of the bulk load is currently running.
    pub phase: BulkPhase,
    /// Rows handled so far. During insertion this is the running vertex count
    /// (for vertex batches) or vertices + edges (for edge batches).
    pub rows_processed: usize,
    /// Total expected rows, when known up front.
    pub total_rows: Option<usize>,
    /// Label or edge type currently being processed, if any.
    pub current_label: Option<String>,
    /// Elapsed wall-clock time for the bulk load so far.
    pub elapsed: Duration,
}
194
/// The stage a bulk load is in, as reported via [`BulkProgress`].
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Buffering and flushing vertex/edge data to storage.
    Inserting,
    /// Rebuilding indexes for the named label or edge type.
    RebuildingIndexes { label: String },
    /// Final bookkeeping before the load completes.
    Finalizing,
}
201
/// Summary statistics for a bulk load.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Total number of vertices inserted across all labels.
    pub vertices_inserted: usize,
    /// Total number of edges inserted across all edge types.
    pub edges_inserted: usize,
    /// Number of indexes rebuilt.
    pub indexes_rebuilt: usize,
    /// Total wall-clock duration of the bulk load.
    pub duration: Duration,
    /// Time spent rebuilding indexes.
    pub index_build_duration: Duration,
    /// Task IDs for async index rebuilds (populated when `async_indexes` is true).
    pub index_task_ids: Vec<String>,
    /// True if index building was deferred to background (async mode).
    pub indexes_pending: bool,
}
214
/// Edge data for bulk insertion.
///
/// Contains source/destination vertex IDs and properties. Edge IDs are
/// allocated by `BulkWriter::insert_edges`, so callers supply only the
/// endpoints and the property payload.
#[derive(Debug, Clone)]
pub struct EdgeData {
    /// Source vertex ID.
    pub src_vid: Vid,
    /// Destination vertex ID.
    pub dst_vid: Vid,
    /// Edge properties.
    pub properties: Properties,
}
227
228impl EdgeData {
229    /// Create new edge data.
230    pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
231        Self {
232            src_vid,
233            dst_vid,
234            properties,
235        }
236    }
237}
238
/// Bulk writer for high-throughput data ingestion.
///
/// Buffers vertices and edges, deferring index updates until commit.
/// Supports constraint validation, automatic checkpointing when buffer limits
/// are exceeded, and proper rollback via LanceDB version tracking.
///
/// Use `abort()` to discard uncommitted changes and roll back storage to its
/// pre-bulk-load state.
pub struct BulkWriter<'a> {
    // Database handle; guaranteed writable by `BulkWriterBuilder::build`.
    db: &'a Uni,
    // Behavior knobs captured at build time (batch size, buffer limit, ...).
    config: BulkConfig,
    // Optional observer for progress reporting.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    // Running counters; returned to the caller on commit.
    stats: BulkStats,
    // Creation time; presumably the basis for `BulkProgress::elapsed` — confirm in report_progress.
    start_time: Instant,
    // Buffered data per label/type
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    pending_edges: HashMap<String, Vec<L1Entry>>,
    // Track what was written (for index rebuild)
    touched_labels: HashSet<String>,
    touched_edge_types: HashSet<String>,
    // Track LanceDB table versions before bulk load started (for abort rollback)
    // Key: table name, Value: version before first write (None = table created during bulk load)
    initial_table_versions: HashMap<String, Option<u64>>,
    // Current buffer size in bytes (approximate)
    buffer_size_bytes: usize,
    // Whether the load has been committed (commit/abort are outside this view).
    committed: bool,
}
266
267impl<'a> BulkWriter<'a> {
268    /// Returns the current timestamp in microseconds since Unix epoch.
269    fn get_current_timestamp_micros(&self) -> i64 {
270        use std::time::{SystemTime, UNIX_EPOCH};
271        SystemTime::now()
272            .duration_since(UNIX_EPOCH)
273            .map(|d| d.as_micros() as i64)
274            .unwrap_or(0)
275    }
276
    /// Insert vertices in bulk.
    ///
    /// The vertices are buffered until `batch_size` is reached, then written to storage.
    /// When constraint validation is enabled, constraints are checked before each flush.
    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
    ///
    /// Returns the VIDs allocated for the inserted vertices, in input order.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The label is not found in the schema
    /// - Constraint validation fails (when enabled)
    /// - Storage write fails
    pub async fn insert_vertices(
        &mut self,
        label: &str,
        vertices: Vec<HashMap<String, Value>>,
    ) -> Result<Vec<Vid>> {
        let schema = self.db.schema.schema();
        // Validate label exists in schema
        schema
            .labels
            .get(label)
            .ok_or_else(|| UniError::LabelNotFound {
                label: label.to_string(),
            })?;
        // Validate constraints before buffering (if enabled)
        if self.config.validate_constraints {
            self.validate_vertex_batch_constraints(label, &vertices)
                .await?;
        }

        // Allocate VIDs (batched for performance)
        let vids = {
            // `unwrap` is safe: `BulkWriterBuilder::build` rejects databases
            // without a writer.
            let writer = self.db.writer.as_ref().unwrap().read().await;
            writer
                .allocate_vids(vertices.len())
                .await
                .map_err(UniError::Internal)?
        };

        // Track buffer size and add to buffer
        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
        for (i, props) in vertices.into_iter().enumerate() {
            self.buffer_size_bytes += Self::estimate_properties_size(&props);
            buffer.push((vids[i], props));
        }

        self.touched_labels.insert(label.to_string());

        // Check if we need to checkpoint based on buffer size
        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte limit exceeded: flush ALL pending buffers.
            self.checkpoint().await?;
        } else {
            // Otherwise, check batch size threshold for this label only
            self.check_flush_vertices(label).await?;
        }

        self.stats.vertices_inserted += vids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted,
            Some(label.to_string()),
        );

        Ok(vids)
    }
343
344    /// Estimate the size of a properties map in bytes.
345    fn estimate_properties_size(props: &Properties) -> usize {
346        let mut size = 0;
347        for (key, value) in props {
348            size += key.len();
349            size += Self::estimate_value_size(value);
350        }
351        size
352    }
353
354    /// Estimate the size of a value in bytes.
355    fn estimate_value_size(value: &Value) -> usize {
356        match value {
357            Value::Null => 1,
358            Value::Bool(_) => 1,
359            Value::Int(_) | Value::Float(_) => 8,
360            Value::String(s) => s.len(),
361            Value::Bytes(b) => b.len(),
362            Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
363            Value::Map(obj) => {
364                obj.iter()
365                    .map(|(k, v)| k.len() + Self::estimate_value_size(v))
366                    .sum::<usize>()
367                    + 8
368            }
369            Value::Vector(v) => v.len() * 4,
370            _ => 16, // Node, Edge, Path
371        }
372    }
373
    /// Validate constraints for a batch of vertices before insertion.
    ///
    /// Checks NOT NULL, UNIQUE, and CHECK constraints. For UNIQUE constraints,
    /// validates both within the batch and against already-buffered data.
    ///
    /// NOTE(review): UNIQUE checks cover only the incoming batch and this
    /// writer's in-memory buffer; rows already flushed out of
    /// `pending_vertices` are not re-checked here — confirm storage-level
    /// enforcement elsewhere.
    async fn validate_vertex_batch_constraints(
        &self,
        label: &str,
        vertices: &[Properties],
    ) -> Result<()> {
        let schema = self.db.schema.schema();

        // Check NOT NULL and CHECK constraints for each vertex
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, props) in vertices.iter().enumerate() {
                // NOT NULL constraints
                for (prop_name, meta) in props_meta {
                    // A missing key counts the same as an explicit null.
                    if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
                            idx,
                            prop_name,
                            label
                        ));
                    }
                }
            }
        }

        // Check explicit constraints (UNIQUE, CHECK)
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            // Only constraints targeting this label apply.
            match &constraint.target {
                uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
                _ => continue,
            }

            match &constraint.constraint_type {
                uni_common::core::schema::ConstraintType::Unique {
                    properties: unique_props,
                } => {
                    // Check for duplicates within the batch
                    let mut seen_keys: HashSet<String> = HashSet::new();
                    for (idx, props) in vertices.iter().enumerate() {
                        // `None` key means a constrained property is absent/null;
                        // such rows are skipped (uniqueness can't be enforced).
                        let key = self.compute_unique_key(unique_props, props);
                        if let Some(k) = key
                            && !seen_keys.insert(k.clone())
                        {
                            return Err(anyhow!(
                                "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
                                idx,
                                k
                            ));
                        }
                    }

                    // Check against already-buffered data
                    // NOTE(review): O(batch x buffered) pairwise scan; buffers are
                    // flushed at `batch_size`, which should bound the cost — confirm.
                    if let Some(buffered) = self.pending_vertices.get(label) {
                        for (idx, props) in vertices.iter().enumerate() {
                            let key = self.compute_unique_key(unique_props, props);
                            if let Some(k) = key {
                                for (_, buffered_props) in buffered {
                                    let buffered_key =
                                        self.compute_unique_key(unique_props, buffered_props);
                                    if buffered_key.as_ref() == Some(&k) {
                                        return Err(anyhow!(
                                            "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
                                            idx,
                                            k
                                        ));
                                    }
                                }
                            }
                        }
                    }
                }
                uni_common::core::schema::ConstraintType::Exists { property } => {
                    // EXISTS: property must be present and non-null on every row.
                    for (idx, props) in vertices.iter().enumerate() {
                        if props.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "EXISTS constraint violation at row {}: property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                }
                uni_common::core::schema::ConstraintType::Check { expression } => {
                    for (idx, props) in vertices.iter().enumerate() {
                        if !self.evaluate_check_expression(expression, props)? {
                            return Err(anyhow!(
                                "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
                                constraint.name,
                                idx,
                                expression
                            ));
                        }
                    }
                }
                _ => {}
            }
        }

        Ok(())
    }
480
481    /// Compute a unique key string from properties for UNIQUE constraint checking.
482    fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
483        let mut parts = Vec::new();
484        for prop in unique_props {
485            match props.get(prop) {
486                Some(v) if !v.is_null() => parts.push(v.to_string()),
487                _ => return None, // Missing property means can't enforce uniqueness
488            }
489        }
490        Some(parts.join(":"))
491    }
492
493    /// Evaluate a simple CHECK constraint expression.
494    fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
495        let parts: Vec<&str> = expression.split_whitespace().collect();
496        if parts.len() != 3 {
497            // Complex expression - allow for now
498            return Ok(true);
499        }
500
501        let prop_part = parts[0].trim_start_matches('(');
502        let prop_name = if let Some(idx) = prop_part.find('.') {
503            &prop_part[idx + 1..]
504        } else {
505            prop_part
506        };
507
508        let op = parts[1];
509        let val_str = parts[2].trim_end_matches(')');
510
511        let prop_val = match properties.get(prop_name) {
512            Some(v) => v,
513            None => return Ok(true), // Missing property passes CHECK
514        };
515
516        // Parse target value
517        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
518            || (val_str.starts_with('"') && val_str.ends_with('"'))
519        {
520            Value::String(val_str[1..val_str.len() - 1].to_string())
521        } else if let Ok(n) = val_str.parse::<i64>() {
522            Value::Int(n)
523        } else if let Ok(n) = val_str.parse::<f64>() {
524            Value::Float(n)
525        } else if let Ok(b) = val_str.parse::<bool>() {
526            Value::Bool(b)
527        } else {
528            Value::String(val_str.to_string())
529        };
530
531        match op {
532            "=" | "==" => Ok(prop_val == &target_val),
533            "!=" | "<>" => Ok(prop_val != &target_val),
534            ">" => self
535                .compare_json_values(prop_val, &target_val)
536                .map(|c| c > 0),
537            "<" => self
538                .compare_json_values(prop_val, &target_val)
539                .map(|c| c < 0),
540            ">=" => self
541                .compare_json_values(prop_val, &target_val)
542                .map(|c| c >= 0),
543            "<=" => self
544                .compare_json_values(prop_val, &target_val)
545                .map(|c| c <= 0),
546            _ => Ok(true), // Unknown operator - allow
547        }
548    }
549
550    /// Compare two values, returning -1, 0, or 1.
551    fn compare_json_values(&self, a: &Value, b: &Value) -> Result<i8> {
552        match (a, b) {
553            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2) as i8),
554            (Value::Float(f1), Value::Float(f2)) => {
555                if f1 < f2 {
556                    Ok(-1)
557                } else if f1 > f2 {
558                    Ok(1)
559                } else {
560                    Ok(0)
561                }
562            }
563            (Value::Int(n), Value::Float(f)) => {
564                let nf = *n as f64;
565                if nf < *f {
566                    Ok(-1)
567                } else if nf > *f {
568                    Ok(1)
569                } else {
570                    Ok(0)
571                }
572            }
573            (Value::Float(f), Value::Int(n)) => {
574                let nf = *n as f64;
575                if *f < nf {
576                    Ok(-1)
577                } else if *f > nf {
578                    Ok(1)
579                } else {
580                    Ok(0)
581                }
582            }
583            (Value::String(s1), Value::String(s2)) => match s1.cmp(s2) {
584                std::cmp::Ordering::Less => Ok(-1),
585                std::cmp::Ordering::Greater => Ok(1),
586                std::cmp::Ordering::Equal => Ok(0),
587            },
588            _ => Err(anyhow!(
589                "Cannot compare incompatible types: {:?} vs {:?}",
590                a,
591                b
592            )),
593        }
594    }
595
596    /// Checkpoint: flush all pending data to storage.
597    ///
598    /// Called automatically when buffer size exceeds `max_buffer_size_bytes`.
599    /// Flushes all buffered vertices and edges, then resets the buffer size counter.
600    async fn checkpoint(&mut self) -> Result<()> {
601        log::debug!(
602            "Checkpoint triggered at {} bytes (limit: {})",
603            self.buffer_size_bytes,
604            self.config.max_buffer_size_bytes
605        );
606
607        // Flush all pending vertices
608        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
609        for label in labels {
610            self.flush_vertices_buffer(&label).await?;
611        }
612
613        // Flush all pending edges
614        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
615        for edge_type in edge_types {
616            self.flush_edges_buffer(&edge_type).await?;
617        }
618
619        // Reset buffer size
620        self.buffer_size_bytes = 0;
621
622        Ok(())
623    }
624
625    // Helper to flush vertex buffer if full
626    async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
627        let should_flush = {
628            if let Some(buf) = self.pending_vertices.get(label) {
629                buf.len() >= self.config.batch_size
630            } else {
631                false
632            }
633        };
634
635        if should_flush {
636            self.flush_vertices_buffer(label).await?;
637        }
638        Ok(())
639    }
640
    /// Flush vertex buffer to LanceDB storage.
    ///
    /// Records the initial table version before first write for rollback support.
    /// Writes to both per-label table and main vertices table.
    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
        if let Some(vertices) = self.pending_vertices.remove(label) {
            if vertices.is_empty() {
                return Ok(());
            }

            // Record initial version for abort rollback (only once per table)
            let table_name = uni_store::lancedb::LanceDbStore::vertex_table_name(label);
            if !self.initial_table_versions.contains_key(&table_name) {
                let lancedb_store = self.db.storage.lancedb_store();
                let version = lancedb_store
                    .get_table_version(&table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(table_name, version);
            }

            // Record main vertices table version for rollback
            let main_table_name =
                uni_store::lancedb::LanceDbStore::main_vertex_table_name().to_string();
            if !self.initial_table_versions.contains_key(&main_table_name) {
                let lancedb_store = self.db.storage.lancedb_store();
                let version = lancedb_store
                    .get_table_version(&main_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_table_name.clone(), version);
            }

            let ds = self
                .db
                .storage
                .vertex_dataset(label)
                .map_err(UniError::Internal)?;
            let schema = self.db.schema.schema();

            // Bulk-loaded rows are live (not deleted) at version 1.
            let deleted = vec![false; vertices.len()];
            let versions = vec![1; vertices.len()]; // Version 1 for bulk load

            // Generate timestamps for this batch
            // (created_at == updated_at for freshly inserted rows)
            let now = self.get_current_timestamp_micros();
            let mut created_at: HashMap<Vid, i64> = HashMap::new();
            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
            for (vid, _) in &vertices {
                created_at.insert(*vid, now);
                updated_at.insert(*vid, now);
            }

            // Build per-label and main-vertex entries from the 2-tuple input.
            // Both tables need labels attached; compute once per vertex.
            let labels = vec![label.to_string()];
            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
                .iter()
                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
                .collect();

            let batch = ds
                .build_record_batch_with_timestamps(
                    &vertices_with_labels,
                    &deleted,
                    &versions,
                    &schema,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

            // Write to per-label LanceDB table
            let lancedb_store = self.db.storage.lancedb_store();
            let table = ds
                .write_batch_lancedb(lancedb_store, batch, &schema)
                .await
                .map_err(UniError::Internal)?;

            // Create default scalar indexes (_vid, _uid) which are critical for basic function
            ds.ensure_default_indexes_lancedb(&table)
                .await
                .map_err(UniError::Internal)?;

            // Dual-write to main vertices table
            // (tuple: vid, labels, properties, deleted=false, version=1)
            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
                vertices_with_labels
                    .into_iter()
                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
                    .collect();

            if !main_vertices.is_empty() {
                let main_batch = MainVertexDataset::build_record_batch(
                    &main_vertices,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

                let main_table = MainVertexDataset::write_batch_lancedb(lancedb_store, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainVertexDataset::ensure_default_indexes_lancedb(&main_table)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
751
    /// Insert edges in bulk.
    ///
    /// Edges are buffered until `batch_size` is reached, then written to storage.
    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
    /// Indexes are NOT updated during these writes.
    ///
    /// Returns the allocated EIDs, in input order.
    ///
    /// # Errors
    ///
    /// Returns an error if the edge type is not found in the schema or if
    /// storage write fails.
    pub async fn insert_edges(
        &mut self,
        edge_type: &str,
        edges: Vec<EdgeData>,
    ) -> Result<Vec<Eid>> {
        let schema = self.db.schema.schema();
        let edge_meta =
            schema
                .edge_types
                .get(edge_type)
                .ok_or_else(|| UniError::EdgeTypeNotFound {
                    edge_type: edge_type.to_string(),
                })?;
        let type_id = edge_meta.id;

        // Allocate EIDs
        // NOTE(review): EIDs are allocated one call at a time, unlike the
        // batched `allocate_vids` in `insert_vertices`; a batched `next_eid`
        // would reduce round-trips if the writer API supports it — confirm.
        let mut eids = Vec::with_capacity(edges.len());
        {
            // `unwrap` is safe: `BulkWriterBuilder::build` rejects databases
            // without a writer.
            let writer = self.db.writer.as_ref().unwrap().read().await;
            for _ in 0..edges.len() {
                eids.push(writer.next_eid(type_id).await.map_err(UniError::Internal)?);
            }
        }

        // Convert to L1Entry format and track buffer size
        let now = self.get_current_timestamp_micros();
        let mut added_size = 0usize;
        let entries: Vec<L1Entry> = edges
            .into_iter()
            .enumerate()
            .map(|(i, edge)| {
                // Estimate size for buffer tracking (16 bytes for VIDs + 8 for EID + properties)
                added_size += 32 + Self::estimate_properties_size(&edge.properties);
                L1Entry {
                    src_vid: edge.src_vid,
                    dst_vid: edge.dst_vid,
                    eid: eids[i],
                    op: Op::Insert,
                    version: 1,
                    properties: edge.properties,
                    created_at: Some(now),
                    updated_at: Some(now),
                }
            })
            .collect();
        self.buffer_size_bytes += added_size;
        self.pending_edges
            .entry(edge_type.to_string())
            .or_default()
            .extend(entries);

        self.touched_edge_types.insert(edge_type.to_string());

        // Check if we need to checkpoint based on buffer size
        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte limit exceeded: flush ALL pending buffers.
            self.checkpoint().await?;
        } else {
            self.check_flush_edges(edge_type).await?;
        }

        self.stats.edges_inserted += eids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            Some(edge_type.to_string()),
        );

        Ok(eids)
    }
831
832    /// Check and flush edge buffer if full.
833    async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
834        let should_flush = self
835            .pending_edges
836            .get(edge_type)
837            .is_some_and(|buf| buf.len() >= self.config.batch_size);
838
839        if should_flush {
840            self.flush_edges_buffer(edge_type).await?;
841        }
842        Ok(())
843    }
844
    /// Flush edge buffer to delta datasets.
    ///
    /// Records initial table versions before first write for rollback support.
    /// Writes to both per-type delta tables and main edges table.
    ///
    /// Ordering matters here: each table's version is captured BEFORE the first
    /// write to it, so `abort()` can rewind the table to its pre-bulk-load
    /// state. A recorded version of `None` means the table did not exist before
    /// this bulk load and will be dropped on abort instead of rolled back.
    #[expect(
        clippy::map_entry,
        reason = "async code between contains_key and insert"
    )]
    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
        if let Some(entries) = self.pending_edges.remove(edge_type) {
            if entries.is_empty() {
                return Ok(());
            }

            let schema = self.db.schema.schema();
            let lancedb_store = self.db.storage.lancedb_store();

            // Record initial versions for abort rollback (FWD and BWD tables).
            // Only the first flush per table records a version; later flushes
            // must not overwrite the original baseline.
            let fwd_table_name =
                uni_store::lancedb::LanceDbStore::delta_table_name(edge_type, "fwd");
            if !self.initial_table_versions.contains_key(&fwd_table_name) {
                let version = lancedb_store
                    .get_table_version(&fwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(fwd_table_name, version);
            }
            let bwd_table_name =
                uni_store::lancedb::LanceDbStore::delta_table_name(edge_type, "bwd");
            if !self.initial_table_versions.contains_key(&bwd_table_name) {
                let version = lancedb_store
                    .get_table_version(&bwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(bwd_table_name, version);
            }

            // Record main edges table version for rollback
            let main_edge_table_name =
                uni_store::lancedb::LanceDbStore::main_edge_table_name().to_string();
            if !self
                .initial_table_versions
                .contains_key(&main_edge_table_name)
            {
                let version = lancedb_store
                    .get_table_version(&main_edge_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_edge_table_name.clone(), version);
            }

            // Write to FWD delta (sorted by src_vid)
            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "fwd")
                .map_err(UniError::Internal)?;
            let fwd_batch = fwd_ds
                .build_record_batch(&fwd_entries, &schema)
                .map_err(UniError::Internal)?;
            let fwd_table = fwd_ds
                .write_run_lancedb(lancedb_store, fwd_batch)
                .await
                .map_err(UniError::Internal)?;
            fwd_ds
                .ensure_eid_index_lancedb(&fwd_table)
                .await
                .map_err(UniError::Internal)?;

            // Write to BWD delta (sorted by dst_vid)
            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "bwd")
                .map_err(UniError::Internal)?;
            let bwd_batch = bwd_ds
                .build_record_batch(&bwd_entries, &schema)
                .map_err(UniError::Internal)?;
            let bwd_table = bwd_ds
                .write_run_lancedb(lancedb_store, bwd_batch)
                .await
                .map_err(UniError::Internal)?;
            bwd_ds
                .ensure_eid_index_lancedb(&bwd_table)
                .await
                .map_err(UniError::Internal)?;

            // Dual-write to main edges table. Created/updated timestamps are
            // collected into side maps keyed by EID for the batch builder.
            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
                .iter()
                .map(|e| {
                    let deleted = matches!(e.op, Op::Delete);
                    if let Some(ts) = e.created_at {
                        edge_created_at.insert(e.eid, ts);
                    }
                    if let Some(ts) = e.updated_at {
                        edge_updated_at.insert(e.eid, ts);
                    }
                    (
                        e.eid,
                        e.src_vid,
                        e.dst_vid,
                        edge_type.to_string(),
                        e.properties.clone(),
                        deleted,
                        e.version,
                    )
                })
                .collect();

            if !main_edges.is_empty() {
                let main_batch = MainEdgeDataset::build_record_batch(
                    &main_edges,
                    Some(&edge_created_at),
                    Some(&edge_updated_at),
                )
                .map_err(UniError::Internal)?;

                let main_table = MainEdgeDataset::write_batch_lancedb(lancedb_store, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainEdgeDataset::ensure_default_indexes_lancedb(&main_table)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
981
982    /// Commit all pending data and rebuild indexes.
983    ///
984    /// Flushes remaining buffered data, rebuilds deferred indexes, and updates
985    /// the snapshot manifest.
986    ///
987    /// # Errors
988    ///
989    /// Returns an error if flushing, index rebuilding, or snapshot update fails.
990    pub async fn commit(mut self) -> Result<BulkStats> {
991        // 1. Flush remaining vertex buffers
992        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
993        for label in labels {
994            self.flush_vertices_buffer(&label).await?;
995        }
996
997        // 2. Flush remaining edge buffers
998        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
999        for edge_type in edge_types {
1000            self.flush_edges_buffer(&edge_type).await?;
1001        }
1002
1003        let index_start = Instant::now();
1004
1005        // 3. Rebuild indexes for vertices
1006        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
1007            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
1008
1009            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
1010                // Async mode: mark affected indexes as Stale before scheduling
1011                let schema = self.db.schema.schema();
1012                for label in &labels_to_rebuild {
1013                    for idx in &schema.indexes {
1014                        if idx.label() == label.as_str() {
1015                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
1016                                m.status = uni_common::core::schema::IndexStatus::Stale;
1017                            });
1018                        }
1019                    }
1020                }
1021
1022                let rebuild_manager = IndexRebuildManager::new(
1023                    self.db.storage.clone(),
1024                    self.db.schema.clone(),
1025                    self.db.config.index_rebuild.clone(),
1026                )
1027                .await
1028                .map_err(UniError::Internal)?;
1029
1030                let task_ids = rebuild_manager
1031                    .schedule(labels_to_rebuild)
1032                    .await
1033                    .map_err(UniError::Internal)?;
1034
1035                self.stats.index_task_ids = task_ids;
1036                self.stats.indexes_pending = true;
1037
1038                let manager = Arc::new(rebuild_manager);
1039                let handle = manager.start_background_worker(self.db.shutdown_handle.subscribe());
1040                self.db.shutdown_handle.track_task(handle);
1041            } else {
1042                // Sync mode: rebuild indexes blocking
1043                for label in &labels_to_rebuild {
1044                    self.report_progress(
1045                        BulkPhase::RebuildingIndexes {
1046                            label: label.clone(),
1047                        },
1048                        self.stats.vertices_inserted + self.stats.edges_inserted,
1049                        Some(label.clone()),
1050                    );
1051                    let idx_mgr = IndexManager::new(
1052                        self.db.storage.base_path(),
1053                        self.db.storage.schema_manager_arc(),
1054                        self.db.storage.lancedb_store_arc(),
1055                    );
1056                    idx_mgr
1057                        .rebuild_indexes_for_label(label)
1058                        .await
1059                        .map_err(UniError::Internal)?;
1060                    self.stats.indexes_rebuilt += 1;
1061
1062                    // Update index metadata after successful sync rebuild
1063                    let now = chrono::Utc::now();
1064                    let row_count = self
1065                        .db
1066                        .storage
1067                        .lancedb_store()
1068                        .open_vertex_table(label)
1069                        .await
1070                        .ok()
1071                        .map(|t| async move { t.count_rows(None).await.ok().map(|c| c as u64) });
1072                    let row_count = match row_count {
1073                        Some(fut) => fut.await,
1074                        None => None,
1075                    };
1076
1077                    let schema = self.db.schema.schema();
1078                    for idx in &schema.indexes {
1079                        if idx.label() == label.as_str() {
1080                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
1081                                m.status = uni_common::core::schema::IndexStatus::Online;
1082                                m.last_built_at = Some(now);
1083                                if let Some(count) = row_count {
1084                                    m.row_count_at_build = Some(count);
1085                                }
1086                            });
1087                        }
1088                    }
1089                }
1090            }
1091        }
1092
1093        self.stats.index_build_duration = index_start.elapsed();
1094
1095        // 4. Update Snapshot
1096        self.report_progress(
1097            BulkPhase::Finalizing,
1098            self.stats.vertices_inserted + self.stats.edges_inserted,
1099            None,
1100        );
1101
1102        // Load latest snapshot or create new
1103        let mut manifest = self
1104            .db
1105            .storage
1106            .snapshot_manager()
1107            .load_latest_snapshot()
1108            .await
1109            .map_err(UniError::Internal)?
1110            .unwrap_or_else(|| {
1111                SnapshotManifest::new(
1112                    Uuid::new_v4().to_string(),
1113                    self.db.schema.schema().schema_version,
1114                )
1115            });
1116
1117        // Update Manifest
1118        let parent_id = manifest.snapshot_id.clone();
1119        manifest.parent_snapshot = Some(parent_id);
1120        manifest.snapshot_id = Uuid::new_v4().to_string();
1121        manifest.created_at = Utc::now();
1122
1123        // Update counts and versions for touched labels (vertices)
1124        let lancedb_store = self.db.storage.lancedb_store();
1125        for label in &self.touched_labels {
1126            let table = lancedb_store
1127                .open_vertex_table(label)
1128                .await
1129                .map_err(UniError::Internal)?;
1130            let count = table
1131                .count_rows(None)
1132                .await
1133                .map_err(|e| UniError::Internal(anyhow::anyhow!("Count rows failed: {}", e)))?;
1134
1135            let current_snap =
1136                manifest
1137                    .vertices
1138                    .entry(label.to_string())
1139                    .or_insert(LabelSnapshot {
1140                        version: 0,
1141                        count: 0,
1142                        lance_version: 0,
1143                    });
1144            current_snap.count = count as u64;
1145            // LanceDB tables don't expose Lance version directly
1146            current_snap.lance_version = 0;
1147        }
1148
1149        // Update counts and versions for touched edge types
1150        for edge_type in &self.touched_edge_types {
1151            if let Ok(table) = lancedb_store.open_delta_table(edge_type, "fwd").await {
1152                let count = table
1153                    .count_rows(None)
1154                    .await
1155                    .map_err(|e| UniError::Internal(anyhow::anyhow!("Count rows failed: {}", e)))?;
1156
1157                let current_snap =
1158                    manifest
1159                        .edges
1160                        .entry(edge_type.to_string())
1161                        .or_insert(EdgeSnapshot {
1162                            version: 0,
1163                            count: 0,
1164                            lance_version: 0,
1165                        });
1166                current_snap.count = count as u64;
1167                // LanceDB tables don't expose Lance version directly
1168                current_snap.lance_version = 0;
1169            }
1170        }
1171
1172        // Save Snapshot
1173        self.db
1174            .storage
1175            .snapshot_manager()
1176            .save_snapshot(&manifest)
1177            .await
1178            .map_err(UniError::Internal)?;
1179        self.db
1180            .storage
1181            .snapshot_manager()
1182            .set_latest_snapshot(&manifest.snapshot_id)
1183            .await
1184            .map_err(UniError::Internal)?;
1185
1186        // Save schema with updated index metadata
1187        self.db.schema.save().await.map_err(UniError::Internal)?;
1188
1189        // Warm adjacency CSRs for all edge types written during bulk import
1190        // so that subsequent traversal queries can find the edges.
1191        let schema = self.db.storage.schema_manager().schema();
1192        for edge_type_name in &self.touched_edge_types {
1193            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
1194                let type_id = meta.id;
1195                for &dir in uni_store::storage::direction::Direction::Both.expand() {
1196                    let _ = self.db.storage.warm_adjacency(type_id, dir, None).await;
1197                }
1198            }
1199        }
1200
1201        self.committed = true;
1202        self.stats.duration = self.start_time.elapsed();
1203        Ok(self.stats)
1204    }
1205
1206    /// Abort bulk loading and roll back all changes.
1207    ///
1208    /// Rolls back LanceDB tables to their pre-bulk-load versions using LanceDB's
1209    /// version API. Tables created during the bulk load are dropped. Buffered
1210    /// data that hasn't been flushed is discarded.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if rollback fails. The error message includes details
1215    /// about which tables failed to roll back.
1216    pub async fn abort(mut self) -> Result<()> {
1217        if self.committed {
1218            return Err(anyhow!("Cannot abort: bulk load already committed"));
1219        }
1220
1221        // 1. Clear pending buffers (not yet flushed to storage)
1222        self.pending_vertices.clear();
1223        self.pending_edges.clear();
1224        self.buffer_size_bytes = 0;
1225
1226        // 2. Roll back each modified table to its initial version
1227        let lancedb_store = self.db.storage.lancedb_store();
1228        let mut rollback_errors = Vec::new();
1229        let mut rolled_back_count = 0;
1230        let mut dropped_count = 0;
1231
1232        for (table_name, initial_version) in &self.initial_table_versions {
1233            match initial_version {
1234                Some(version) => {
1235                    // Table existed before - rollback to initial version
1236                    match lancedb_store.rollback_table(table_name, *version).await {
1237                        Ok(()) => {
1238                            log::info!("Rolled back table '{}' to version {}", table_name, version);
1239                            rolled_back_count += 1;
1240                        }
1241                        Err(e) => {
1242                            rollback_errors.push(format!("{}: {}", table_name, e));
1243                        }
1244                    }
1245                }
1246                None => {
1247                    // Table was created during bulk load - drop it
1248                    match lancedb_store.drop_table(table_name).await {
1249                        Ok(()) => {
1250                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
1251                            dropped_count += 1;
1252                        }
1253                        Err(e) => {
1254                            rollback_errors.push(format!("{}: {}", table_name, e));
1255                        }
1256                    }
1257                }
1258            }
1259        }
1260
1261        // 3. Clear table cache to ensure next read picks up rolled-back state
1262        self.db.storage.clear_table_cache();
1263
1264        if rollback_errors.is_empty() {
1265            log::info!(
1266                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1267                rolled_back_count,
1268                dropped_count
1269            );
1270            Ok(())
1271        } else {
1272            Err(anyhow!(
1273                "Bulk load abort had {} rollback errors: {}",
1274                rollback_errors.len(),
1275                rollback_errors.join("; ")
1276            ))
1277        }
1278    }
1279
1280    fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1281        if let Some(cb) = &self.progress_callback {
1282            cb(BulkProgress {
1283                phase,
1284                rows_processed: rows,
1285                total_rows: None,
1286                current_label: label,
1287                elapsed: self.start_time.elapsed(),
1288            });
1289        }
1290    }
1291}