Skip to main content

uni_bulk/
bulk.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Bulk loading API for high-throughput data ingestion.
5//!
6//! This module provides `BulkWriter` for efficiently loading large amounts of
7//! vertices and edges while deferring index updates until commit time.
8//!
9//! ## Async Index Building
10//!
11//! By default, `commit()` blocks until all indexes are rebuilt. For large datasets,
12//! you can enable async index building to return immediately while indexes are
13//! built in the background:
14//!
15//! ```ignore
//! let mut writer = db.bulk_writer()
//!     .async_indexes(true)
//!     .build()?;
//! writer.insert_vertices("Person", rows).await?;
//! let stats = writer.commit().await?;
23//!
24//! // Data is queryable immediately (may use full scans)
25//! // Check index status later:
26//! let status = db.index_rebuild_status().await?;
27//! ```
28
29use anyhow::{Result, anyhow};
30use chrono::Utc;
31use std::cmp::Ordering;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::UniConfig;
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::schema::SchemaManager;
39use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
40use uni_common::{Properties, UniError};
41use uni_plugin_host::shutdown::ShutdownHandle;
42use uni_store::runtime::writer::Writer;
43use uni_store::storage::delta::{L1Entry, Op};
44use uni_store::storage::main_edge::MainEdgeDataset;
45use uni_store::storage::main_vertex::MainVertexDataset;
46use uni_store::storage::manager::StorageManager;
47use uni_store::storage::{IndexManager, IndexRebuildManager};
48use uuid::Uuid;
49
50use crate::flush_intent;
51
/// Fault-injection switch reserved for tests.
///
/// While armed (`true`), `flush_vertices_buffer` deliberately fails once the
/// per-label table has been committed but before the main vertex table is
/// written — the half-finished flush that the H9 intent/reconcile logic must
/// recover from. The flag is compiled unconditionally so integration tests in
/// other crates can arm it; nothing outside tests should ever set it.
#[doc(hidden)]
pub static FAIL_AFTER_PERLABEL_WRITE: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
60
61/// Concrete handle bundle injected into the bulk write path.
62///
63/// `BulkWriter`/`BulkWriterBuilder`/`StreamingAppender` need only field access
64/// to the database's storage, writer, schema, shutdown coordinator, and config —
65/// they call no methods on the owning `uni_db::Uni` inner state. The uni-db
66/// driver constructs a `BulkBackend` from its `UniInner` fields and hands it in,
67/// avoiding a dependency cycle.
68#[derive(Clone)]
69pub struct BulkBackend {
70    /// Storage manager (datasets, backend, snapshot manager).
71    pub storage: Arc<StorageManager>,
72    /// Writer for ID allocation. `None` on a read-only database.
73    pub writer: Option<Arc<Writer>>,
74    /// Schema manager (labels, edge types, constraints, index metadata).
75    pub schema: Arc<SchemaManager>,
76    /// Shutdown coordinator — tracks background index-rebuild tasks.
77    pub shutdown: Arc<ShutdownHandle>,
78    /// Database configuration (e.g. index-rebuild tuning).
79    pub config: UniConfig,
80}
81
82/// Trait for types that can be converted to property maps for bulk insertion.
83///
84/// Enables `insert_vertices` to accept both `Vec<HashMap<String, Value>>`
85/// and `RecordBatch` (Arrow columnar data).
86pub trait IntoArrow {
87    /// Convert to a vector of property maps.
88    fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
89}
90
91impl IntoArrow for Vec<HashMap<String, Value>> {
92    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
93        self
94    }
95}
96
97impl IntoArrow for arrow_array::RecordBatch {
98    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
99        record_batch_to_property_maps(&self)
100    }
101}
102
103/// Convert each row of an Arrow `RecordBatch` to a property map.
104///
105/// Columns become property keys; values are converted from Arrow types to Uni
106/// [`Value`]s via `arrow_to_value`. Null values are omitted from the map.
107pub fn record_batch_to_property_maps(
108    batch: &arrow_array::RecordBatch,
109) -> Vec<HashMap<String, Value>> {
110    let schema = batch.schema();
111    let num_rows = batch.num_rows();
112    let mut rows = Vec::with_capacity(num_rows);
113    for row_idx in 0..num_rows {
114        let mut props = HashMap::with_capacity(schema.fields().len());
115        for (col_idx, field) in schema.fields().iter().enumerate() {
116            let col = batch.column(col_idx);
117            let value =
118                uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
119            if !value.is_null() {
120                props.insert(field.name().clone(), value);
121            }
122        }
123        rows.push(props);
124    }
125    rows
126}
127
128/// Builder for configuring a bulk writer.
129pub struct BulkWriterBuilder {
130    backend: BulkBackend,
131    config: BulkConfig,
132    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
133}
134
135impl BulkWriterBuilder {
136    /// Create a bulk writer builder.
137    ///
138    /// Used by `Transaction::bulk_writer()` and `AppenderBuilder` — the
139    /// Transaction already holds the session write guard, so the BulkWriter
140    /// neither acquires nor releases one.
141    pub fn new_unguarded(backend: BulkBackend) -> Self {
142        Self {
143            backend,
144            config: BulkConfig::default(),
145            progress_callback: None,
146        }
147    }
148
149    /// Set whether to defer vector index building until commit.
150    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
151        self.config.defer_vector_indexes = defer;
152        self
153    }
154
155    /// Set whether to defer scalar index building until commit.
156    pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
157        self.config.defer_scalar_indexes = defer;
158        self
159    }
160
161    /// Set the batch size for buffering before flush.
162    pub fn batch_size(mut self, size: usize) -> Self {
163        self.config.batch_size = size;
164        self
165    }
166
167    /// Set a progress callback for monitoring bulk load progress.
168    pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
169        self.progress_callback = Some(Box::new(f));
170        self
171    }
172
173    /// Build indexes asynchronously after commit.
174    ///
175    /// When enabled, `commit()` returns immediately after data is written,
176    /// and indexes are rebuilt in the background. The data is queryable
177    /// immediately but queries may use full scans until indexes are ready.
178    ///
179    /// Use `Uni::index_rebuild_status()` to check progress.
180    ///
181    /// Default: `false` (blocking index rebuild)
182    pub fn async_indexes(mut self, async_: bool) -> Self {
183        self.config.async_indexes = async_;
184        self
185    }
186
187    /// Set whether to validate constraints during bulk load.
188    ///
189    /// When enabled (default), BulkWriter validates NOT NULL, UNIQUE, and CHECK
190    /// constraints before each flush, matching the behavior of regular Writer.
191    /// Set to `false` for trusted data sources to improve performance.
192    ///
193    /// Default: `true`
194    pub fn validate_constraints(mut self, validate: bool) -> Self {
195        self.config.validate_constraints = validate;
196        self
197    }
198
199    /// Set the maximum buffer size before triggering a checkpoint flush.
200    ///
201    /// When the in-memory buffer exceeds this size, a checkpoint is triggered
202    /// to flush data to storage. This allows bulk loading of arbitrarily large
203    /// datasets while controlling memory usage.
204    ///
205    /// Default: 1 GB (1_073_741_824 bytes)
206    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
207        self.config.max_buffer_size_bytes = size;
208        self
209    }
210
211    /// Build the bulk writer.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the database is not writable.
216    pub fn build(self) -> Result<BulkWriter> {
217        if self.backend.writer.is_none() {
218            return Err(anyhow!("BulkWriter requires a writable database instance"));
219        }
220
221        Ok(BulkWriter {
222            backend: self.backend,
223            config: self.config,
224            progress_callback: self.progress_callback,
225            stats: BulkStats::default(),
226            start_time: Instant::now(),
227            pending_vertices: HashMap::new(),
228            pending_edges: HashMap::new(),
229            touched_labels: HashSet::new(),
230            touched_edge_types: HashSet::new(),
231            initial_table_versions: HashMap::new(),
232            seen_unique_keys: HashMap::new(),
233            buffer_size_bytes: 0,
234            committed: false,
235        })
236    }
237}
238
/// Configuration for bulk loading operations.
///
/// Usually assembled through the `BulkWriterBuilder` setter methods; the
/// [`Default`] implementation provides the values documented on each field.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// Whether to defer vector index building until commit.
    ///
    /// Default: `true`.
    pub defer_vector_indexes: bool,
    /// Whether to defer scalar index building until commit.
    ///
    /// Default: `true`.
    pub defer_scalar_indexes: bool,
    /// Number of rows a vertex label buffers before that label's buffer is
    /// flushed to storage.
    ///
    /// Default: `10_000`.
    pub batch_size: usize,
    /// Whether to build indexes asynchronously after commit.
    ///
    /// Default: `false`.
    pub async_indexes: bool,
    /// Whether to validate constraints (NOT NULL, UNIQUE, CHECK) during bulk load.
    ///
    /// Default: `true`. Set to `false` to skip validation for trusted data sources.
    pub validate_constraints: bool,
    /// Maximum estimated buffer size in bytes before triggering a checkpoint flush.
    ///
    /// Default: 1 GB (1_073_741_824 bytes). When the buffered data reaches
    /// this estimate, a checkpoint flushes everything to storage while the
    /// writer keeps accepting new data.
    pub max_buffer_size_bytes: usize,
}

impl Default for BulkConfig {
    fn default() -> Self {
        Self {
            defer_vector_indexes: true,
            defer_scalar_indexes: true,
            batch_size: 10_000,
            async_indexes: false,
            validate_constraints: true,
            max_buffer_size_bytes: 1_073_741_824, // 1 GB (2^30 bytes)
        }
    }
}
273
/// Progress update delivered to the callback registered with
/// `BulkWriterBuilder::on_progress`.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Stage of the load this update belongs to.
    pub phase: BulkPhase,
    /// Rows handled so far (while inserting vertices this is presumably the
    /// running `vertices_inserted` count — confirm in `report_progress`).
    pub rows_processed: usize,
    /// Total number of rows expected, when known.
    pub total_rows: Option<usize>,
    /// Label (or edge type) the update concerns, if any.
    pub current_label: Option<String>,
    /// Elapsed time — presumably measured from when the writer was built;
    /// confirm in `report_progress`.
    pub elapsed: Duration,
}

/// Stage of a bulk load, as reported in [`BulkProgress::phase`].
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Rows are being inserted.
    Inserting,
    /// Indexes for `label` are being rebuilt.
    RebuildingIndexes { label: String },
    /// Final stage of the load.
    Finalizing,
}
289
/// Summary counters for a bulk load.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Vertices accepted through `insert_vertices` so far.
    pub vertices_inserted: usize,
    /// Edges accepted so far.
    pub edges_inserted: usize,
    /// Number of indexes rebuilt.
    pub indexes_rebuilt: usize,
    /// Total wall-clock time of the load (presumably filled in at commit — confirm).
    pub duration: Duration,
    /// Time spent rebuilding indexes (presumably filled in at commit — confirm).
    pub index_build_duration: Duration,
    /// Task IDs of background index rebuilds; populated when `async_indexes` is enabled.
    pub index_task_ids: Vec<String>,
    /// `true` when index building was handed off to the background (async mode).
    pub indexes_pending: bool,
}
302
303/// Edge data for bulk insertion.
304///
305/// Contains source/destination vertex IDs and properties.
306#[derive(Debug, Clone)]
307pub struct EdgeData {
308    /// Source vertex ID.
309    pub src_vid: Vid,
310    /// Destination vertex ID.
311    pub dst_vid: Vid,
312    /// Edge properties.
313    pub properties: Properties,
314}
315
316impl EdgeData {
317    /// Create new edge data.
318    pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
319        Self {
320            src_vid,
321            dst_vid,
322            properties,
323        }
324    }
325}
326
327/// Bulk writer for high-throughput data ingestion.
328///
329/// Buffers vertices and edges, deferring index updates until commit.
330/// Supports constraint validation, automatic checkpointing when buffer limits
331/// are exceeded, and proper rollback via LanceDB version tracking.
332///
333/// Use `abort()` to discard uncommitted changes and roll back storage to its
334/// pre-bulk-load state.
335pub struct BulkWriter {
336    backend: BulkBackend,
337    config: BulkConfig,
338    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
339    stats: BulkStats,
340    start_time: Instant,
341    // Buffered data per label/type
342    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
343    pending_edges: HashMap<String, Vec<L1Entry>>,
344    // Track what was written (for index rebuild)
345    touched_labels: HashSet<String>,
346    touched_edge_types: HashSet<String>,
347    // Track LanceDB table versions before bulk load started (for abort rollback)
348    // Key: table name, Value: version before first write (None = table created during bulk load)
349    initial_table_versions: HashMap<String, Option<u64>>,
350    // UNIQUE-constraint keys accepted over the WHOLE load (not just the current
351    // buffer, which `flush_vertices_buffer` drains): without this, a duplicate
352    // spanning two flushes slipped through. Keyed by constraint identity
353    // (label + unique props), value = set of value-keys seen. (review H8)
354    seen_unique_keys: HashMap<String, HashSet<String>>,
355    // Current buffer size in bytes (approximate)
356    buffer_size_bytes: usize,
357    committed: bool,
358}
359
/// Namespace key for one UNIQUE constraint's seen-value set: the label
/// followed by each constrained property, every piece preceded by U+001F.
///
/// `compute_unique_key` encodes only the VALUES, so two different UNIQUE
/// constraints on the same label must not share a namespace. U+001F (unit
/// separator) is assumed never to occur inside label or property names.
fn unique_set_key(label: &str, unique_props: &[String]) -> String {
    unique_props
        .iter()
        .fold(String::from(label), |mut acc, prop| {
            acc.push('\u{1f}');
            acc.push_str(prop);
            acc
        })
}
371
372impl BulkWriter {
373    /// Returns a snapshot of the current bulk load statistics.
374    /// Updated after each batch flush.
375    pub fn stats(&self) -> &BulkStats {
376        &self.stats
377    }
378
379    /// Returns the set of vertex labels that have been written to.
380    pub fn touched_labels(&self) -> Vec<String> {
381        self.touched_labels.iter().cloned().collect()
382    }
383
384    /// Returns the set of edge types that have been written to.
385    pub fn touched_edge_types(&self) -> Vec<String> {
386        self.touched_edge_types.iter().cloned().collect()
387    }
388
389    /// Returns the current timestamp in microseconds since Unix epoch.
390    fn get_current_timestamp_micros() -> i64 {
391        use std::time::{SystemTime, UNIX_EPOCH};
392        SystemTime::now()
393            .duration_since(UNIX_EPOCH)
394            .map(|d| d.as_micros() as i64)
395            .unwrap_or(0)
396    }
397
398    /// Insert vertices in bulk.
399    ///
400    /// The vertices are buffered until `batch_size` is reached, then written to storage.
401    /// When constraint validation is enabled, constraints are checked before each flush.
402    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if:
407    /// - The label is not found in the schema
408    /// - Constraint validation fails (when enabled)
409    /// - Storage write fails
410    pub async fn insert_vertices(
411        &mut self,
412        label: &str,
413        vertices: impl IntoArrow,
414    ) -> Result<Vec<Vid>> {
415        let vertices = vertices.into_property_maps();
416        let schema = self.backend.schema.schema();
417        // Validate label exists in schema
418        schema
419            .labels
420            .get(label)
421            .ok_or_else(|| UniError::LabelNotFound {
422                label: label.to_string(),
423            })?;
424        // Validate constraints before buffering (if enabled)
425        if self.config.validate_constraints {
426            self.validate_vertex_batch_constraints(label, &vertices)
427                .await?;
428        }
429
430        // Allocate VIDs (batched for performance)
431        let vids = {
432            let writer = self.backend.writer.as_ref().unwrap();
433            writer
434                .allocate_vids(vertices.len())
435                .await
436                .map_err(UniError::Internal)?
437        };
438
439        // Track buffer size and add to buffer
440        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
441        for (i, props) in vertices.into_iter().enumerate() {
442            self.buffer_size_bytes += Self::estimate_properties_size(&props);
443            buffer.push((vids[i], props));
444        }
445
446        self.touched_labels.insert(label.to_string());
447
448        // Check if we need to checkpoint based on buffer size
449        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
450            self.checkpoint().await?;
451        } else {
452            // Otherwise, check batch size threshold for this label only
453            self.check_flush_vertices(label).await?;
454        }
455
456        self.stats.vertices_inserted += vids.len();
457        self.report_progress(
458            BulkPhase::Inserting,
459            self.stats.vertices_inserted,
460            Some(label.to_string()),
461        );
462
463        Ok(vids)
464    }
465
466    /// Estimate the size of a properties map in bytes.
467    fn estimate_properties_size(props: &Properties) -> usize {
468        let mut size = 0;
469        for (key, value) in props {
470            size += key.len();
471            size += Self::estimate_value_size(value);
472        }
473        size
474    }
475
476    /// Estimate the size of a value in bytes.
477    fn estimate_value_size(value: &Value) -> usize {
478        match value {
479            Value::Null => 1,
480            Value::Bool(_) => 1,
481            Value::Int(_) | Value::Float(_) => 8,
482            Value::String(s) => s.len(),
483            Value::Bytes(b) => b.len(),
484            Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
485            Value::Map(obj) => {
486                obj.iter()
487                    .map(|(k, v)| k.len() + Self::estimate_value_size(v))
488                    .sum::<usize>()
489                    + 8
490            }
491            Value::Vector(v) => v.len() * 4,
492            _ => 16, // Node, Edge, Path
493        }
494    }
495
496    /// Validate constraints for a batch of vertices before insertion.
497    ///
498    /// Checks NOT NULL, UNIQUE, and CHECK constraints. For UNIQUE constraints,
499    /// validates both within the batch and against already-buffered data.
500    async fn validate_vertex_batch_constraints(
501        &mut self,
502        label: &str,
503        vertices: &[Properties],
504    ) -> Result<()> {
505        let schema = self.backend.schema.schema();
506
507        // UNIQUE keys to commit to the writer-lifetime set once EVERY constraint
508        // on this batch has passed. (review H8)
509        let mut pending_unique_records: Vec<(String, Vec<String>)> = Vec::new();
510
511        // Check NOT NULL and CHECK constraints for each vertex
512        if let Some(props_meta) = schema.properties.get(label) {
513            for (idx, props) in vertices.iter().enumerate() {
514                // NOT NULL constraints
515                for (prop_name, meta) in props_meta {
516                    if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
517                        return Err(anyhow!(
518                            "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
519                            idx,
520                            prop_name,
521                            label
522                        ));
523                    }
524                }
525            }
526        }
527
528        // Check explicit constraints (UNIQUE, CHECK)
529        for constraint in &schema.constraints {
530            if !constraint.enabled {
531                continue;
532            }
533            match &constraint.target {
534                uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
535                _ => continue,
536            }
537
538            match &constraint.constraint_type {
539                uni_common::core::schema::ConstraintType::Unique {
540                    properties: unique_props,
541                } => {
542                    // Check for duplicates within the batch
543                    let mut seen_keys: HashSet<String> = HashSet::new();
544                    for (idx, props) in vertices.iter().enumerate() {
545                        let key = self.compute_unique_key(unique_props, props);
546                        if let Some(k) = key
547                            && !seen_keys.insert(k.clone())
548                        {
549                            return Err(anyhow!(
550                                "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
551                                idx,
552                                k
553                            ));
554                        }
555                    }
556
557                    // Check against EVERY vertex accepted so far in this load,
558                    // not just the current buffer (`pending_vertices` is drained
559                    // on each flush, so a duplicate spanning two flushes used to
560                    // slip through). (review H8)
561                    let set_key = unique_set_key(label, unique_props);
562                    let mut batch_keys = Vec::with_capacity(vertices.len());
563                    for (idx, props) in vertices.iter().enumerate() {
564                        if let Some(k) = self.compute_unique_key(unique_props, props) {
565                            if self
566                                .seen_unique_keys
567                                .get(&set_key)
568                                .is_some_and(|seen| seen.contains(&k))
569                            {
570                                return Err(anyhow!(
571                                    "UNIQUE constraint violation at row {}: key '{}' conflicts with an already-loaded vertex",
572                                    idx,
573                                    k
574                                ));
575                            }
576                            batch_keys.push(k);
577                        }
578                    }
579                    pending_unique_records.push((set_key, batch_keys));
580                }
581                uni_common::core::schema::ConstraintType::Exists { property } => {
582                    for (idx, props) in vertices.iter().enumerate() {
583                        if props.get(property).is_none_or(|v| v.is_null()) {
584                            return Err(anyhow!(
585                                "EXISTS constraint violation at row {}: property '{}' must exist",
586                                idx,
587                                property
588                            ));
589                        }
590                    }
591                }
592                uni_common::core::schema::ConstraintType::Check { expression } => {
593                    for (idx, props) in vertices.iter().enumerate() {
594                        if !self.evaluate_check_expression(expression, props)? {
595                            return Err(anyhow!(
596                                "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
597                                constraint.name,
598                                idx,
599                                expression
600                            ));
601                        }
602                    }
603                }
604                _ => {}
605            }
606        }
607
608        // All constraints passed — commit the UNIQUE keys to the writer-lifetime
609        // set so a later batch (after a buffer flush) still sees them. (review H8)
610        for (set_key, keys) in pending_unique_records {
611            self.seen_unique_keys
612                .entry(set_key)
613                .or_default()
614                .extend(keys);
615        }
616
617        Ok(())
618    }
619
620    /// Compute a unique key string from properties for UNIQUE constraint checking.
621    fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
622        let mut parts = Vec::new();
623        for prop in unique_props {
624            match props.get(prop) {
625                Some(v) if !v.is_null() => parts.push(v.to_string()),
626                _ => return None, // Missing property means can't enforce uniqueness
627            }
628        }
629        Some(parts.join(":"))
630    }
631
632    /// Evaluate a simple CHECK constraint expression.
633    fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
634        let parts: Vec<&str> = expression.split_whitespace().collect();
635        if parts.len() != 3 {
636            // Complex expression - allow for now
637            return Ok(true);
638        }
639
640        let prop_part = parts[0].trim_start_matches('(');
641        let prop_name = if let Some(idx) = prop_part.find('.') {
642            &prop_part[idx + 1..]
643        } else {
644            prop_part
645        };
646
647        let op = parts[1];
648        let val_str = parts[2].trim_end_matches(')');
649
650        let prop_val = match properties.get(prop_name) {
651            Some(v) => v,
652            None => return Ok(true), // Missing property passes CHECK
653        };
654
655        // Parse target value
656        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
657            || (val_str.starts_with('"') && val_str.ends_with('"'))
658        {
659            Value::String(val_str[1..val_str.len() - 1].to_string())
660        } else if let Ok(n) = val_str.parse::<i64>() {
661            Value::Int(n)
662        } else if let Ok(n) = val_str.parse::<f64>() {
663            Value::Float(n)
664        } else if let Ok(b) = val_str.parse::<bool>() {
665            Value::Bool(b)
666        } else {
667            Value::String(val_str.to_string())
668        };
669
670        match op {
671            "=" | "==" => Ok(prop_val == &target_val),
672            "!=" | "<>" => Ok(prop_val != &target_val),
673            ">" => self.compare_values(prop_val, &target_val).map(|c| c > 0),
674            "<" => self.compare_values(prop_val, &target_val).map(|c| c < 0),
675            ">=" => self.compare_values(prop_val, &target_val).map(|c| c >= 0),
676            "<=" => self.compare_values(prop_val, &target_val).map(|c| c <= 0),
677            _ => Ok(true), // Unknown operator - allow
678        }
679    }
680
681    /// Compare two values, returning -1, 0, or 1.
682    ///
683    /// Incomparable floats (NaN) compare as equal (0), matching the prior
684    /// branch-based implementation.
685    fn compare_values(&self, a: &Value, b: &Value) -> Result<i8> {
686        let ordering = match (a, b) {
687            (Value::Int(n1), Value::Int(n2)) => n1.cmp(n2),
688            (Value::Float(f1), Value::Float(f2)) => f1.partial_cmp(f2).unwrap_or(Ordering::Equal),
689            (Value::Int(n), Value::Float(f)) => {
690                (*n as f64).partial_cmp(f).unwrap_or(Ordering::Equal)
691            }
692            (Value::Float(f), Value::Int(n)) => {
693                f.partial_cmp(&(*n as f64)).unwrap_or(Ordering::Equal)
694            }
695            (Value::String(s1), Value::String(s2)) => s1.cmp(s2),
696            _ => {
697                return Err(anyhow!(
698                    "Cannot compare incompatible types: {:?} vs {:?}",
699                    a,
700                    b
701                ));
702            }
703        };
704        Ok(ordering as i8)
705    }
706
707    /// Checkpoint: flush all pending data to storage.
708    ///
709    /// Called automatically when buffer size exceeds `max_buffer_size_bytes`.
710    /// Flushes all buffered vertices and edges, then resets the buffer size counter.
711    async fn checkpoint(&mut self) -> Result<()> {
712        log::debug!(
713            "Checkpoint triggered at {} bytes (limit: {})",
714            self.buffer_size_bytes,
715            self.config.max_buffer_size_bytes
716        );
717
718        // Flush all pending vertices
719        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
720        for label in labels {
721            self.flush_vertices_buffer(&label).await?;
722        }
723
724        // Flush all pending edges
725        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
726        for edge_type in edge_types {
727            self.flush_edges_buffer(&edge_type).await?;
728        }
729
730        // Reset buffer size
731        self.buffer_size_bytes = 0;
732
733        Ok(())
734    }
735
736    // Helper to flush vertex buffer if full
737    async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
738        let should_flush = self
739            .pending_vertices
740            .get(label)
741            .is_some_and(|buf| buf.len() >= self.config.batch_size);
742
743        if should_flush {
744            self.flush_vertices_buffer(label).await?;
745        }
746        Ok(())
747    }
748
749    /// Flush vertex buffer to LanceDB storage.
750    ///
751    /// Records the initial table version before first write for rollback support.
752    /// Writes to both per-label table and main vertices table.
753    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
754        if let Some(vertices) = self.pending_vertices.remove(label) {
755            if vertices.is_empty() {
756                return Ok(());
757            }
758
759            // Record initial version for abort rollback (only once per table)
760            let table_name = uni_store::backend::table_names::vertex_table_name(label);
761            if !self.initial_table_versions.contains_key(&table_name) {
762                let backend = self.backend.storage.backend();
763                let version = backend
764                    .get_table_version(&table_name)
765                    .await
766                    .map_err(UniError::Internal)?;
767                self.initial_table_versions.insert(table_name, version);
768            }
769
770            // Record main vertices table version for rollback
771            let main_table_name =
772                uni_store::backend::table_names::main_vertex_table_name().to_string();
773            if !self.initial_table_versions.contains_key(&main_table_name) {
774                let backend = self.backend.storage.backend();
775                let version = backend
776                    .get_table_version(&main_table_name)
777                    .await
778                    .map_err(UniError::Internal)?;
779                self.initial_table_versions
780                    .insert(main_table_name.clone(), version);
781            }
782
783            // Durably record the intent to mutate these tables BEFORE writing
784            // any of them, so a crash between the per-label and main commits is
785            // reconciled (rolled back) on reopen (H9).
786            self.persist_active_intent().await?;
787
788            let ds = self
789                .backend
790                .storage
791                .vertex_dataset(label)
792                .map_err(UniError::Internal)?;
793            let schema = self.backend.schema.schema();
794
795            let deleted = vec![false; vertices.len()];
796            let versions = vec![1; vertices.len()]; // Version 1 for bulk load
797
798            // Generate timestamps for this batch
799            let now = Self::get_current_timestamp_micros();
800            let mut created_at: HashMap<Vid, i64> = HashMap::new();
801            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
802            for (vid, _) in &vertices {
803                created_at.insert(*vid, now);
804                updated_at.insert(*vid, now);
805            }
806
807            // Build per-label and main-vertex entries from the 2-tuple input.
808            // Both tables need labels attached; compute once per vertex.
809            let labels = vec![label.to_string()];
810            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
811                .iter()
812                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
813                .collect();
814
815            let batch = ds
816                .build_record_batch_with_timestamps(
817                    &vertices_with_labels,
818                    &deleted,
819                    &versions,
820                    &schema,
821                    Some(&created_at),
822                    Some(&updated_at),
823                )
824                .map_err(UniError::Internal)?;
825
826            // Write to per-label table via backend
827            let backend = self.backend.storage.backend();
828            ds.write_batch(backend, batch, &schema)
829                .await
830                .map_err(UniError::Internal)?;
831
832            // Create default scalar indexes (_vid, _uid) which are critical for basic function
833            ds.ensure_default_indexes(backend)
834                .await
835                .map_err(UniError::Internal)?;
836
837            // Test-only: simulate a crash between the per-label commit (done) and
838            // the main-table commit (below). The intent marker is already durable,
839            // so reopen recovery must roll the per-label table back.
840            if FAIL_AFTER_PERLABEL_WRITE.load(std::sync::atomic::Ordering::SeqCst) {
841                return Err(anyhow!("injected failure after per-label vertex commit"));
842            }
843
844            // Dual-write to main vertices table
845            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
846                vertices_with_labels
847                    .into_iter()
848                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
849                    .collect();
850
851            if !main_vertices.is_empty() {
852                let main_batch = MainVertexDataset::build_record_batch(
853                    &main_vertices,
854                    Some(&created_at),
855                    Some(&updated_at),
856                )
857                .map_err(UniError::Internal)?;
858
859                MainVertexDataset::write_batch(backend, main_batch)
860                    .await
861                    .map_err(UniError::Internal)?;
862
863                MainVertexDataset::ensure_default_indexes(backend)
864                    .await
865                    .map_err(UniError::Internal)?;
866            }
867        }
868        Ok(())
869    }
870
871    /// Insert edges in bulk.
872    ///
873    /// Edges are buffered until `batch_size` is reached, then written to storage.
874    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
875    /// Indexes are NOT updated during these writes.
876    ///
877    /// # Errors
878    ///
879    /// Returns an error if the edge type is not found in the schema or if
880    /// storage write fails.
881    pub async fn insert_edges(
882        &mut self,
883        edge_type: &str,
884        edges: Vec<EdgeData>,
885    ) -> Result<Vec<Eid>> {
886        let schema = self.backend.schema.schema();
887        // Validate the edge type exists, but its id is no longer needed:
888        // `allocate_eids` does not partition by type.
889        schema
890            .edge_types
891            .get(edge_type)
892            .ok_or_else(|| UniError::EdgeTypeNotFound {
893                edge_type: edge_type.to_string(),
894            })?;
895
896        // Allocate EIDs in one IdAllocator mutex acquisition.
897        let eids = {
898            let writer = self.backend.writer.as_ref().unwrap();
899            writer
900                .allocate_eids(edges.len())
901                .await
902                .map_err(UniError::Internal)?
903        };
904
905        // Convert to L1Entry format and track buffer size
906        let now = Self::get_current_timestamp_micros();
907        let mut added_size = 0usize;
908        let entries: Vec<L1Entry> = edges
909            .into_iter()
910            .enumerate()
911            .map(|(i, edge)| {
912                // Estimate size for buffer tracking (16 bytes for VIDs + 8 for EID + properties)
913                added_size += 32 + Self::estimate_properties_size(&edge.properties);
914                L1Entry {
915                    src_vid: edge.src_vid,
916                    dst_vid: edge.dst_vid,
917                    eid: eids[i],
918                    op: Op::Insert,
919                    version: 1,
920                    properties: edge.properties,
921                    created_at: Some(now),
922                    updated_at: Some(now),
923                }
924            })
925            .collect();
926        self.buffer_size_bytes += added_size;
927        self.pending_edges
928            .entry(edge_type.to_string())
929            .or_default()
930            .extend(entries);
931
932        self.touched_edge_types.insert(edge_type.to_string());
933
934        // Check if we need to checkpoint based on buffer size
935        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
936            self.checkpoint().await?;
937        } else {
938            self.check_flush_edges(edge_type).await?;
939        }
940
941        self.stats.edges_inserted += eids.len();
942        self.report_progress(
943            BulkPhase::Inserting,
944            self.stats.vertices_inserted + self.stats.edges_inserted,
945            Some(edge_type.to_string()),
946        );
947
948        Ok(eids)
949    }
950
951    /// Check and flush edge buffer if full.
952    async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
953        let should_flush = self
954            .pending_edges
955            .get(edge_type)
956            .is_some_and(|buf| buf.len() >= self.config.batch_size);
957
958        if should_flush {
959            self.flush_edges_buffer(edge_type).await?;
960        }
961        Ok(())
962    }
963
964    /// Flush edge buffer to delta datasets.
965    ///
966    /// Records initial table versions before first write for rollback support.
967    /// Writes to both per-type delta tables and main edges table.
968    #[expect(
969        clippy::map_entry,
970        reason = "async code between contains_key and insert"
971    )]
972    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
973        if let Some(entries) = self.pending_edges.remove(edge_type) {
974            if entries.is_empty() {
975                return Ok(());
976            }
977
978            let schema = self.backend.schema.schema();
979            let backend = self.backend.storage.backend();
980
981            // Record initial versions for abort rollback (FWD and BWD tables)
982            let fwd_table_name =
983                uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
984            if !self.initial_table_versions.contains_key(&fwd_table_name) {
985                let version = backend
986                    .get_table_version(&fwd_table_name)
987                    .await
988                    .map_err(UniError::Internal)?;
989                self.initial_table_versions.insert(fwd_table_name, version);
990            }
991            let bwd_table_name =
992                uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
993            if !self.initial_table_versions.contains_key(&bwd_table_name) {
994                let version = backend
995                    .get_table_version(&bwd_table_name)
996                    .await
997                    .map_err(UniError::Internal)?;
998                self.initial_table_versions.insert(bwd_table_name, version);
999            }
1000
1001            // Record main edges table version for rollback
1002            let main_edge_table_name =
1003                uni_store::backend::table_names::main_edge_table_name().to_string();
1004            if !self
1005                .initial_table_versions
1006                .contains_key(&main_edge_table_name)
1007            {
1008                let version = backend
1009                    .get_table_version(&main_edge_table_name)
1010                    .await
1011                    .map_err(UniError::Internal)?;
1012                self.initial_table_versions
1013                    .insert(main_edge_table_name.clone(), version);
1014            }
1015
1016            // Record the intent before writing any of the three edge datasets
1017            // (fwd delta, bwd delta, main edges) — see the vertex path (H9).
1018            self.persist_active_intent().await?;
1019
1020            // Write to FWD delta (sorted by src_vid)
1021            let mut fwd_entries = entries.clone();
1022            fwd_entries.sort_by_key(|e| e.src_vid);
1023            let fwd_ds = self
1024                .backend
1025                .storage
1026                .delta_dataset(edge_type, "fwd")
1027                .map_err(UniError::Internal)?;
1028            let fwd_batch = fwd_ds
1029                .build_record_batch(&fwd_entries, &schema)
1030                .map_err(UniError::Internal)?;
1031            let backend = self.backend.storage.backend();
1032            fwd_ds
1033                .write_run(backend, fwd_batch)
1034                .await
1035                .map_err(UniError::Internal)?;
1036            fwd_ds
1037                .ensure_eid_index(backend)
1038                .await
1039                .map_err(UniError::Internal)?;
1040
1041            // Write to BWD delta (sorted by dst_vid)
1042            let mut bwd_entries = entries.clone();
1043            bwd_entries.sort_by_key(|e| e.dst_vid);
1044            let bwd_ds = self
1045                .backend
1046                .storage
1047                .delta_dataset(edge_type, "bwd")
1048                .map_err(UniError::Internal)?;
1049            let bwd_batch = bwd_ds
1050                .build_record_batch(&bwd_entries, &schema)
1051                .map_err(UniError::Internal)?;
1052            bwd_ds
1053                .write_run(backend, bwd_batch)
1054                .await
1055                .map_err(UniError::Internal)?;
1056            bwd_ds
1057                .ensure_eid_index(backend)
1058                .await
1059                .map_err(UniError::Internal)?;
1060
1061            // Dual-write to main edges table
1062            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
1063            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
1064            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
1065                .iter()
1066                .map(|e| {
1067                    let deleted = matches!(e.op, Op::Delete);
1068                    if let Some(ts) = e.created_at {
1069                        edge_created_at.insert(e.eid, ts);
1070                    }
1071                    if let Some(ts) = e.updated_at {
1072                        edge_updated_at.insert(e.eid, ts);
1073                    }
1074                    (
1075                        e.eid,
1076                        e.src_vid,
1077                        e.dst_vid,
1078                        edge_type.to_string(),
1079                        e.properties.clone(),
1080                        deleted,
1081                        e.version,
1082                    )
1083                })
1084                .collect();
1085
1086            if !main_edges.is_empty() {
1087                let main_batch = MainEdgeDataset::build_record_batch(
1088                    &main_edges,
1089                    Some(&edge_created_at),
1090                    Some(&edge_updated_at),
1091                )
1092                .map_err(UniError::Internal)?;
1093
1094                MainEdgeDataset::write_batch(self.backend.storage.backend(), main_batch)
1095                    .await
1096                    .map_err(UniError::Internal)?;
1097
1098                MainEdgeDataset::ensure_default_indexes(self.backend.storage.backend())
1099                    .await
1100                    .map_err(UniError::Internal)?;
1101            }
1102        }
1103        Ok(())
1104    }
1105
1106    /// Commit all pending data and rebuild indexes.
1107    ///
1108    /// Flushes remaining buffered data, rebuilds deferred indexes, and updates
1109    /// the snapshot manifest.
1110    ///
1111    /// # Errors
1112    ///
1113    /// Returns an error if flushing, index rebuilding, or snapshot update fails.
1114    pub async fn commit(mut self) -> Result<BulkStats> {
1115        // 1. Flush remaining vertex buffers
1116        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
1117        for label in labels {
1118            self.flush_vertices_buffer(&label).await?;
1119        }
1120
1121        // 2. Flush remaining edge buffers
1122        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
1123        for edge_type in edge_types {
1124            self.flush_edges_buffer(&edge_type).await?;
1125        }
1126
1127        let index_start = Instant::now();
1128
1129        // 3. Rebuild indexes for vertices
1130        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
1131            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
1132
1133            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
1134                // Async mode: mark affected indexes as Stale before scheduling
1135                let schema = self.backend.schema.schema();
1136                for label in &labels_to_rebuild {
1137                    for idx in &schema.indexes {
1138                        if idx.label() == label.as_str() {
1139                            let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1140                                m.status = uni_common::core::schema::IndexStatus::Stale;
1141                            });
1142                        }
1143                    }
1144                }
1145
1146                let rebuild_manager = IndexRebuildManager::new(
1147                    self.backend.storage.clone(),
1148                    self.backend.schema.clone(),
1149                    self.backend.config.index_rebuild.clone(),
1150                )
1151                .await
1152                .map_err(UniError::Internal)?;
1153
1154                let task_ids = rebuild_manager
1155                    .schedule(labels_to_rebuild)
1156                    .await
1157                    .map_err(UniError::Internal)?;
1158
1159                self.stats.index_task_ids = task_ids;
1160                self.stats.indexes_pending = true;
1161
1162                let manager = Arc::new(rebuild_manager);
1163                let handle = manager.start_background_worker(self.backend.shutdown.subscribe());
1164                self.backend.shutdown.track_task(handle);
1165            } else {
1166                // Sync mode: rebuild indexes blocking
1167                for label in &labels_to_rebuild {
1168                    self.report_progress(
1169                        BulkPhase::RebuildingIndexes {
1170                            label: label.clone(),
1171                        },
1172                        self.stats.vertices_inserted + self.stats.edges_inserted,
1173                        Some(label.clone()),
1174                    );
1175                    let idx_mgr = IndexManager::new(
1176                        self.backend.storage.base_path(),
1177                        self.backend.storage.schema_manager_arc(),
1178                    );
1179                    idx_mgr
1180                        .rebuild_indexes_for_label(label)
1181                        .await
1182                        .map_err(UniError::Internal)?;
1183                    self.stats.indexes_rebuilt += 1;
1184
1185                    // Update index metadata after successful sync rebuild
1186                    let now = Utc::now();
1187                    let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1188                    let row_count = self
1189                        .backend
1190                        .storage
1191                        .backend()
1192                        .count_rows(&vtable_name, None)
1193                        .await
1194                        .ok()
1195                        .map(|c| c as u64);
1196
1197                    let schema = self.backend.schema.schema();
1198                    for idx in &schema.indexes {
1199                        if idx.label() == label.as_str() {
1200                            let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1201                                m.status = uni_common::core::schema::IndexStatus::Online;
1202                                m.last_built_at = Some(now);
1203                                if let Some(count) = row_count {
1204                                    m.row_count_at_build = Some(count);
1205                                }
1206                            });
1207                        }
1208                    }
1209                }
1210            }
1211        }
1212
1213        self.stats.index_build_duration = index_start.elapsed();
1214
1215        // 4. Update Snapshot
1216        self.report_progress(
1217            BulkPhase::Finalizing,
1218            self.stats.vertices_inserted + self.stats.edges_inserted,
1219            None,
1220        );
1221
1222        // Load latest snapshot or create new
1223        let mut manifest = self
1224            .backend
1225            .storage
1226            .snapshot_manager()
1227            .load_latest_snapshot()
1228            .await
1229            .map_err(UniError::Internal)?
1230            .unwrap_or_else(|| {
1231                SnapshotManifest::new(
1232                    Uuid::new_v4().to_string(),
1233                    self.backend.schema.schema().schema_version,
1234                )
1235            });
1236
1237        // Update Manifest
1238        let parent_id = manifest.snapshot_id.clone();
1239        manifest.parent_snapshot = Some(parent_id);
1240        manifest.snapshot_id = Uuid::new_v4().to_string();
1241        manifest.created_at = Utc::now();
1242
1243        // Update counts and versions for touched labels (vertices)
1244        let backend = self.backend.storage.backend();
1245        for label in &self.touched_labels {
1246            let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1247            let count = backend
1248                .count_rows(&vtable_name, None)
1249                .await
1250                .map_err(UniError::Internal)?;
1251
1252            let current_snap =
1253                manifest
1254                    .vertices
1255                    .entry(label.to_string())
1256                    .or_insert(LabelSnapshot {
1257                        version: 0,
1258                        count: 0,
1259                        lance_version: 0,
1260                    });
1261            current_snap.count = count as u64;
1262            // LanceDB tables don't expose Lance version directly
1263            current_snap.lance_version = 0;
1264        }
1265
1266        // Update counts and versions for touched edge types
1267        for edge_type in &self.touched_edge_types {
1268            let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
1269            if let Ok(count) = backend.count_rows(&delta_name, None).await {
1270                let current_snap =
1271                    manifest
1272                        .edges
1273                        .entry(edge_type.to_string())
1274                        .or_insert(EdgeSnapshot {
1275                            version: 0,
1276                            count: 0,
1277                            lance_version: 0,
1278                        });
1279                current_snap.count = count as u64;
1280                // LanceDB tables don't expose Lance version directly
1281                current_snap.lance_version = 0;
1282            }
1283        }
1284
1285        // Save Snapshot. The manifest body is durable after `save_snapshot`;
1286        // only after that do we promote the recovery marker to `Committed`
1287        // (carrying the new snapshot id) so that a crash before the latest
1288        // pointer flip rolls *forward* to this snapshot rather than rolling the
1289        // committed tables back (H9).
1290        self.backend
1291            .storage
1292            .snapshot_manager()
1293            .save_snapshot(&manifest)
1294            .await
1295            .map_err(UniError::Internal)?;
1296        if !self.initial_table_versions.is_empty() {
1297            let store = self.backend.storage.store();
1298            flush_intent::write_committed(
1299                &store,
1300                &self.initial_table_versions,
1301                &manifest.snapshot_id,
1302            )
1303            .await
1304            .map_err(UniError::Internal)?;
1305        }
1306        self.backend
1307            .storage
1308            .snapshot_manager()
1309            .set_latest_snapshot(&manifest.snapshot_id)
1310            .await
1311            .map_err(UniError::Internal)?;
1312
1313        // Save schema with updated index metadata
1314        self.backend
1315            .schema
1316            .save()
1317            .await
1318            .map_err(UniError::Internal)?;
1319
1320        // Warm adjacency CSRs for all edge types written during bulk import
1321        // so that subsequent traversal queries can find the edges.
1322        let schema = self.backend.storage.schema_manager().schema();
1323        for edge_type_name in &self.touched_edge_types {
1324            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
1325                let type_id = meta.id;
1326                for &dir in uni_store::storage::direction::Direction::Both.expand() {
1327                    let _ = self
1328                        .backend
1329                        .storage
1330                        .warm_adjacency(type_id, dir, None)
1331                        .await;
1332                }
1333            }
1334        }
1335
1336        // The load is fully finalized — drop the recovery marker. (A crash
1337        // before this point leaves the `Committed` marker, which reopen
1338        // reconciliation rolls forward; a crash before the `Committed` write
1339        // leaves the `Active` marker, which it rolls back.)
1340        if !self.initial_table_versions.is_empty() {
1341            let store = self.backend.storage.store();
1342            flush_intent::clear(&store)
1343                .await
1344                .map_err(UniError::Internal)?;
1345        }
1346
1347        self.committed = true;
1348        self.stats.duration = self.start_time.elapsed();
1349        Ok(self.stats.clone())
1350    }
1351
1352    /// Abort bulk loading and roll back all changes.
1353    ///
1354    /// Rolls back LanceDB tables to their pre-bulk-load versions using LanceDB's
1355    /// version API. Tables created during the bulk load are dropped. Buffered
1356    /// data that hasn't been flushed is discarded.
1357    ///
1358    /// # Errors
1359    ///
1360    /// Returns an error if rollback fails. The error message includes details
1361    /// about which tables failed to roll back.
1362    pub async fn abort(mut self) -> Result<()> {
1363        if self.committed {
1364            return Err(anyhow!("Cannot abort: bulk load already committed"));
1365        }
1366
1367        // 1. Clear pending buffers (not yet flushed to storage)
1368        self.pending_vertices.clear();
1369        self.pending_edges.clear();
1370        self.buffer_size_bytes = 0;
1371
1372        // 2. Roll back each modified table to its initial version
1373        let backend = self.backend.storage.backend();
1374        let mut rollback_errors = Vec::new();
1375        let mut rolled_back_count = 0;
1376        let mut dropped_count = 0;
1377
1378        for (table_name, initial_version) in &self.initial_table_versions {
1379            match initial_version {
1380                Some(version) => {
1381                    // Table existed before - rollback to initial version
1382                    match backend.rollback_table(table_name, *version).await {
1383                        Ok(()) => {
1384                            log::info!("Rolled back table '{}' to version {}", table_name, version);
1385                            rolled_back_count += 1;
1386                        }
1387                        Err(e) => {
1388                            rollback_errors.push(format!("{}: {}", table_name, e));
1389                        }
1390                    }
1391                }
1392                None => {
1393                    // Table was created during bulk load - drop it
1394                    match backend.drop_table(table_name).await {
1395                        Ok(()) => {
1396                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
1397                            dropped_count += 1;
1398                        }
1399                        Err(e) => {
1400                            rollback_errors.push(format!("{}: {}", table_name, e));
1401                        }
1402                    }
1403                }
1404            }
1405        }
1406
1407        // 3. Clear backend cache to ensure next read picks up rolled-back state
1408        self.backend.storage.backend().clear_cache();
1409
1410        // 4. Drop the crash-recovery marker — this abort already did the
1411        // rollback, so reopen must not try to roll back again.
1412        let store = self.backend.storage.store();
1413        if let Err(e) = flush_intent::clear(&store).await {
1414            rollback_errors.push(format!("intent marker: {e}"));
1415        }
1416
1417        if rollback_errors.is_empty() {
1418            log::info!(
1419                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1420                rolled_back_count,
1421                dropped_count
1422            );
1423            Ok(())
1424        } else {
1425            Err(anyhow!(
1426                "Bulk load abort had {} rollback errors: {}",
1427                rollback_errors.len(),
1428                rollback_errors.join("; ")
1429            ))
1430        }
1431    }
1432
1433    /// Persist the crash-recovery intent marker (phase `Active`) with the
1434    /// current set of touched tables and their pre-load versions. Called before
1435    /// each multi-table flush writes anything.
1436    async fn persist_active_intent(&self) -> Result<()> {
1437        let store = self.backend.storage.store();
1438        flush_intent::write_active(&store, &self.initial_table_versions)
1439            .await
1440            .map_err(UniError::Internal)?;
1441        Ok(())
1442    }
1443
1444    fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1445        if let Some(cb) = &self.progress_callback {
1446            cb(BulkProgress {
1447                phase,
1448                rows_processed: rows,
1449                total_rows: None,
1450                current_label: label,
1451                elapsed: self.start_time.elapsed(),
1452            });
1453        }
1454    }
1455}