1use anyhow::{Result, anyhow};
30use chrono::Utc;
31use std::cmp::Ordering;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::UniConfig;
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::schema::SchemaManager;
39use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
40use uni_common::{Properties, UniError};
41use uni_plugin_host::shutdown::ShutdownHandle;
42use uni_store::runtime::writer::Writer;
43use uni_store::storage::delta::{L1Entry, Op};
44use uni_store::storage::main_edge::MainEdgeDataset;
45use uni_store::storage::main_vertex::MainVertexDataset;
46use uni_store::storage::manager::StorageManager;
47use uni_store::storage::{IndexManager, IndexRebuildManager};
48use uuid::Uuid;
49
50#[derive(Clone)]
58pub struct BulkBackend {
59 pub storage: Arc<StorageManager>,
61 pub writer: Option<Arc<Writer>>,
63 pub schema: Arc<SchemaManager>,
65 pub shutdown: Arc<ShutdownHandle>,
67 pub config: UniConfig,
69}
70
71pub trait IntoArrow {
76 fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
78}
79
80impl IntoArrow for Vec<HashMap<String, Value>> {
81 fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
82 self
83 }
84}
85
86impl IntoArrow for arrow_array::RecordBatch {
87 fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
88 record_batch_to_property_maps(&self)
89 }
90}
91
92pub fn record_batch_to_property_maps(
97 batch: &arrow_array::RecordBatch,
98) -> Vec<HashMap<String, Value>> {
99 let schema = batch.schema();
100 let num_rows = batch.num_rows();
101 let mut rows = Vec::with_capacity(num_rows);
102 for row_idx in 0..num_rows {
103 let mut props = HashMap::with_capacity(schema.fields().len());
104 for (col_idx, field) in schema.fields().iter().enumerate() {
105 let col = batch.column(col_idx);
106 let value =
107 uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
108 if !value.is_null() {
109 props.insert(field.name().clone(), value);
110 }
111 }
112 rows.push(props);
113 }
114 rows
115}
116
117pub struct BulkWriterBuilder {
119 backend: BulkBackend,
120 config: BulkConfig,
121 progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
122}
123
124impl BulkWriterBuilder {
125 pub fn new_unguarded(backend: BulkBackend) -> Self {
131 Self {
132 backend,
133 config: BulkConfig::default(),
134 progress_callback: None,
135 }
136 }
137
138 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
140 self.config.defer_vector_indexes = defer;
141 self
142 }
143
144 pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
146 self.config.defer_scalar_indexes = defer;
147 self
148 }
149
150 pub fn batch_size(mut self, size: usize) -> Self {
152 self.config.batch_size = size;
153 self
154 }
155
156 pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
158 self.progress_callback = Some(Box::new(f));
159 self
160 }
161
162 pub fn async_indexes(mut self, async_: bool) -> Self {
172 self.config.async_indexes = async_;
173 self
174 }
175
176 pub fn validate_constraints(mut self, validate: bool) -> Self {
184 self.config.validate_constraints = validate;
185 self
186 }
187
188 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
196 self.config.max_buffer_size_bytes = size;
197 self
198 }
199
200 pub fn build(self) -> Result<BulkWriter> {
206 if self.backend.writer.is_none() {
207 return Err(anyhow!("BulkWriter requires a writable database instance"));
208 }
209
210 Ok(BulkWriter {
211 backend: self.backend,
212 config: self.config,
213 progress_callback: self.progress_callback,
214 stats: BulkStats::default(),
215 start_time: Instant::now(),
216 pending_vertices: HashMap::new(),
217 pending_edges: HashMap::new(),
218 touched_labels: HashSet::new(),
219 touched_edge_types: HashSet::new(),
220 initial_table_versions: HashMap::new(),
221 buffer_size_bytes: 0,
222 committed: false,
223 })
224 }
225}
226
/// Tuning options for a bulk load (see `BulkWriterBuilder` for setters).
// Debug/Clone let callers log or reuse a configuration; every field is a
// plain `bool`/`usize`, so both derives are free.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// Defer vector index maintenance; when this or `defer_scalar_indexes`
    /// is set, `commit` rebuilds indexes for touched labels.
    pub defer_vector_indexes: bool,
    /// Defer scalar index maintenance (see `defer_vector_indexes`).
    pub defer_scalar_indexes: bool,
    /// Row count at which a single label / edge-type buffer is flushed.
    pub batch_size: usize,
    /// Schedule deferred index rebuilds in the background instead of inline.
    pub async_indexes: bool,
    /// Validate vertex batches against schema constraints before buffering.
    pub validate_constraints: bool,
    /// Estimated buffered size (bytes) at which every buffer is flushed.
    pub max_buffer_size_bytes: usize,
}

impl Default for BulkConfig {
    fn default() -> Self {
        Self {
            defer_vector_indexes: true,
            defer_scalar_indexes: true,
            batch_size: 10_000,
            async_indexes: false,
            validate_constraints: true,
            // 1 GiB.
            max_buffer_size_bytes: 1 << 30,
        }
    }
}
261
/// Snapshot of bulk-load progress passed to the `on_progress` callback.
// PartialEq/Eq let callbacks and tests compare updates directly; every field
// type (including `Duration`) is `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkProgress {
    /// Which stage of the load produced this update.
    pub phase: BulkPhase,
    /// Cumulative rows handled so far, as reported by the writer.
    pub rows_processed: usize,
    /// Total expected rows, if known (the writer currently always sends `None`).
    pub total_rows: Option<usize>,
    /// Label or edge type being processed, if any.
    pub current_label: Option<String>,
    /// Time since the writer was created.
    pub elapsed: Duration,
}

/// Stage of a bulk load reported through [`BulkProgress`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BulkPhase {
    /// Rows are being buffered / flushed.
    Inserting,
    /// Indexes for `label` are being rebuilt synchronously during commit.
    RebuildingIndexes { label: String },
    /// The snapshot manifest and schema are being written.
    Finalizing,
}
277
/// Counters and timings for a bulk load, readable mid-load through
/// `BulkWriter::stats` and returned by `BulkWriter::commit`.
#[derive(Default, Debug, Clone)]
pub struct BulkStats {
    /// Vertices accepted by `insert_vertices`.
    pub vertices_inserted: usize,
    /// Edges accepted by `insert_edges`.
    pub edges_inserted: usize,
    /// Labels whose indexes were rebuilt synchronously during commit.
    pub indexes_rebuilt: usize,
    /// Wall-clock time from writer creation to the end of commit.
    pub duration: Duration,
    /// Time spent in the index phase of commit.
    pub index_build_duration: Duration,
    /// Ids of background rebuild tasks scheduled when `async_indexes` is on.
    pub index_task_ids: Vec<String>,
    /// `true` when rebuilds were handed to a background worker and may still run.
    pub indexes_pending: bool,
}
290
291#[derive(Debug, Clone)]
295pub struct EdgeData {
296 pub src_vid: Vid,
298 pub dst_vid: Vid,
300 pub properties: Properties,
302}
303
304impl EdgeData {
305 pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
307 Self {
308 src_vid,
309 dst_vid,
310 properties,
311 }
312 }
313}
314
315pub struct BulkWriter {
324 backend: BulkBackend,
325 config: BulkConfig,
326 progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
327 stats: BulkStats,
328 start_time: Instant,
329 pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
331 pending_edges: HashMap<String, Vec<L1Entry>>,
332 touched_labels: HashSet<String>,
334 touched_edge_types: HashSet<String>,
335 initial_table_versions: HashMap<String, Option<u64>>,
338 buffer_size_bytes: usize,
340 committed: bool,
341}
342
343impl BulkWriter {
344 pub fn stats(&self) -> &BulkStats {
347 &self.stats
348 }
349
350 pub fn touched_labels(&self) -> Vec<String> {
352 self.touched_labels.iter().cloned().collect()
353 }
354
355 pub fn touched_edge_types(&self) -> Vec<String> {
357 self.touched_edge_types.iter().cloned().collect()
358 }
359
360 fn get_current_timestamp_micros() -> i64 {
362 use std::time::{SystemTime, UNIX_EPOCH};
363 SystemTime::now()
364 .duration_since(UNIX_EPOCH)
365 .map(|d| d.as_micros() as i64)
366 .unwrap_or(0)
367 }
368
369 pub async fn insert_vertices(
382 &mut self,
383 label: &str,
384 vertices: impl IntoArrow,
385 ) -> Result<Vec<Vid>> {
386 let vertices = vertices.into_property_maps();
387 let schema = self.backend.schema.schema();
388 schema
390 .labels
391 .get(label)
392 .ok_or_else(|| UniError::LabelNotFound {
393 label: label.to_string(),
394 })?;
395 if self.config.validate_constraints {
397 self.validate_vertex_batch_constraints(label, &vertices)
398 .await?;
399 }
400
401 let vids = {
403 let writer = self.backend.writer.as_ref().unwrap();
404 writer
405 .allocate_vids(vertices.len())
406 .await
407 .map_err(UniError::Internal)?
408 };
409
410 let buffer = self.pending_vertices.entry(label.to_string()).or_default();
412 for (i, props) in vertices.into_iter().enumerate() {
413 self.buffer_size_bytes += Self::estimate_properties_size(&props);
414 buffer.push((vids[i], props));
415 }
416
417 self.touched_labels.insert(label.to_string());
418
419 if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
421 self.checkpoint().await?;
422 } else {
423 self.check_flush_vertices(label).await?;
425 }
426
427 self.stats.vertices_inserted += vids.len();
428 self.report_progress(
429 BulkPhase::Inserting,
430 self.stats.vertices_inserted,
431 Some(label.to_string()),
432 );
433
434 Ok(vids)
435 }
436
437 fn estimate_properties_size(props: &Properties) -> usize {
439 let mut size = 0;
440 for (key, value) in props {
441 size += key.len();
442 size += Self::estimate_value_size(value);
443 }
444 size
445 }
446
447 fn estimate_value_size(value: &Value) -> usize {
449 match value {
450 Value::Null => 1,
451 Value::Bool(_) => 1,
452 Value::Int(_) | Value::Float(_) => 8,
453 Value::String(s) => s.len(),
454 Value::Bytes(b) => b.len(),
455 Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
456 Value::Map(obj) => {
457 obj.iter()
458 .map(|(k, v)| k.len() + Self::estimate_value_size(v))
459 .sum::<usize>()
460 + 8
461 }
462 Value::Vector(v) => v.len() * 4,
463 _ => 16, }
465 }
466
467 async fn validate_vertex_batch_constraints(
472 &self,
473 label: &str,
474 vertices: &[Properties],
475 ) -> Result<()> {
476 let schema = self.backend.schema.schema();
477
478 if let Some(props_meta) = schema.properties.get(label) {
480 for (idx, props) in vertices.iter().enumerate() {
481 for (prop_name, meta) in props_meta {
483 if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
484 return Err(anyhow!(
485 "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
486 idx,
487 prop_name,
488 label
489 ));
490 }
491 }
492 }
493 }
494
495 for constraint in &schema.constraints {
497 if !constraint.enabled {
498 continue;
499 }
500 match &constraint.target {
501 uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
502 _ => continue,
503 }
504
505 match &constraint.constraint_type {
506 uni_common::core::schema::ConstraintType::Unique {
507 properties: unique_props,
508 } => {
509 let mut seen_keys: HashSet<String> = HashSet::new();
511 for (idx, props) in vertices.iter().enumerate() {
512 let key = self.compute_unique_key(unique_props, props);
513 if let Some(k) = key
514 && !seen_keys.insert(k.clone())
515 {
516 return Err(anyhow!(
517 "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
518 idx,
519 k
520 ));
521 }
522 }
523
524 if let Some(buffered) = self.pending_vertices.get(label) {
526 for (idx, props) in vertices.iter().enumerate() {
527 let key = self.compute_unique_key(unique_props, props);
528 if let Some(k) = key {
529 for (_, buffered_props) in buffered {
530 let buffered_key =
531 self.compute_unique_key(unique_props, buffered_props);
532 if buffered_key.as_ref() == Some(&k) {
533 return Err(anyhow!(
534 "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
535 idx,
536 k
537 ));
538 }
539 }
540 }
541 }
542 }
543 }
544 uni_common::core::schema::ConstraintType::Exists { property } => {
545 for (idx, props) in vertices.iter().enumerate() {
546 if props.get(property).is_none_or(|v| v.is_null()) {
547 return Err(anyhow!(
548 "EXISTS constraint violation at row {}: property '{}' must exist",
549 idx,
550 property
551 ));
552 }
553 }
554 }
555 uni_common::core::schema::ConstraintType::Check { expression } => {
556 for (idx, props) in vertices.iter().enumerate() {
557 if !self.evaluate_check_expression(expression, props)? {
558 return Err(anyhow!(
559 "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
560 constraint.name,
561 idx,
562 expression
563 ));
564 }
565 }
566 }
567 _ => {}
568 }
569 }
570
571 Ok(())
572 }
573
574 fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
576 let mut parts = Vec::new();
577 for prop in unique_props {
578 match props.get(prop) {
579 Some(v) if !v.is_null() => parts.push(v.to_string()),
580 _ => return None, }
582 }
583 Some(parts.join(":"))
584 }
585
586 fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
588 let parts: Vec<&str> = expression.split_whitespace().collect();
589 if parts.len() != 3 {
590 return Ok(true);
592 }
593
594 let prop_part = parts[0].trim_start_matches('(');
595 let prop_name = if let Some(idx) = prop_part.find('.') {
596 &prop_part[idx + 1..]
597 } else {
598 prop_part
599 };
600
601 let op = parts[1];
602 let val_str = parts[2].trim_end_matches(')');
603
604 let prop_val = match properties.get(prop_name) {
605 Some(v) => v,
606 None => return Ok(true), };
608
609 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
611 || (val_str.starts_with('"') && val_str.ends_with('"'))
612 {
613 Value::String(val_str[1..val_str.len() - 1].to_string())
614 } else if let Ok(n) = val_str.parse::<i64>() {
615 Value::Int(n)
616 } else if let Ok(n) = val_str.parse::<f64>() {
617 Value::Float(n)
618 } else if let Ok(b) = val_str.parse::<bool>() {
619 Value::Bool(b)
620 } else {
621 Value::String(val_str.to_string())
622 };
623
624 match op {
625 "=" | "==" => Ok(prop_val == &target_val),
626 "!=" | "<>" => Ok(prop_val != &target_val),
627 ">" => self.compare_values(prop_val, &target_val).map(|c| c > 0),
628 "<" => self.compare_values(prop_val, &target_val).map(|c| c < 0),
629 ">=" => self.compare_values(prop_val, &target_val).map(|c| c >= 0),
630 "<=" => self.compare_values(prop_val, &target_val).map(|c| c <= 0),
631 _ => Ok(true), }
633 }
634
635 fn compare_values(&self, a: &Value, b: &Value) -> Result<i8> {
640 let ordering = match (a, b) {
641 (Value::Int(n1), Value::Int(n2)) => n1.cmp(n2),
642 (Value::Float(f1), Value::Float(f2)) => f1.partial_cmp(f2).unwrap_or(Ordering::Equal),
643 (Value::Int(n), Value::Float(f)) => {
644 (*n as f64).partial_cmp(f).unwrap_or(Ordering::Equal)
645 }
646 (Value::Float(f), Value::Int(n)) => {
647 f.partial_cmp(&(*n as f64)).unwrap_or(Ordering::Equal)
648 }
649 (Value::String(s1), Value::String(s2)) => s1.cmp(s2),
650 _ => {
651 return Err(anyhow!(
652 "Cannot compare incompatible types: {:?} vs {:?}",
653 a,
654 b
655 ));
656 }
657 };
658 Ok(ordering as i8)
659 }
660
661 async fn checkpoint(&mut self) -> Result<()> {
666 log::debug!(
667 "Checkpoint triggered at {} bytes (limit: {})",
668 self.buffer_size_bytes,
669 self.config.max_buffer_size_bytes
670 );
671
672 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
674 for label in labels {
675 self.flush_vertices_buffer(&label).await?;
676 }
677
678 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
680 for edge_type in edge_types {
681 self.flush_edges_buffer(&edge_type).await?;
682 }
683
684 self.buffer_size_bytes = 0;
686
687 Ok(())
688 }
689
690 async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
692 let should_flush = self
693 .pending_vertices
694 .get(label)
695 .is_some_and(|buf| buf.len() >= self.config.batch_size);
696
697 if should_flush {
698 self.flush_vertices_buffer(label).await?;
699 }
700 Ok(())
701 }
702
703 async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
708 if let Some(vertices) = self.pending_vertices.remove(label) {
709 if vertices.is_empty() {
710 return Ok(());
711 }
712
713 let table_name = uni_store::backend::table_names::vertex_table_name(label);
715 if !self.initial_table_versions.contains_key(&table_name) {
716 let backend = self.backend.storage.backend();
717 let version = backend
718 .get_table_version(&table_name)
719 .await
720 .map_err(UniError::Internal)?;
721 self.initial_table_versions.insert(table_name, version);
722 }
723
724 let main_table_name =
726 uni_store::backend::table_names::main_vertex_table_name().to_string();
727 if !self.initial_table_versions.contains_key(&main_table_name) {
728 let backend = self.backend.storage.backend();
729 let version = backend
730 .get_table_version(&main_table_name)
731 .await
732 .map_err(UniError::Internal)?;
733 self.initial_table_versions
734 .insert(main_table_name.clone(), version);
735 }
736
737 let ds = self
738 .backend
739 .storage
740 .vertex_dataset(label)
741 .map_err(UniError::Internal)?;
742 let schema = self.backend.schema.schema();
743
744 let deleted = vec![false; vertices.len()];
745 let versions = vec![1; vertices.len()]; let now = Self::get_current_timestamp_micros();
749 let mut created_at: HashMap<Vid, i64> = HashMap::new();
750 let mut updated_at: HashMap<Vid, i64> = HashMap::new();
751 for (vid, _) in &vertices {
752 created_at.insert(*vid, now);
753 updated_at.insert(*vid, now);
754 }
755
756 let labels = vec![label.to_string()];
759 let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
760 .iter()
761 .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
762 .collect();
763
764 let batch = ds
765 .build_record_batch_with_timestamps(
766 &vertices_with_labels,
767 &deleted,
768 &versions,
769 &schema,
770 Some(&created_at),
771 Some(&updated_at),
772 )
773 .map_err(UniError::Internal)?;
774
775 let backend = self.backend.storage.backend();
777 ds.write_batch(backend, batch, &schema)
778 .await
779 .map_err(UniError::Internal)?;
780
781 ds.ensure_default_indexes(backend)
783 .await
784 .map_err(UniError::Internal)?;
785
786 let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
788 vertices_with_labels
789 .into_iter()
790 .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
791 .collect();
792
793 if !main_vertices.is_empty() {
794 let main_batch = MainVertexDataset::build_record_batch(
795 &main_vertices,
796 Some(&created_at),
797 Some(&updated_at),
798 )
799 .map_err(UniError::Internal)?;
800
801 MainVertexDataset::write_batch(backend, main_batch)
802 .await
803 .map_err(UniError::Internal)?;
804
805 MainVertexDataset::ensure_default_indexes(backend)
806 .await
807 .map_err(UniError::Internal)?;
808 }
809 }
810 Ok(())
811 }
812
813 pub async fn insert_edges(
824 &mut self,
825 edge_type: &str,
826 edges: Vec<EdgeData>,
827 ) -> Result<Vec<Eid>> {
828 let schema = self.backend.schema.schema();
829 schema
832 .edge_types
833 .get(edge_type)
834 .ok_or_else(|| UniError::EdgeTypeNotFound {
835 edge_type: edge_type.to_string(),
836 })?;
837
838 let eids = {
840 let writer = self.backend.writer.as_ref().unwrap();
841 writer
842 .allocate_eids(edges.len())
843 .await
844 .map_err(UniError::Internal)?
845 };
846
847 let now = Self::get_current_timestamp_micros();
849 let mut added_size = 0usize;
850 let entries: Vec<L1Entry> = edges
851 .into_iter()
852 .enumerate()
853 .map(|(i, edge)| {
854 added_size += 32 + Self::estimate_properties_size(&edge.properties);
856 L1Entry {
857 src_vid: edge.src_vid,
858 dst_vid: edge.dst_vid,
859 eid: eids[i],
860 op: Op::Insert,
861 version: 1,
862 properties: edge.properties,
863 created_at: Some(now),
864 updated_at: Some(now),
865 }
866 })
867 .collect();
868 self.buffer_size_bytes += added_size;
869 self.pending_edges
870 .entry(edge_type.to_string())
871 .or_default()
872 .extend(entries);
873
874 self.touched_edge_types.insert(edge_type.to_string());
875
876 if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
878 self.checkpoint().await?;
879 } else {
880 self.check_flush_edges(edge_type).await?;
881 }
882
883 self.stats.edges_inserted += eids.len();
884 self.report_progress(
885 BulkPhase::Inserting,
886 self.stats.vertices_inserted + self.stats.edges_inserted,
887 Some(edge_type.to_string()),
888 );
889
890 Ok(eids)
891 }
892
893 async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
895 let should_flush = self
896 .pending_edges
897 .get(edge_type)
898 .is_some_and(|buf| buf.len() >= self.config.batch_size);
899
900 if should_flush {
901 self.flush_edges_buffer(edge_type).await?;
902 }
903 Ok(())
904 }
905
906 #[expect(
911 clippy::map_entry,
912 reason = "async code between contains_key and insert"
913 )]
914 async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
915 if let Some(entries) = self.pending_edges.remove(edge_type) {
916 if entries.is_empty() {
917 return Ok(());
918 }
919
920 let schema = self.backend.schema.schema();
921 let backend = self.backend.storage.backend();
922
923 let fwd_table_name =
925 uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
926 if !self.initial_table_versions.contains_key(&fwd_table_name) {
927 let version = backend
928 .get_table_version(&fwd_table_name)
929 .await
930 .map_err(UniError::Internal)?;
931 self.initial_table_versions.insert(fwd_table_name, version);
932 }
933 let bwd_table_name =
934 uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
935 if !self.initial_table_versions.contains_key(&bwd_table_name) {
936 let version = backend
937 .get_table_version(&bwd_table_name)
938 .await
939 .map_err(UniError::Internal)?;
940 self.initial_table_versions.insert(bwd_table_name, version);
941 }
942
943 let main_edge_table_name =
945 uni_store::backend::table_names::main_edge_table_name().to_string();
946 if !self
947 .initial_table_versions
948 .contains_key(&main_edge_table_name)
949 {
950 let version = backend
951 .get_table_version(&main_edge_table_name)
952 .await
953 .map_err(UniError::Internal)?;
954 self.initial_table_versions
955 .insert(main_edge_table_name.clone(), version);
956 }
957
958 let mut fwd_entries = entries.clone();
960 fwd_entries.sort_by_key(|e| e.src_vid);
961 let fwd_ds = self
962 .backend
963 .storage
964 .delta_dataset(edge_type, "fwd")
965 .map_err(UniError::Internal)?;
966 let fwd_batch = fwd_ds
967 .build_record_batch(&fwd_entries, &schema)
968 .map_err(UniError::Internal)?;
969 let backend = self.backend.storage.backend();
970 fwd_ds
971 .write_run(backend, fwd_batch)
972 .await
973 .map_err(UniError::Internal)?;
974 fwd_ds
975 .ensure_eid_index(backend)
976 .await
977 .map_err(UniError::Internal)?;
978
979 let mut bwd_entries = entries.clone();
981 bwd_entries.sort_by_key(|e| e.dst_vid);
982 let bwd_ds = self
983 .backend
984 .storage
985 .delta_dataset(edge_type, "bwd")
986 .map_err(UniError::Internal)?;
987 let bwd_batch = bwd_ds
988 .build_record_batch(&bwd_entries, &schema)
989 .map_err(UniError::Internal)?;
990 bwd_ds
991 .write_run(backend, bwd_batch)
992 .await
993 .map_err(UniError::Internal)?;
994 bwd_ds
995 .ensure_eid_index(backend)
996 .await
997 .map_err(UniError::Internal)?;
998
999 let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
1001 let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
1002 let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
1003 .iter()
1004 .map(|e| {
1005 let deleted = matches!(e.op, Op::Delete);
1006 if let Some(ts) = e.created_at {
1007 edge_created_at.insert(e.eid, ts);
1008 }
1009 if let Some(ts) = e.updated_at {
1010 edge_updated_at.insert(e.eid, ts);
1011 }
1012 (
1013 e.eid,
1014 e.src_vid,
1015 e.dst_vid,
1016 edge_type.to_string(),
1017 e.properties.clone(),
1018 deleted,
1019 e.version,
1020 )
1021 })
1022 .collect();
1023
1024 if !main_edges.is_empty() {
1025 let main_batch = MainEdgeDataset::build_record_batch(
1026 &main_edges,
1027 Some(&edge_created_at),
1028 Some(&edge_updated_at),
1029 )
1030 .map_err(UniError::Internal)?;
1031
1032 MainEdgeDataset::write_batch(self.backend.storage.backend(), main_batch)
1033 .await
1034 .map_err(UniError::Internal)?;
1035
1036 MainEdgeDataset::ensure_default_indexes(self.backend.storage.backend())
1037 .await
1038 .map_err(UniError::Internal)?;
1039 }
1040 }
1041 Ok(())
1042 }
1043
1044 pub async fn commit(mut self) -> Result<BulkStats> {
1053 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
1055 for label in labels {
1056 self.flush_vertices_buffer(&label).await?;
1057 }
1058
1059 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
1061 for edge_type in edge_types {
1062 self.flush_edges_buffer(&edge_type).await?;
1063 }
1064
1065 let index_start = Instant::now();
1066
1067 if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
1069 let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
1070
1071 if self.config.async_indexes && !labels_to_rebuild.is_empty() {
1072 let schema = self.backend.schema.schema();
1074 for label in &labels_to_rebuild {
1075 for idx in &schema.indexes {
1076 if idx.label() == label.as_str() {
1077 let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1078 m.status = uni_common::core::schema::IndexStatus::Stale;
1079 });
1080 }
1081 }
1082 }
1083
1084 let rebuild_manager = IndexRebuildManager::new(
1085 self.backend.storage.clone(),
1086 self.backend.schema.clone(),
1087 self.backend.config.index_rebuild.clone(),
1088 )
1089 .await
1090 .map_err(UniError::Internal)?;
1091
1092 let task_ids = rebuild_manager
1093 .schedule(labels_to_rebuild)
1094 .await
1095 .map_err(UniError::Internal)?;
1096
1097 self.stats.index_task_ids = task_ids;
1098 self.stats.indexes_pending = true;
1099
1100 let manager = Arc::new(rebuild_manager);
1101 let handle = manager.start_background_worker(self.backend.shutdown.subscribe());
1102 self.backend.shutdown.track_task(handle);
1103 } else {
1104 for label in &labels_to_rebuild {
1106 self.report_progress(
1107 BulkPhase::RebuildingIndexes {
1108 label: label.clone(),
1109 },
1110 self.stats.vertices_inserted + self.stats.edges_inserted,
1111 Some(label.clone()),
1112 );
1113 let idx_mgr = IndexManager::new(
1114 self.backend.storage.base_path(),
1115 self.backend.storage.schema_manager_arc(),
1116 );
1117 idx_mgr
1118 .rebuild_indexes_for_label(label)
1119 .await
1120 .map_err(UniError::Internal)?;
1121 self.stats.indexes_rebuilt += 1;
1122
1123 let now = Utc::now();
1125 let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1126 let row_count = self
1127 .backend
1128 .storage
1129 .backend()
1130 .count_rows(&vtable_name, None)
1131 .await
1132 .ok()
1133 .map(|c| c as u64);
1134
1135 let schema = self.backend.schema.schema();
1136 for idx in &schema.indexes {
1137 if idx.label() == label.as_str() {
1138 let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1139 m.status = uni_common::core::schema::IndexStatus::Online;
1140 m.last_built_at = Some(now);
1141 if let Some(count) = row_count {
1142 m.row_count_at_build = Some(count);
1143 }
1144 });
1145 }
1146 }
1147 }
1148 }
1149 }
1150
1151 self.stats.index_build_duration = index_start.elapsed();
1152
1153 self.report_progress(
1155 BulkPhase::Finalizing,
1156 self.stats.vertices_inserted + self.stats.edges_inserted,
1157 None,
1158 );
1159
1160 let mut manifest = self
1162 .backend
1163 .storage
1164 .snapshot_manager()
1165 .load_latest_snapshot()
1166 .await
1167 .map_err(UniError::Internal)?
1168 .unwrap_or_else(|| {
1169 SnapshotManifest::new(
1170 Uuid::new_v4().to_string(),
1171 self.backend.schema.schema().schema_version,
1172 )
1173 });
1174
1175 let parent_id = manifest.snapshot_id.clone();
1177 manifest.parent_snapshot = Some(parent_id);
1178 manifest.snapshot_id = Uuid::new_v4().to_string();
1179 manifest.created_at = Utc::now();
1180
1181 let backend = self.backend.storage.backend();
1183 for label in &self.touched_labels {
1184 let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1185 let count = backend
1186 .count_rows(&vtable_name, None)
1187 .await
1188 .map_err(UniError::Internal)?;
1189
1190 let current_snap =
1191 manifest
1192 .vertices
1193 .entry(label.to_string())
1194 .or_insert(LabelSnapshot {
1195 version: 0,
1196 count: 0,
1197 lance_version: 0,
1198 });
1199 current_snap.count = count as u64;
1200 current_snap.lance_version = 0;
1202 }
1203
1204 for edge_type in &self.touched_edge_types {
1206 let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
1207 if let Ok(count) = backend.count_rows(&delta_name, None).await {
1208 let current_snap =
1209 manifest
1210 .edges
1211 .entry(edge_type.to_string())
1212 .or_insert(EdgeSnapshot {
1213 version: 0,
1214 count: 0,
1215 lance_version: 0,
1216 });
1217 current_snap.count = count as u64;
1218 current_snap.lance_version = 0;
1220 }
1221 }
1222
1223 self.backend
1225 .storage
1226 .snapshot_manager()
1227 .save_snapshot(&manifest)
1228 .await
1229 .map_err(UniError::Internal)?;
1230 self.backend
1231 .storage
1232 .snapshot_manager()
1233 .set_latest_snapshot(&manifest.snapshot_id)
1234 .await
1235 .map_err(UniError::Internal)?;
1236
1237 self.backend
1239 .schema
1240 .save()
1241 .await
1242 .map_err(UniError::Internal)?;
1243
1244 let schema = self.backend.storage.schema_manager().schema();
1247 for edge_type_name in &self.touched_edge_types {
1248 if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
1249 let type_id = meta.id;
1250 for &dir in uni_store::storage::direction::Direction::Both.expand() {
1251 let _ = self
1252 .backend
1253 .storage
1254 .warm_adjacency(type_id, dir, None)
1255 .await;
1256 }
1257 }
1258 }
1259
1260 self.committed = true;
1261 self.stats.duration = self.start_time.elapsed();
1262 Ok(self.stats.clone())
1263 }
1264
1265 pub async fn abort(mut self) -> Result<()> {
1276 if self.committed {
1277 return Err(anyhow!("Cannot abort: bulk load already committed"));
1278 }
1279
1280 self.pending_vertices.clear();
1282 self.pending_edges.clear();
1283 self.buffer_size_bytes = 0;
1284
1285 let backend = self.backend.storage.backend();
1287 let mut rollback_errors = Vec::new();
1288 let mut rolled_back_count = 0;
1289 let mut dropped_count = 0;
1290
1291 for (table_name, initial_version) in &self.initial_table_versions {
1292 match initial_version {
1293 Some(version) => {
1294 match backend.rollback_table(table_name, *version).await {
1296 Ok(()) => {
1297 log::info!("Rolled back table '{}' to version {}", table_name, version);
1298 rolled_back_count += 1;
1299 }
1300 Err(e) => {
1301 rollback_errors.push(format!("{}: {}", table_name, e));
1302 }
1303 }
1304 }
1305 None => {
1306 match backend.drop_table(table_name).await {
1308 Ok(()) => {
1309 log::info!("Dropped table '{}' (created during bulk load)", table_name);
1310 dropped_count += 1;
1311 }
1312 Err(e) => {
1313 rollback_errors.push(format!("{}: {}", table_name, e));
1314 }
1315 }
1316 }
1317 }
1318 }
1319
1320 self.backend.storage.backend().clear_cache();
1322
1323 if rollback_errors.is_empty() {
1324 log::info!(
1325 "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1326 rolled_back_count,
1327 dropped_count
1328 );
1329 Ok(())
1330 } else {
1331 Err(anyhow!(
1332 "Bulk load abort had {} rollback errors: {}",
1333 rollback_errors.len(),
1334 rollback_errors.join("; ")
1335 ))
1336 }
1337 }
1338
1339 fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1340 if let Some(cb) = &self.progress_callback {
1341 cb(BulkProgress {
1342 phase,
1343 rows_processed: rows,
1344 total_rows: None,
1345 current_label: label,
1346 elapsed: self.start_time.elapsed(),
1347 });
1348 }
1349 }
1350}