Skip to main content

uni_db/api/
bulk.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Bulk loading API for high-throughput data ingestion.
5//!
6//! This module provides `BulkWriter` for efficiently loading large amounts of
7//! vertices and edges while deferring index updates until commit time.
8//!
9//! ## Async Index Building
10//!
11//! By default, `commit()` blocks until all indexes are rebuilt. For large datasets,
12//! you can enable async index building to return immediately while indexes are
13//! built in the background:
14//!
15//! ```ignore
16//! let stats = db.bulk_writer()
17//!     .async_indexes(true)
18//!     .build()?
19//!     .insert_vertices(...)
20//!     .await?
21//!     .commit()
22//!     .await?;
23//!
24//! // Data is queryable immediately (may use full scans)
25//! // Check index status later:
26//! let status = db.index_rebuild_status().await?;
27//! ```
28
29use crate::api::UniInner;
30use anyhow::{Result, anyhow};
31use chrono::Utc;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::{Duration, Instant};
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
39use uni_common::{Properties, UniError};
40use uni_store::storage::delta::{L1Entry, Op};
41use uni_store::storage::main_edge::MainEdgeDataset;
42use uni_store::storage::main_vertex::MainVertexDataset;
43use uni_store::storage::{IndexManager, IndexRebuildManager};
44use uuid::Uuid;
45
/// Trait for types that can be converted to property maps for bulk insertion.
///
/// Enables `insert_vertices` to accept both `Vec<HashMap<String, Value>>`
/// (row-oriented data) and `RecordBatch` (Arrow columnar data).
pub trait IntoArrow {
    /// Convert to a vector of property maps, one map per row/vertex.
    fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
}
54
/// Identity conversion: a vector of property maps is already in the target shape.
impl IntoArrow for Vec<HashMap<String, Value>> {
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        self
    }
}
60
61impl IntoArrow for arrow_array::RecordBatch {
62    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
63        let schema = self.schema();
64        let num_rows = self.num_rows();
65        let mut rows = Vec::with_capacity(num_rows);
66        for row_idx in 0..num_rows {
67            let mut props = HashMap::with_capacity(schema.fields().len());
68            for (col_idx, field) in schema.fields().iter().enumerate() {
69                let col = self.column(col_idx);
70                let value =
71                    uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
72                if !value.is_null() {
73                    props.insert(field.name().clone(), value);
74                }
75            }
76            rows.push(props);
77        }
78        rows
79    }
80}
81
/// Builder for configuring a bulk writer.
pub struct BulkWriterBuilder {
    /// Shared database internals handed to the resulting `BulkWriter`.
    db: Arc<UniInner>,
    /// Accumulated configuration (batch size, index deferral, validation, ...).
    config: BulkConfig,
    /// Optional callback invoked with progress updates during the bulk load.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    /// Session write guard — released when the BulkWriter is committed/aborted/dropped.
    session_write_guard: Option<Arc<AtomicBool>>,
    /// If true, the write guard was already acquired by the caller (e.g. AppenderBuilder).
    guard_pre_acquired: bool,
    /// If true, the session is pinned to a read-only snapshot.
    is_pinned: bool,
    /// Session ID for error messages.
    session_id: String,
}
96
97impl BulkWriterBuilder {
98    /// Create a new bulk writer builder with a pre-acquired write guard.
99    ///
100    /// Used by `AppenderBuilder` which acquires the guard before creating the builder.
101    pub(crate) fn new_with_guard(db: Arc<UniInner>, guard: Arc<AtomicBool>) -> Self {
102        Self {
103            db,
104            config: BulkConfig::default(),
105            progress_callback: None,
106            session_write_guard: Some(guard),
107            guard_pre_acquired: true,
108            is_pinned: false,
109            session_id: String::new(),
110        }
111    }
112
113    /// Create a bulk writer builder without a write guard.
114    ///
115    /// Used by `Transaction::bulk_writer()` โ€” the Transaction already holds
116    /// the session write guard, so the BulkWriter must not release it.
117    pub(crate) fn new_unguarded(db: Arc<UniInner>) -> Self {
118        Self {
119            db,
120            config: BulkConfig::default(),
121            progress_callback: None,
122            session_write_guard: None,
123            guard_pre_acquired: true,
124            is_pinned: false,
125            session_id: String::new(),
126        }
127    }
128
129    /// Create a new bulk writer builder with deferred guard acquisition.
130    ///
131    /// The guard is acquired in [`build()`](Self::build) rather than at creation time,
132    /// so that `Session::bulk_writer()` is infallible.
133    #[allow(dead_code)] // Scaffolding for Session::bulk_writer()
134    pub(crate) fn new_deferred(
135        db: Arc<UniInner>,
136        guard: Arc<AtomicBool>,
137        session_id: String,
138        is_pinned: bool,
139    ) -> Self {
140        Self {
141            db,
142            config: BulkConfig::default(),
143            progress_callback: None,
144            session_write_guard: Some(guard),
145            guard_pre_acquired: false,
146            is_pinned,
147            session_id,
148        }
149    }
150
151    /// Set whether to defer vector index building until commit.
152    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
153        self.config.defer_vector_indexes = defer;
154        self
155    }
156
157    /// Set whether to defer scalar index building until commit.
158    pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
159        self.config.defer_scalar_indexes = defer;
160        self
161    }
162
163    /// Set the batch size for buffering before flush.
164    pub fn batch_size(mut self, size: usize) -> Self {
165        self.config.batch_size = size;
166        self
167    }
168
169    /// Set a progress callback for monitoring bulk load progress.
170    pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
171        self.progress_callback = Some(Box::new(f));
172        self
173    }
174
175    /// Build indexes asynchronously after commit.
176    ///
177    /// When enabled, `commit()` returns immediately after data is written,
178    /// and indexes are rebuilt in the background. The data is queryable
179    /// immediately but queries may use full scans until indexes are ready.
180    ///
181    /// Use `Uni::index_rebuild_status()` to check progress.
182    ///
183    /// Default: `false` (blocking index rebuild)
184    pub fn async_indexes(mut self, async_: bool) -> Self {
185        self.config.async_indexes = async_;
186        self
187    }
188
189    /// Set whether to validate constraints during bulk load.
190    ///
191    /// When enabled (default), BulkWriter validates NOT NULL, UNIQUE, and CHECK
192    /// constraints before each flush, matching the behavior of regular Writer.
193    /// Set to `false` for trusted data sources to improve performance.
194    ///
195    /// Default: `true`
196    pub fn validate_constraints(mut self, validate: bool) -> Self {
197        self.config.validate_constraints = validate;
198        self
199    }
200
201    /// Set the maximum buffer size before triggering a checkpoint flush.
202    ///
203    /// When the in-memory buffer exceeds this size, a checkpoint is triggered
204    /// to flush data to storage. This allows bulk loading of arbitrarily large
205    /// datasets while controlling memory usage.
206    ///
207    /// Default: 1 GB (1_073_741_824 bytes)
208    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
209        self.config.max_buffer_size_bytes = size;
210        self
211    }
212
213    /// Build the bulk writer.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if:
218    /// - The session is pinned to a read-only snapshot
219    /// - Another write context is already active on the session
220    /// - The database is not writable
221    pub fn build(self) -> Result<BulkWriter> {
222        // Check pinned state (deferred from Session::bulk_writer)
223        if self.is_pinned {
224            return Err(UniError::ReadOnly {
225                operation: "bulk_writer".to_string(),
226            }
227            .into());
228        }
229
230        // Acquire write guard if not pre-acquired (deferred from Session::bulk_writer)
231        if !self.guard_pre_acquired
232            && let Some(guard) = &self.session_write_guard
233            && guard
234                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
235                .is_err()
236        {
237            return Err(UniError::WriteContextAlreadyActive {
238                session_id: self.session_id.clone(),
239                hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
240            }.into());
241        }
242
243        if self.db.writer.is_none() {
244            // Release session guard on failure so the session isn't stuck.
245            if let Some(guard) = &self.session_write_guard {
246                guard.store(false, Ordering::SeqCst);
247            }
248            return Err(anyhow!("BulkWriter requires a writable database instance"));
249        }
250
251        Ok(BulkWriter {
252            db: self.db,
253            config: self.config,
254            progress_callback: self.progress_callback,
255            stats: BulkStats::default(),
256            start_time: Instant::now(),
257            pending_vertices: HashMap::new(),
258            pending_edges: HashMap::new(),
259            touched_labels: HashSet::new(),
260            touched_edge_types: HashSet::new(),
261            initial_table_versions: HashMap::new(),
262            buffer_size_bytes: 0,
263            committed: false,
264            session_write_guard: self.session_write_guard,
265        })
266    }
267}
268
/// Configuration for bulk loading operations.
pub struct BulkConfig {
    /// Whether to defer vector index building until commit.
    pub defer_vector_indexes: bool,
    /// Whether to defer scalar index building until commit.
    pub defer_scalar_indexes: bool,
    /// Number of rows to buffer (per label/type) before flushing to storage.
    pub batch_size: usize,
    /// Whether to build indexes asynchronously after commit.
    pub async_indexes: bool,
    /// Whether to validate constraints (NOT NULL, UNIQUE, CHECK) during bulk load.
    ///
    /// Default: `true`. Set to `false` to skip validation for trusted data sources.
    pub validate_constraints: bool,
    /// Maximum buffer size in bytes before triggering a checkpoint flush.
    ///
    /// Default: 1 GB (1_073_741_824 bytes). When buffer size exceeds this limit,
    /// a checkpoint is triggered to flush data to storage while continuing to
    /// accept new data.
    pub max_buffer_size_bytes: usize,
}
290
291impl Default for BulkConfig {
292    fn default() -> Self {
293        Self {
294            defer_vector_indexes: true,
295            defer_scalar_indexes: true,
296            batch_size: 10_000,
297            async_indexes: false,
298            validate_constraints: true,
299            max_buffer_size_bytes: 1_073_741_824, // 1 GB
300        }
301    }
302}
303
/// Progress report passed to the `on_progress` callback during a bulk load.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Phase the bulk load is currently in.
    pub phase: BulkPhase,
    /// Number of rows processed so far.
    pub rows_processed: usize,
    /// Total number of rows, if known up front.
    pub total_rows: Option<usize>,
    /// Label currently being processed, if any.
    pub current_label: Option<String>,
    /// Wall-clock time elapsed since the bulk load started.
    pub elapsed: Duration,
}
312
/// Phase of a bulk load, reported via `BulkProgress`.
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Buffering and flushing vertex/edge data.
    Inserting,
    /// Rebuilding indexes for the named label.
    RebuildingIndexes { label: String },
    /// Final bookkeeping before commit completes.
    Finalizing,
}
319
/// Statistics accumulated over the lifetime of a bulk load.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Total number of vertices inserted.
    pub vertices_inserted: usize,
    /// Total number of edges inserted.
    pub edges_inserted: usize,
    /// Number of indexes rebuilt at commit time.
    pub indexes_rebuilt: usize,
    /// Total wall-clock duration of the bulk load.
    pub duration: Duration,
    /// Portion of `duration` spent building indexes.
    pub index_build_duration: Duration,
    /// Task IDs for async index rebuilds (populated when `async_indexes` is true).
    pub index_task_ids: Vec<String>,
    /// True if index building was deferred to background (async mode).
    pub indexes_pending: bool,
}

/// Alias for [`BulkStats`] (spec §8.1 compatibility).
pub type BulkStatsAccumulator = BulkStats;
335
/// Edge data for bulk insertion.
///
/// Contains source/destination vertex IDs and the edge's properties.
#[derive(Debug, Clone)]
pub struct EdgeData {
    /// Source vertex ID.
    pub src_vid: Vid,
    /// Destination vertex ID.
    pub dst_vid: Vid,
    /// Edge properties.
    pub properties: Properties,
}
348
349impl EdgeData {
350    /// Create new edge data.
351    pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
352        Self {
353            src_vid,
354            dst_vid,
355            properties,
356        }
357    }
358}
359
/// Bulk writer for high-throughput data ingestion.
///
/// Buffers vertices and edges, deferring index updates until commit.
/// Supports constraint validation, automatic checkpointing when buffer limits
/// are exceeded, and proper rollback via LanceDB version tracking.
///
/// Use `abort()` to discard uncommitted changes and roll back storage to its
/// pre-bulk-load state.
pub struct BulkWriter {
    /// Shared database internals (schema, storage, writer).
    db: Arc<UniInner>,
    /// Configuration captured from `BulkWriterBuilder`.
    config: BulkConfig,
    /// Optional callback invoked with progress updates.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    /// Running statistics, updated as batches are processed.
    stats: BulkStats,
    /// Start of the bulk load; used for elapsed-time reporting.
    start_time: Instant,
    // Buffered data per label/type
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    pending_edges: HashMap<String, Vec<L1Entry>>,
    // Track what was written (for index rebuild)
    touched_labels: HashSet<String>,
    touched_edge_types: HashSet<String>,
    // Track LanceDB table versions before bulk load started (for abort rollback)
    // Key: table name, Value: version before first write (None = table created during bulk load)
    initial_table_versions: HashMap<String, Option<u64>>,
    // Current buffer size in bytes (approximate)
    buffer_size_bytes: usize,
    /// Set once the writer has been committed (or aborted).
    committed: bool,
    /// Session write guard — released when committed/aborted/dropped.
    session_write_guard: Option<Arc<AtomicBool>>,
}
389
390impl BulkWriter {
    /// Returns a snapshot of the current bulk load statistics.
    ///
    /// Updated after each batch flush, so counts may lag slightly behind
    /// in-flight inserts.
    pub fn stats(&self) -> &BulkStats {
        &self.stats
    }
396
397    /// Returns the set of vertex labels that have been written to.
398    pub fn touched_labels(&self) -> Vec<String> {
399        self.touched_labels.iter().cloned().collect()
400    }
401
402    /// Returns the set of edge types that have been written to.
403    pub fn touched_edge_types(&self) -> Vec<String> {
404        self.touched_edge_types.iter().cloned().collect()
405    }
406
407    /// Returns the current timestamp in microseconds since Unix epoch.
408    fn get_current_timestamp_micros(&self) -> i64 {
409        use std::time::{SystemTime, UNIX_EPOCH};
410        SystemTime::now()
411            .duration_since(UNIX_EPOCH)
412            .map(|d| d.as_micros() as i64)
413            .unwrap_or(0)
414    }
415
    /// Insert vertices in bulk.
    ///
    /// The vertices are buffered until `batch_size` is reached, then written to storage.
    /// When constraint validation is enabled, constraints are checked before each flush.
    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
    ///
    /// Returns the VIDs allocated for the inserted vertices, in input order.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The label is not found in the schema
    /// - Constraint validation fails (when enabled)
    /// - Storage write fails
    pub async fn insert_vertices(
        &mut self,
        label: &str,
        vertices: impl IntoArrow,
    ) -> Result<Vec<Vid>> {
        let vertices = vertices.into_property_maps();
        let schema = self.db.schema.schema();
        // Validate label exists in schema
        schema
            .labels
            .get(label)
            .ok_or_else(|| UniError::LabelNotFound {
                label: label.to_string(),
            })?;
        // Validate constraints before buffering (if enabled)
        if self.config.validate_constraints {
            self.validate_vertex_batch_constraints(label, &vertices)
                .await?;
        }

        // Allocate VIDs (batched for performance).
        // unwrap(): writer presence was verified in BulkWriterBuilder::build(),
        // so `db.writer` cannot be None here.
        let vids = {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            writer
                .allocate_vids(vertices.len())
                .await
                .map_err(UniError::Internal)?
        };

        // Track buffer size and add to buffer
        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
        for (i, props) in vertices.into_iter().enumerate() {
            self.buffer_size_bytes += Self::estimate_properties_size(&props);
            buffer.push((vids[i], props));
        }

        self.touched_labels.insert(label.to_string());

        // Check if we need to checkpoint based on buffer size.
        // NOTE(review): buffer_size_bytes is only reset by checkpoint(); the
        // per-label flush below does not decrement it, so the counter can
        // overestimate the live buffer and trigger an early (harmless)
        // checkpoint — confirm this is intended.
        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            self.checkpoint().await?;
        } else {
            // Otherwise, check batch size threshold for this label only
            self.check_flush_vertices(label).await?;
        }

        self.stats.vertices_inserted += vids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted,
            Some(label.to_string()),
        );

        Ok(vids)
    }
483
484    /// Estimate the size of a properties map in bytes.
485    fn estimate_properties_size(props: &Properties) -> usize {
486        let mut size = 0;
487        for (key, value) in props {
488            size += key.len();
489            size += Self::estimate_value_size(value);
490        }
491        size
492    }
493
494    /// Estimate the size of a value in bytes.
495    fn estimate_value_size(value: &Value) -> usize {
496        match value {
497            Value::Null => 1,
498            Value::Bool(_) => 1,
499            Value::Int(_) | Value::Float(_) => 8,
500            Value::String(s) => s.len(),
501            Value::Bytes(b) => b.len(),
502            Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
503            Value::Map(obj) => {
504                obj.iter()
505                    .map(|(k, v)| k.len() + Self::estimate_value_size(v))
506                    .sum::<usize>()
507                    + 8
508            }
509            Value::Vector(v) => v.len() * 4,
510            _ => 16, // Node, Edge, Path
511        }
512    }
513
514    /// Validate constraints for a batch of vertices before insertion.
515    ///
516    /// Checks NOT NULL, UNIQUE, and CHECK constraints. For UNIQUE constraints,
517    /// validates both within the batch and against already-buffered data.
518    async fn validate_vertex_batch_constraints(
519        &self,
520        label: &str,
521        vertices: &[Properties],
522    ) -> Result<()> {
523        let schema = self.db.schema.schema();
524
525        // Check NOT NULL and CHECK constraints for each vertex
526        if let Some(props_meta) = schema.properties.get(label) {
527            for (idx, props) in vertices.iter().enumerate() {
528                // NOT NULL constraints
529                for (prop_name, meta) in props_meta {
530                    if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
531                        return Err(anyhow!(
532                            "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
533                            idx,
534                            prop_name,
535                            label
536                        ));
537                    }
538                }
539            }
540        }
541
542        // Check explicit constraints (UNIQUE, CHECK)
543        for constraint in &schema.constraints {
544            if !constraint.enabled {
545                continue;
546            }
547            match &constraint.target {
548                uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
549                _ => continue,
550            }
551
552            match &constraint.constraint_type {
553                uni_common::core::schema::ConstraintType::Unique {
554                    properties: unique_props,
555                } => {
556                    // Check for duplicates within the batch
557                    let mut seen_keys: HashSet<String> = HashSet::new();
558                    for (idx, props) in vertices.iter().enumerate() {
559                        let key = self.compute_unique_key(unique_props, props);
560                        if let Some(k) = key
561                            && !seen_keys.insert(k.clone())
562                        {
563                            return Err(anyhow!(
564                                "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
565                                idx,
566                                k
567                            ));
568                        }
569                    }
570
571                    // Check against already-buffered data
572                    if let Some(buffered) = self.pending_vertices.get(label) {
573                        for (idx, props) in vertices.iter().enumerate() {
574                            let key = self.compute_unique_key(unique_props, props);
575                            if let Some(k) = key {
576                                for (_, buffered_props) in buffered {
577                                    let buffered_key =
578                                        self.compute_unique_key(unique_props, buffered_props);
579                                    if buffered_key.as_ref() == Some(&k) {
580                                        return Err(anyhow!(
581                                            "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
582                                            idx,
583                                            k
584                                        ));
585                                    }
586                                }
587                            }
588                        }
589                    }
590                }
591                uni_common::core::schema::ConstraintType::Exists { property } => {
592                    for (idx, props) in vertices.iter().enumerate() {
593                        if props.get(property).is_none_or(|v| v.is_null()) {
594                            return Err(anyhow!(
595                                "EXISTS constraint violation at row {}: property '{}' must exist",
596                                idx,
597                                property
598                            ));
599                        }
600                    }
601                }
602                uni_common::core::schema::ConstraintType::Check { expression } => {
603                    for (idx, props) in vertices.iter().enumerate() {
604                        if !self.evaluate_check_expression(expression, props)? {
605                            return Err(anyhow!(
606                                "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
607                                constraint.name,
608                                idx,
609                                expression
610                            ));
611                        }
612                    }
613                }
614                _ => {}
615            }
616        }
617
618        Ok(())
619    }
620
621    /// Compute a unique key string from properties for UNIQUE constraint checking.
622    fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
623        let mut parts = Vec::new();
624        for prop in unique_props {
625            match props.get(prop) {
626                Some(v) if !v.is_null() => parts.push(v.to_string()),
627                _ => return None, // Missing property means can't enforce uniqueness
628            }
629        }
630        Some(parts.join(":"))
631    }
632
    /// Evaluate a simple CHECK constraint expression of the form
    /// `<prop> <op> <literal>` (whitespace-separated, e.g. `(n.age >= 18)`).
    ///
    /// Only three-token expressions are evaluated; anything more complex is
    /// permissively accepted. Missing properties and unknown operators also
    /// pass. Ordering comparisons delegate to `compare_json_values` and may
    /// return an error for incompatible types.
    fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
        let parts: Vec<&str> = expression.split_whitespace().collect();
        if parts.len() != 3 {
            // Complex expression - allow for now
            return Ok(true);
        }

        // Strip a leading '(' and an optional alias prefix ("n.age" -> "age").
        let prop_part = parts[0].trim_start_matches('(');
        let prop_name = if let Some(idx) = prop_part.find('.') {
            &prop_part[idx + 1..]
        } else {
            prop_part
        };

        let op = parts[1];
        let val_str = parts[2].trim_end_matches(')');

        let prop_val = match properties.get(prop_name) {
            Some(v) => v,
            None => return Ok(true), // Missing property passes CHECK
        };

        // Parse target value. Order matters: quoted string, then int, then
        // float, then bool, with a bare-string fallback — "1" must become Int.
        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
            || (val_str.starts_with('"') && val_str.ends_with('"'))
        {
            Value::String(val_str[1..val_str.len() - 1].to_string())
        } else if let Ok(n) = val_str.parse::<i64>() {
            Value::Int(n)
        } else if let Ok(n) = val_str.parse::<f64>() {
            Value::Float(n)
        } else if let Ok(b) = val_str.parse::<bool>() {
            Value::Bool(b)
        } else {
            Value::String(val_str.to_string())
        };

        match op {
            "=" | "==" => Ok(prop_val == &target_val),
            "!=" | "<>" => Ok(prop_val != &target_val),
            ">" => self
                .compare_json_values(prop_val, &target_val)
                .map(|c| c > 0),
            "<" => self
                .compare_json_values(prop_val, &target_val)
                .map(|c| c < 0),
            ">=" => self
                .compare_json_values(prop_val, &target_val)
                .map(|c| c >= 0),
            "<=" => self
                .compare_json_values(prop_val, &target_val)
                .map(|c| c <= 0),
            _ => Ok(true), // Unknown operator - allow
        }
    }
689
690    /// Compare two values, returning -1, 0, or 1.
691    fn compare_json_values(&self, a: &Value, b: &Value) -> Result<i8> {
692        match (a, b) {
693            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2) as i8),
694            (Value::Float(f1), Value::Float(f2)) => {
695                if f1 < f2 {
696                    Ok(-1)
697                } else if f1 > f2 {
698                    Ok(1)
699                } else {
700                    Ok(0)
701                }
702            }
703            (Value::Int(n), Value::Float(f)) => {
704                let nf = *n as f64;
705                if nf < *f {
706                    Ok(-1)
707                } else if nf > *f {
708                    Ok(1)
709                } else {
710                    Ok(0)
711                }
712            }
713            (Value::Float(f), Value::Int(n)) => {
714                let nf = *n as f64;
715                if *f < nf {
716                    Ok(-1)
717                } else if *f > nf {
718                    Ok(1)
719                } else {
720                    Ok(0)
721                }
722            }
723            (Value::String(s1), Value::String(s2)) => match s1.cmp(s2) {
724                std::cmp::Ordering::Less => Ok(-1),
725                std::cmp::Ordering::Greater => Ok(1),
726                std::cmp::Ordering::Equal => Ok(0),
727            },
728            _ => Err(anyhow!(
729                "Cannot compare incompatible types: {:?} vs {:?}",
730                a,
731                b
732            )),
733        }
734    }
735
736    /// Checkpoint: flush all pending data to storage.
737    ///
738    /// Called automatically when buffer size exceeds `max_buffer_size_bytes`.
739    /// Flushes all buffered vertices and edges, then resets the buffer size counter.
740    async fn checkpoint(&mut self) -> Result<()> {
741        log::debug!(
742            "Checkpoint triggered at {} bytes (limit: {})",
743            self.buffer_size_bytes,
744            self.config.max_buffer_size_bytes
745        );
746
747        // Flush all pending vertices
748        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
749        for label in labels {
750            self.flush_vertices_buffer(&label).await?;
751        }
752
753        // Flush all pending edges
754        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
755        for edge_type in edge_types {
756            self.flush_edges_buffer(&edge_type).await?;
757        }
758
759        // Reset buffer size
760        self.buffer_size_bytes = 0;
761
762        Ok(())
763    }
764
765    // Helper to flush vertex buffer if full
766    async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
767        let should_flush = {
768            if let Some(buf) = self.pending_vertices.get(label) {
769                buf.len() >= self.config.batch_size
770            } else {
771                false
772            }
773        };
774
775        if should_flush {
776            self.flush_vertices_buffer(label).await?;
777        }
778        Ok(())
779    }
780
    /// Flush vertex buffer to LanceDB storage.
    ///
    /// Records the initial table version before first write for rollback support.
    /// Writes to both per-label table and main vertices table.
    ///
    /// No-op when there is no pending (or only an empty) buffer for `label`.
    /// Only the default `_vid`/`_uid` scalar indexes are ensured here; any
    /// deferred index rebuilds happen later in `commit()`.
    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
        if let Some(vertices) = self.pending_vertices.remove(label) {
            if vertices.is_empty() {
                return Ok(());
            }
            // NOTE(review): `buffer_size_bytes` is not decreased here even
            // though the buffer is drained — presumably `checkpoint()` resets
            // it; confirm, otherwise the size-based checkpoint trigger drifts.

            // Record initial version for abort rollback (only once per table).
            // A `None` version means the table does not exist yet; `abort()`
            // drops such tables instead of rolling them back.
            let table_name = uni_store::backend::table_names::vertex_table_name(label);
            if !self.initial_table_versions.contains_key(&table_name) {
                let backend = self.db.storage.backend();
                let version = backend
                    .get_table_version(&table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(table_name, version);
            }

            // Record main vertices table version for rollback
            let main_table_name =
                uni_store::backend::table_names::main_vertex_table_name().to_string();
            if !self.initial_table_versions.contains_key(&main_table_name) {
                let backend = self.db.storage.backend();
                let version = backend
                    .get_table_version(&main_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_table_name.clone(), version);
            }

            let ds = self
                .db
                .storage
                .vertex_dataset(label)
                .map_err(UniError::Internal)?;
            let schema = self.db.schema.schema();

            // Bulk-loaded rows are live (not deleted) and start at version 1.
            let deleted = vec![false; vertices.len()];
            let versions = vec![1; vertices.len()]; // Version 1 for bulk load

            // Generate timestamps for this batch; every vertex in the batch
            // shares the same created_at/updated_at instant.
            let now = self.get_current_timestamp_micros();
            let mut created_at: HashMap<Vid, i64> = HashMap::new();
            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
            for (vid, _) in &vertices {
                created_at.insert(*vid, now);
                updated_at.insert(*vid, now);
            }

            // Build per-label and main-vertex entries from the 2-tuple input.
            // Both tables need labels attached; compute once per vertex.
            let labels = vec![label.to_string()];
            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
                .iter()
                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
                .collect();

            let batch = ds
                .build_record_batch_with_timestamps(
                    &vertices_with_labels,
                    &deleted,
                    &versions,
                    &schema,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

            // Write to per-label table via backend
            let backend = self.db.storage.backend();
            ds.write_batch(backend, batch, &schema)
                .await
                .map_err(UniError::Internal)?;

            // Create default scalar indexes (_vid, _uid) which are critical for basic function
            ds.ensure_default_indexes(backend)
                .await
                .map_err(UniError::Internal)?;

            // Dual-write to main vertices table (deleted=false, version=1,
            // matching the per-label write above).
            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
                vertices_with_labels
                    .into_iter()
                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
                    .collect();

            if !main_vertices.is_empty() {
                let main_batch = MainVertexDataset::build_record_batch(
                    &main_vertices,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

                MainVertexDataset::write_batch(backend, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainVertexDataset::ensure_default_indexes(backend)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
890
    /// Insert edges in bulk.
    ///
    /// Edges are buffered until `batch_size` is reached, then written to storage.
    /// When the buffer size exceeds `max_buffer_size_bytes`, a checkpoint is triggered.
    /// Indexes are NOT updated during these writes.
    ///
    /// Returns the allocated EIDs in the same order as the input `edges`.
    ///
    /// # Errors
    ///
    /// Returns an error if the edge type is not found in the schema or if
    /// storage write fails.
    pub async fn insert_edges(
        &mut self,
        edge_type: &str,
        edges: Vec<EdgeData>,
    ) -> Result<Vec<Eid>> {
        let schema = self.db.schema.schema();
        let edge_meta =
            schema
                .edge_types
                .get(edge_type)
                .ok_or_else(|| UniError::EdgeTypeNotFound {
                    edge_type: edge_type.to_string(),
                })?;
        let type_id = edge_meta.id;

        // Allocate EIDs, one per input edge, under a read lock on the writer.
        // NOTE(review): assumes `db.writer` is always `Some` while a bulk
        // writer exists — the unwrap panics otherwise; confirm that invariant.
        let mut eids = Vec::with_capacity(edges.len());
        {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            for _ in 0..edges.len() {
                eids.push(writer.next_eid(type_id).await.map_err(UniError::Internal)?);
            }
        }

        // Convert to L1Entry format and track buffer size
        let now = self.get_current_timestamp_micros();
        let mut added_size = 0usize;
        let entries: Vec<L1Entry> = edges
            .into_iter()
            .enumerate()
            .map(|(i, edge)| {
                // Rough per-edge size for buffer accounting: 32 bytes of fixed
                // overhead (src/dst VIDs + EID + op/version bookkeeping) plus
                // the estimated property payload.
                added_size += 32 + Self::estimate_properties_size(&edge.properties);
                L1Entry {
                    src_vid: edge.src_vid,
                    dst_vid: edge.dst_vid,
                    eid: eids[i],
                    op: Op::Insert,
                    version: 1, // bulk-loaded rows start at version 1
                    properties: edge.properties,
                    created_at: Some(now),
                    updated_at: Some(now),
                }
            })
            .collect();
        self.buffer_size_bytes += added_size;
        self.pending_edges
            .entry(edge_type.to_string())
            .or_default()
            .extend(entries);

        // Remember this type so `commit()` updates its snapshot entry and
        // warms its adjacency structures.
        self.touched_edge_types.insert(edge_type.to_string());

        // Check if we need to checkpoint based on buffer size; otherwise only
        // flush this type's buffer if it reached `batch_size`.
        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            self.checkpoint().await?;
        } else {
            self.check_flush_edges(edge_type).await?;
        }

        self.stats.edges_inserted += eids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            Some(edge_type.to_string()),
        );

        Ok(eids)
    }
970
971    /// Check and flush edge buffer if full.
972    async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
973        let should_flush = self
974            .pending_edges
975            .get(edge_type)
976            .is_some_and(|buf| buf.len() >= self.config.batch_size);
977
978        if should_flush {
979            self.flush_edges_buffer(edge_type).await?;
980        }
981        Ok(())
982    }
983
    /// Flush edge buffer to delta datasets.
    ///
    /// Records initial table versions before first write for rollback support.
    /// Writes to both per-type delta tables (FWD sorted by `src_vid`, BWD
    /// sorted by `dst_vid`) and the main edges table.
    ///
    /// No-op when there is no pending (or only an empty) buffer for `edge_type`.
    #[expect(
        clippy::map_entry,
        reason = "async code between contains_key and insert"
    )]
    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
        if let Some(entries) = self.pending_edges.remove(edge_type) {
            if entries.is_empty() {
                return Ok(());
            }
            // NOTE(review): `buffer_size_bytes` is not decreased here even
            // though the buffer is drained — presumably `checkpoint()` resets
            // it; confirm, otherwise the size-based checkpoint trigger drifts.

            let schema = self.db.schema.schema();
            let backend = self.db.storage.backend();

            // Record initial versions for abort rollback (FWD and BWD tables).
            // A `None` version means the table does not exist yet; `abort()`
            // drops such tables instead of rolling them back.
            let fwd_table_name =
                uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
            if !self.initial_table_versions.contains_key(&fwd_table_name) {
                let version = backend
                    .get_table_version(&fwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(fwd_table_name, version);
            }
            let bwd_table_name =
                uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
            if !self.initial_table_versions.contains_key(&bwd_table_name) {
                let version = backend
                    .get_table_version(&bwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(bwd_table_name, version);
            }

            // Record main edges table version for rollback
            let main_edge_table_name =
                uni_store::backend::table_names::main_edge_table_name().to_string();
            if !self
                .initial_table_versions
                .contains_key(&main_edge_table_name)
            {
                let version = backend
                    .get_table_version(&main_edge_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_edge_table_name.clone(), version);
            }

            // Write to FWD delta (sorted by src_vid)
            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "fwd")
                .map_err(UniError::Internal)?;
            let fwd_batch = fwd_ds
                .build_record_batch(&fwd_entries, &schema)
                .map_err(UniError::Internal)?;
            // Fresh handle after the dataset lookup above (same backend).
            let backend = self.db.storage.backend();
            fwd_ds
                .write_run(backend, fwd_batch)
                .await
                .map_err(UniError::Internal)?;
            fwd_ds
                .ensure_eid_index(backend)
                .await
                .map_err(UniError::Internal)?;

            // Write to BWD delta (sorted by dst_vid)
            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "bwd")
                .map_err(UniError::Internal)?;
            let bwd_batch = bwd_ds
                .build_record_batch(&bwd_entries, &schema)
                .map_err(UniError::Internal)?;
            bwd_ds
                .write_run(backend, bwd_batch)
                .await
                .map_err(UniError::Internal)?;
            bwd_ds
                .ensure_eid_index(backend)
                .await
                .map_err(UniError::Internal)?;

            // Dual-write to main edges table, carrying over the entries'
            // timestamps and mapping `Op::Delete` to the deleted flag.
            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
                .iter()
                .map(|e| {
                    let deleted = matches!(e.op, Op::Delete);
                    if let Some(ts) = e.created_at {
                        edge_created_at.insert(e.eid, ts);
                    }
                    if let Some(ts) = e.updated_at {
                        edge_updated_at.insert(e.eid, ts);
                    }
                    (
                        e.eid,
                        e.src_vid,
                        e.dst_vid,
                        edge_type.to_string(),
                        e.properties.clone(),
                        deleted,
                        e.version,
                    )
                })
                .collect();

            if !main_edges.is_empty() {
                let main_batch = MainEdgeDataset::build_record_batch(
                    &main_edges,
                    Some(&edge_created_at),
                    Some(&edge_updated_at),
                )
                .map_err(UniError::Internal)?;

                MainEdgeDataset::write_batch(self.db.storage.backend(), main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainEdgeDataset::ensure_default_indexes(self.db.storage.backend())
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
1121
    /// Commit all pending data and rebuild indexes.
    ///
    /// Flushes remaining buffered data, rebuilds deferred indexes, and updates
    /// the snapshot manifest.
    ///
    /// Steps: (1) flush remaining vertex buffers, (2) flush remaining edge
    /// buffers, (3) rebuild deferred indexes — either scheduled in the
    /// background (`async_indexes`) or blocking, (4) write a new snapshot
    /// manifest chained to the previous one, then warm adjacency CSRs for the
    /// touched edge types and release the session write guard.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing, index rebuilding, or snapshot update fails.
    pub async fn commit(mut self) -> Result<BulkStats> {
        // 1. Flush remaining vertex buffers
        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
        for label in labels {
            self.flush_vertices_buffer(&label).await?;
        }

        // 2. Flush remaining edge buffers
        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
        for edge_type in edge_types {
            self.flush_edges_buffer(&edge_type).await?;
        }

        let index_start = Instant::now();

        // 3. Rebuild indexes for vertices (only when rebuilds were deferred)
        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();

            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
                // Async mode: mark affected indexes as Stale before scheduling
                // so queries know they may need full scans until rebuilt.
                let schema = self.db.schema.schema();
                for label in &labels_to_rebuild {
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            // Best-effort metadata update; failure here must
                            // not abort the commit.
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Stale;
                            });
                        }
                    }
                }

                let rebuild_manager = IndexRebuildManager::new(
                    self.db.storage.clone(),
                    self.db.schema.clone(),
                    self.db.config.index_rebuild.clone(),
                )
                .await
                .map_err(UniError::Internal)?;

                let task_ids = rebuild_manager
                    .schedule(labels_to_rebuild)
                    .await
                    .map_err(UniError::Internal)?;

                // Expose the scheduled task IDs so callers can poll status.
                self.stats.index_task_ids = task_ids;
                self.stats.indexes_pending = true;

                // Run the rebuild worker in the background, tied to the DB's
                // shutdown handle so it is tracked and stopped on shutdown.
                let manager = Arc::new(rebuild_manager);
                let handle = manager.start_background_worker(self.db.shutdown_handle.subscribe());
                self.db.shutdown_handle.track_task(handle);
            } else {
                // Sync mode: rebuild indexes blocking
                for label in &labels_to_rebuild {
                    self.report_progress(
                        BulkPhase::RebuildingIndexes {
                            label: label.clone(),
                        },
                        self.stats.vertices_inserted + self.stats.edges_inserted,
                        Some(label.clone()),
                    );
                    let idx_mgr = IndexManager::new(
                        self.db.storage.base_path(),
                        self.db.storage.schema_manager_arc(),
                        self.db.storage.backend_arc(),
                    );
                    idx_mgr
                        .rebuild_indexes_for_label(label)
                        .await
                        .map_err(UniError::Internal)?;
                    self.stats.indexes_rebuilt += 1;

                    // Update index metadata after successful sync rebuild.
                    // Row count is best-effort (`ok()`); a failed count just
                    // leaves `row_count_at_build` untouched.
                    let now = chrono::Utc::now();
                    let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
                    let row_count = self
                        .db
                        .storage
                        .backend()
                        .count_rows(&vtable_name, None)
                        .await
                        .ok()
                        .map(|c| c as u64);

                    let schema = self.db.schema.schema();
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Online;
                                m.last_built_at = Some(now);
                                if let Some(count) = row_count {
                                    m.row_count_at_build = Some(count);
                                }
                            });
                        }
                    }
                }
            }
        }

        self.stats.index_build_duration = index_start.elapsed();

        // 4. Update Snapshot
        self.report_progress(
            BulkPhase::Finalizing,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            None,
        );

        // Load latest snapshot or create new
        let mut manifest = self
            .db
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await
            .map_err(UniError::Internal)?
            .unwrap_or_else(|| {
                SnapshotManifest::new(
                    Uuid::new_v4().to_string(),
                    self.db.schema.schema().schema_version,
                )
            });

        // Update Manifest: chain the new snapshot to the previous one via
        // parent_snapshot, then mint a fresh snapshot ID.
        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.created_at = Utc::now();

        // Update counts and versions for touched labels (vertices)
        let backend = self.db.storage.backend();
        for label in &self.touched_labels {
            let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
            let count = backend
                .count_rows(&vtable_name, None)
                .await
                .map_err(UniError::Internal)?;

            let current_snap =
                manifest
                    .vertices
                    .entry(label.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.count = count as u64;
            // LanceDB tables don't expose Lance version directly
            current_snap.lance_version = 0;
        }

        // Update counts and versions for touched edge types. Count errors
        // here are tolerated (the snapshot entry is simply left unchanged).
        for edge_type in &self.touched_edge_types {
            let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
            if let Ok(count) = backend.count_rows(&delta_name, None).await {
                let current_snap =
                    manifest
                        .edges
                        .entry(edge_type.to_string())
                        .or_insert(EdgeSnapshot {
                            version: 0,
                            count: 0,
                            lance_version: 0,
                        });
                current_snap.count = count as u64;
                // LanceDB tables don't expose Lance version directly
                current_snap.lance_version = 0;
            }
        }

        // Save Snapshot, then atomically point "latest" at it
        self.db
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await
            .map_err(UniError::Internal)?;
        self.db
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await
            .map_err(UniError::Internal)?;

        // Save schema with updated index metadata
        self.db.schema.save().await.map_err(UniError::Internal)?;

        // Warm adjacency CSRs for all edge types written during bulk import
        // so that subsequent traversal queries can find the edges.
        // Warming is best-effort (`let _`): a failure only delays CSR builds.
        let schema = self.db.storage.schema_manager().schema();
        for edge_type_name in &self.touched_edge_types {
            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
                let type_id = meta.id;
                for &dir in uni_store::storage::direction::Direction::Both.expand() {
                    let _ = self.db.storage.warm_adjacency(type_id, dir, None).await;
                }
            }
        }

        // Mark committed so Drop does not treat this as an abandoned writer.
        self.committed = true;
        self.release_guard();
        self.stats.duration = self.start_time.elapsed();
        Ok(self.stats.clone())
    }
1336
1337    /// Abort bulk loading and roll back all changes.
1338    ///
1339    /// Rolls back LanceDB tables to their pre-bulk-load versions using LanceDB's
1340    /// version API. Tables created during the bulk load are dropped. Buffered
1341    /// data that hasn't been flushed is discarded.
1342    ///
1343    /// # Errors
1344    ///
1345    /// Returns an error if rollback fails. The error message includes details
1346    /// about which tables failed to roll back.
1347    pub async fn abort(mut self) -> Result<()> {
1348        if self.committed {
1349            return Err(anyhow!("Cannot abort: bulk load already committed"));
1350        }
1351
1352        // 1. Clear pending buffers (not yet flushed to storage)
1353        self.pending_vertices.clear();
1354        self.pending_edges.clear();
1355        self.buffer_size_bytes = 0;
1356
1357        // 2. Roll back each modified table to its initial version
1358        let backend = self.db.storage.backend();
1359        let mut rollback_errors = Vec::new();
1360        let mut rolled_back_count = 0;
1361        let mut dropped_count = 0;
1362
1363        for (table_name, initial_version) in &self.initial_table_versions {
1364            match initial_version {
1365                Some(version) => {
1366                    // Table existed before - rollback to initial version
1367                    match backend.rollback_table(table_name, *version).await {
1368                        Ok(()) => {
1369                            log::info!("Rolled back table '{}' to version {}", table_name, version);
1370                            rolled_back_count += 1;
1371                        }
1372                        Err(e) => {
1373                            rollback_errors.push(format!("{}: {}", table_name, e));
1374                        }
1375                    }
1376                }
1377                None => {
1378                    // Table was created during bulk load - drop it
1379                    match backend.drop_table(table_name).await {
1380                        Ok(()) => {
1381                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
1382                            dropped_count += 1;
1383                        }
1384                        Err(e) => {
1385                            rollback_errors.push(format!("{}: {}", table_name, e));
1386                        }
1387                    }
1388                }
1389            }
1390        }
1391
1392        // 3. Clear backend cache to ensure next read picks up rolled-back state
1393        self.db.storage.backend().clear_cache();
1394
1395        // Release session write guard on abort
1396        self.release_guard();
1397
1398        if rollback_errors.is_empty() {
1399            log::info!(
1400                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1401                rolled_back_count,
1402                dropped_count
1403            );
1404            Ok(())
1405        } else {
1406            Err(anyhow!(
1407                "Bulk load abort had {} rollback errors: {}",
1408                rollback_errors.len(),
1409                rollback_errors.join("; ")
1410            ))
1411        }
1412    }
1413
1414    fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1415        if let Some(cb) = &self.progress_callback {
1416            cb(BulkProgress {
1417                phase,
1418                rows_processed: rows,
1419                total_rows: None,
1420                current_label: label,
1421                elapsed: self.start_time.elapsed(),
1422            });
1423        }
1424    }
1425
1426    /// Release the session write guard if this bulk writer holds one.
1427    fn release_guard(&self) {
1428        if let Some(guard) = &self.session_write_guard {
1429            guard.store(false, Ordering::SeqCst);
1430        }
1431    }
1432}
1433
1434impl Drop for BulkWriter {
1435    fn drop(&mut self) {
1436        if !self.committed {
1437            // Release session write guard on drop (abort case or forgotten writer)
1438            self.release_guard();
1439        }
1440    }
1441}