1use anyhow::{Result, anyhow};
30use chrono::Utc;
31use std::cmp::Ordering;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::UniConfig;
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::schema::SchemaManager;
39use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
40use uni_common::{Properties, UniError};
41use uni_plugin_host::shutdown::ShutdownHandle;
42use uni_store::runtime::writer::Writer;
43use uni_store::storage::delta::{L1Entry, Op};
44use uni_store::storage::main_edge::MainEdgeDataset;
45use uni_store::storage::main_vertex::MainVertexDataset;
46use uni_store::storage::manager::StorageManager;
47use uni_store::storage::{IndexManager, IndexRebuildManager};
48use uuid::Uuid;
49
50use crate::flush_intent;
51
/// Test-only fault-injection switch.
///
/// When set, `BulkWriter::flush_vertices_buffer` returns an error right after the per-label
/// vertex table has been written (and its default indexes ensured) but before the shared main
/// vertex table is written. This lets tests exercise recovery from a partially applied flush.
#[doc(hidden)]
pub static FAIL_AFTER_PERLABEL_WRITE: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
60
/// Handles to the database services a bulk load needs.
#[derive(Clone)]
pub struct BulkBackend {
    /// Storage layer that vertex/edge datasets, snapshots and the intent store are read from and written to.
    pub storage: Arc<StorageManager>,
    /// ID allocator / writer; `BulkWriterBuilder::build` rejects a backend where this is `None`.
    pub writer: Option<Arc<Writer>>,
    /// Schema manager used for label/edge-type lookup, constraints and index metadata.
    pub schema: Arc<SchemaManager>,
    /// Shutdown handle; background index-rebuild workers are subscribed to and tracked by it.
    pub shutdown: Arc<ShutdownHandle>,
    /// Database configuration (its `index_rebuild` section configures async index rebuilds).
    pub config: UniConfig,
}
81
/// Input accepted by `BulkWriter::insert_vertices`: anything that can be turned into one
/// property map per row.
pub trait IntoArrow {
    /// Converts `self` into one property map per row.
    fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
}

/// Row-oriented input is already in the expected shape and is passed through unchanged.
impl IntoArrow for Vec<HashMap<String, Value>> {
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        self
    }
}

/// Columnar input is converted row by row via [`record_batch_to_property_maps`].
impl IntoArrow for arrow_array::RecordBatch {
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        record_batch_to_property_maps(&self)
    }
}
102
103pub fn record_batch_to_property_maps(
108 batch: &arrow_array::RecordBatch,
109) -> Vec<HashMap<String, Value>> {
110 let schema = batch.schema();
111 let num_rows = batch.num_rows();
112 let mut rows = Vec::with_capacity(num_rows);
113 for row_idx in 0..num_rows {
114 let mut props = HashMap::with_capacity(schema.fields().len());
115 for (col_idx, field) in schema.fields().iter().enumerate() {
116 let col = batch.column(col_idx);
117 let value =
118 uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
119 if !value.is_null() {
120 props.insert(field.name().clone(), value);
121 }
122 }
123 rows.push(props);
124 }
125 rows
126}
127
/// Builder that configures and creates a [`BulkWriter`].
pub struct BulkWriterBuilder {
    /// Database handles the resulting writer will use.
    backend: BulkBackend,
    /// Tuning options; starts from `BulkConfig::default()`.
    config: BulkConfig,
    /// Optional callback invoked with progress updates during the load.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
}
134
135impl BulkWriterBuilder {
136 pub fn new_unguarded(backend: BulkBackend) -> Self {
142 Self {
143 backend,
144 config: BulkConfig::default(),
145 progress_callback: None,
146 }
147 }
148
149 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
151 self.config.defer_vector_indexes = defer;
152 self
153 }
154
155 pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
157 self.config.defer_scalar_indexes = defer;
158 self
159 }
160
161 pub fn batch_size(mut self, size: usize) -> Self {
163 self.config.batch_size = size;
164 self
165 }
166
167 pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
169 self.progress_callback = Some(Box::new(f));
170 self
171 }
172
173 pub fn async_indexes(mut self, async_: bool) -> Self {
183 self.config.async_indexes = async_;
184 self
185 }
186
187 pub fn validate_constraints(mut self, validate: bool) -> Self {
195 self.config.validate_constraints = validate;
196 self
197 }
198
199 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
207 self.config.max_buffer_size_bytes = size;
208 self
209 }
210
211 pub fn build(self) -> Result<BulkWriter> {
217 if self.backend.writer.is_none() {
218 return Err(anyhow!("BulkWriter requires a writable database instance"));
219 }
220
221 Ok(BulkWriter {
222 backend: self.backend,
223 config: self.config,
224 progress_callback: self.progress_callback,
225 stats: BulkStats::default(),
226 start_time: Instant::now(),
227 pending_vertices: HashMap::new(),
228 pending_edges: HashMap::new(),
229 touched_labels: HashSet::new(),
230 touched_edge_types: HashSet::new(),
231 initial_table_versions: HashMap::new(),
232 seen_unique_keys: HashMap::new(),
233 buffer_size_bytes: 0,
234 committed: false,
235 })
236 }
237}
238
/// Tuning options for a bulk load.
pub struct BulkConfig {
    /// Defer vector index maintenance until commit.
    pub defer_vector_indexes: bool,
    /// Defer scalar index maintenance until commit.
    pub defer_scalar_indexes: bool,
    /// Buffered rows per label / edge type that trigger a flush of that buffer.
    pub batch_size: usize,
    /// Rebuild deferred indexes on a background worker instead of inline at commit.
    pub async_indexes: bool,
    /// Validate schema constraints for every inserted vertex batch.
    pub validate_constraints: bool,
    /// Estimated buffered bytes at which every pending buffer is flushed.
    pub max_buffer_size_bytes: usize,
}

impl Default for BulkConfig {
    /// Defers both index kinds, flushes every 10 000 rows, rebuilds indexes inline, validates
    /// constraints, and caps the buffer estimate at 1 GiB.
    fn default() -> Self {
        const ONE_GIB: usize = 1 << 30;
        let defer_indexes = true;
        Self {
            defer_vector_indexes: defer_indexes,
            defer_scalar_indexes: defer_indexes,
            batch_size: 10_000,
            async_indexes: false,
            validate_constraints: true,
            max_buffer_size_bytes: ONE_GIB,
        }
    }
}
273
/// Progress update passed to the callback registered with `BulkWriterBuilder::on_progress`.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Current phase of the load.
    pub phase: BulkPhase,
    /// Rows handled so far (vertices, plus edges once edges have been inserted).
    pub rows_processed: usize,
    /// Total expected rows; the writer in this file always reports `None`.
    pub total_rows: Option<usize>,
    /// Label or edge type currently being processed, if any.
    pub current_label: Option<String>,
    /// Time since the writer was built.
    pub elapsed: Duration,
}
282
/// Phase reported in a [`BulkProgress`] update.
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Rows are being buffered / written.
    Inserting,
    /// Deferred indexes for `label` are being rebuilt inline during commit.
    RebuildingIndexes { label: String },
    /// Commit is writing the snapshot and cleaning up.
    Finalizing,
}
289
/// Summary statistics for a bulk load, returned by `BulkWriter::commit`.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Number of vertex IDs allocated by `insert_vertices`.
    pub vertices_inserted: usize,
    /// Number of edge IDs allocated by `insert_edges`.
    pub edges_inserted: usize,
    /// Number of labels whose indexes were rebuilt inline at commit.
    pub indexes_rebuilt: usize,
    /// Total time from building the writer to the end of commit.
    pub duration: Duration,
    /// Time spent in the index-rebuild step of commit.
    pub index_build_duration: Duration,
    /// Task IDs of background index rebuilds scheduled when async indexing is enabled.
    pub index_task_ids: Vec<String>,
    /// `true` when index rebuilds were handed to a background worker and may still be running.
    pub indexes_pending: bool,
}
302
303#[derive(Debug, Clone)]
307pub struct EdgeData {
308 pub src_vid: Vid,
310 pub dst_vid: Vid,
312 pub properties: Properties,
314}
315
316impl EdgeData {
317 pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
319 Self {
320 src_vid,
321 dst_vid,
322 properties,
323 }
324 }
325}
326
/// Buffered, high-throughput loader for vertices and edges.
///
/// Rows are buffered in memory and written directly to the storage datasets when a buffer
/// reaches `batch_size` rows or the estimated buffered bytes reach `max_buffer_size_bytes`.
/// `commit` flushes the rest, rebuilds deferred indexes and publishes a new snapshot; `abort`
/// rolls touched tables back to their pre-load versions.
pub struct BulkWriter {
    /// Database handles.
    backend: BulkBackend,
    /// Tuning options.
    config: BulkConfig,
    /// Optional progress callback, invoked by `report_progress`.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    /// Running statistics.
    stats: BulkStats,
    /// When the writer was built; used for progress `elapsed` and `BulkStats::duration`.
    start_time: Instant,
    /// Per-label buffer of (allocated VID, properties) awaiting a flush.
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    /// Per-edge-type buffer of L1 insert entries awaiting a flush.
    pending_edges: HashMap<String, Vec<L1Entry>>,
    /// Labels that received vertices; their indexes and snapshot counts are refreshed at commit.
    touched_labels: HashSet<String>,
    /// Edge types that received edges; their snapshot counts and adjacency are refreshed at commit.
    touched_edge_types: HashSet<String>,
    /// Table name -> version before this load first wrote to it (`None` = the table did not
    /// exist, so `abort` drops it instead of rolling it back).
    initial_table_versions: HashMap<String, Option<u64>>,
    /// UNIQUE keys already accepted in this load, grouped by `unique_set_key(label, props)`.
    seen_unique_keys: HashMap<String, HashSet<String>>,
    /// Running estimate of buffered bytes, compared against `max_buffer_size_bytes` to decide
    /// when to checkpoint.
    buffer_size_bytes: usize,
    /// Set at the end of a successful commit.
    committed: bool,
}
359
/// Builds the key identifying one UNIQUE constraint (label plus its property list).
///
/// Components are joined with the ASCII unit separator (`U+001F`), which is unlikely to appear
/// in label or property names, so distinct constraints map to distinct keys.
fn unique_set_key(label: &str, unique_props: &[String]) -> String {
    std::iter::once(label)
        .chain(unique_props.iter().map(String::as_str))
        .collect::<Vec<&str>>()
        .join("\u{1f}")
}
371
372impl BulkWriter {
373 pub fn stats(&self) -> &BulkStats {
376 &self.stats
377 }
378
379 pub fn touched_labels(&self) -> Vec<String> {
381 self.touched_labels.iter().cloned().collect()
382 }
383
384 pub fn touched_edge_types(&self) -> Vec<String> {
386 self.touched_edge_types.iter().cloned().collect()
387 }
388
389 fn get_current_timestamp_micros() -> i64 {
391 use std::time::{SystemTime, UNIX_EPOCH};
392 SystemTime::now()
393 .duration_since(UNIX_EPOCH)
394 .map(|d| d.as_micros() as i64)
395 .unwrap_or(0)
396 }
397
398 pub async fn insert_vertices(
411 &mut self,
412 label: &str,
413 vertices: impl IntoArrow,
414 ) -> Result<Vec<Vid>> {
415 let vertices = vertices.into_property_maps();
416 let schema = self.backend.schema.schema();
417 schema
419 .labels
420 .get(label)
421 .ok_or_else(|| UniError::LabelNotFound {
422 label: label.to_string(),
423 })?;
424 if self.config.validate_constraints {
426 self.validate_vertex_batch_constraints(label, &vertices)
427 .await?;
428 }
429
430 let vids = {
432 let writer = self.backend.writer.as_ref().unwrap();
433 writer
434 .allocate_vids(vertices.len())
435 .await
436 .map_err(UniError::Internal)?
437 };
438
439 let buffer = self.pending_vertices.entry(label.to_string()).or_default();
441 for (i, props) in vertices.into_iter().enumerate() {
442 self.buffer_size_bytes += Self::estimate_properties_size(&props);
443 buffer.push((vids[i], props));
444 }
445
446 self.touched_labels.insert(label.to_string());
447
448 if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
450 self.checkpoint().await?;
451 } else {
452 self.check_flush_vertices(label).await?;
454 }
455
456 self.stats.vertices_inserted += vids.len();
457 self.report_progress(
458 BulkPhase::Inserting,
459 self.stats.vertices_inserted,
460 Some(label.to_string()),
461 );
462
463 Ok(vids)
464 }
465
466 fn estimate_properties_size(props: &Properties) -> usize {
468 let mut size = 0;
469 for (key, value) in props {
470 size += key.len();
471 size += Self::estimate_value_size(value);
472 }
473 size
474 }
475
476 fn estimate_value_size(value: &Value) -> usize {
478 match value {
479 Value::Null => 1,
480 Value::Bool(_) => 1,
481 Value::Int(_) | Value::Float(_) => 8,
482 Value::String(s) => s.len(),
483 Value::Bytes(b) => b.len(),
484 Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
485 Value::Map(obj) => {
486 obj.iter()
487 .map(|(k, v)| k.len() + Self::estimate_value_size(v))
488 .sum::<usize>()
489 + 8
490 }
491 Value::Vector(v) => v.len() * 4,
492 _ => 16, }
494 }
495
496 async fn validate_vertex_batch_constraints(
501 &mut self,
502 label: &str,
503 vertices: &[Properties],
504 ) -> Result<()> {
505 let schema = self.backend.schema.schema();
506
507 let mut pending_unique_records: Vec<(String, Vec<String>)> = Vec::new();
510
511 if let Some(props_meta) = schema.properties.get(label) {
513 for (idx, props) in vertices.iter().enumerate() {
514 for (prop_name, meta) in props_meta {
516 if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
517 return Err(anyhow!(
518 "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
519 idx,
520 prop_name,
521 label
522 ));
523 }
524 }
525 }
526 }
527
528 for constraint in &schema.constraints {
530 if !constraint.enabled {
531 continue;
532 }
533 match &constraint.target {
534 uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
535 _ => continue,
536 }
537
538 match &constraint.constraint_type {
539 uni_common::core::schema::ConstraintType::Unique {
540 properties: unique_props,
541 } => {
542 let mut seen_keys: HashSet<String> = HashSet::new();
544 for (idx, props) in vertices.iter().enumerate() {
545 let key = self.compute_unique_key(unique_props, props);
546 if let Some(k) = key
547 && !seen_keys.insert(k.clone())
548 {
549 return Err(anyhow!(
550 "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
551 idx,
552 k
553 ));
554 }
555 }
556
557 let set_key = unique_set_key(label, unique_props);
562 let mut batch_keys = Vec::with_capacity(vertices.len());
563 for (idx, props) in vertices.iter().enumerate() {
564 if let Some(k) = self.compute_unique_key(unique_props, props) {
565 if self
566 .seen_unique_keys
567 .get(&set_key)
568 .is_some_and(|seen| seen.contains(&k))
569 {
570 return Err(anyhow!(
571 "UNIQUE constraint violation at row {}: key '{}' conflicts with an already-loaded vertex",
572 idx,
573 k
574 ));
575 }
576 batch_keys.push(k);
577 }
578 }
579 pending_unique_records.push((set_key, batch_keys));
580 }
581 uni_common::core::schema::ConstraintType::Exists { property } => {
582 for (idx, props) in vertices.iter().enumerate() {
583 if props.get(property).is_none_or(|v| v.is_null()) {
584 return Err(anyhow!(
585 "EXISTS constraint violation at row {}: property '{}' must exist",
586 idx,
587 property
588 ));
589 }
590 }
591 }
592 uni_common::core::schema::ConstraintType::Check { expression } => {
593 for (idx, props) in vertices.iter().enumerate() {
594 if !self.evaluate_check_expression(expression, props)? {
595 return Err(anyhow!(
596 "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
597 constraint.name,
598 idx,
599 expression
600 ));
601 }
602 }
603 }
604 _ => {}
605 }
606 }
607
608 for (set_key, keys) in pending_unique_records {
611 self.seen_unique_keys
612 .entry(set_key)
613 .or_default()
614 .extend(keys);
615 }
616
617 Ok(())
618 }
619
620 fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
622 let mut parts = Vec::new();
623 for prop in unique_props {
624 match props.get(prop) {
625 Some(v) if !v.is_null() => parts.push(v.to_string()),
626 _ => return None, }
628 }
629 Some(parts.join(":"))
630 }
631
632 fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
634 let parts: Vec<&str> = expression.split_whitespace().collect();
635 if parts.len() != 3 {
636 return Ok(true);
638 }
639
640 let prop_part = parts[0].trim_start_matches('(');
641 let prop_name = if let Some(idx) = prop_part.find('.') {
642 &prop_part[idx + 1..]
643 } else {
644 prop_part
645 };
646
647 let op = parts[1];
648 let val_str = parts[2].trim_end_matches(')');
649
650 let prop_val = match properties.get(prop_name) {
651 Some(v) => v,
652 None => return Ok(true), };
654
655 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
657 || (val_str.starts_with('"') && val_str.ends_with('"'))
658 {
659 Value::String(val_str[1..val_str.len() - 1].to_string())
660 } else if let Ok(n) = val_str.parse::<i64>() {
661 Value::Int(n)
662 } else if let Ok(n) = val_str.parse::<f64>() {
663 Value::Float(n)
664 } else if let Ok(b) = val_str.parse::<bool>() {
665 Value::Bool(b)
666 } else {
667 Value::String(val_str.to_string())
668 };
669
670 match op {
671 "=" | "==" => Ok(prop_val == &target_val),
672 "!=" | "<>" => Ok(prop_val != &target_val),
673 ">" => self.compare_values(prop_val, &target_val).map(|c| c > 0),
674 "<" => self.compare_values(prop_val, &target_val).map(|c| c < 0),
675 ">=" => self.compare_values(prop_val, &target_val).map(|c| c >= 0),
676 "<=" => self.compare_values(prop_val, &target_val).map(|c| c <= 0),
677 _ => Ok(true), }
679 }
680
681 fn compare_values(&self, a: &Value, b: &Value) -> Result<i8> {
686 let ordering = match (a, b) {
687 (Value::Int(n1), Value::Int(n2)) => n1.cmp(n2),
688 (Value::Float(f1), Value::Float(f2)) => f1.partial_cmp(f2).unwrap_or(Ordering::Equal),
689 (Value::Int(n), Value::Float(f)) => {
690 (*n as f64).partial_cmp(f).unwrap_or(Ordering::Equal)
691 }
692 (Value::Float(f), Value::Int(n)) => {
693 f.partial_cmp(&(*n as f64)).unwrap_or(Ordering::Equal)
694 }
695 (Value::String(s1), Value::String(s2)) => s1.cmp(s2),
696 _ => {
697 return Err(anyhow!(
698 "Cannot compare incompatible types: {:?} vs {:?}",
699 a,
700 b
701 ));
702 }
703 };
704 Ok(ordering as i8)
705 }
706
707 async fn checkpoint(&mut self) -> Result<()> {
712 log::debug!(
713 "Checkpoint triggered at {} bytes (limit: {})",
714 self.buffer_size_bytes,
715 self.config.max_buffer_size_bytes
716 );
717
718 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
720 for label in labels {
721 self.flush_vertices_buffer(&label).await?;
722 }
723
724 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
726 for edge_type in edge_types {
727 self.flush_edges_buffer(&edge_type).await?;
728 }
729
730 self.buffer_size_bytes = 0;
732
733 Ok(())
734 }
735
736 async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
738 let should_flush = self
739 .pending_vertices
740 .get(label)
741 .is_some_and(|buf| buf.len() >= self.config.batch_size);
742
743 if should_flush {
744 self.flush_vertices_buffer(label).await?;
745 }
746 Ok(())
747 }
748
749 async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
754 if let Some(vertices) = self.pending_vertices.remove(label) {
755 if vertices.is_empty() {
756 return Ok(());
757 }
758
759 let table_name = uni_store::backend::table_names::vertex_table_name(label);
761 if !self.initial_table_versions.contains_key(&table_name) {
762 let backend = self.backend.storage.backend();
763 let version = backend
764 .get_table_version(&table_name)
765 .await
766 .map_err(UniError::Internal)?;
767 self.initial_table_versions.insert(table_name, version);
768 }
769
770 let main_table_name =
772 uni_store::backend::table_names::main_vertex_table_name().to_string();
773 if !self.initial_table_versions.contains_key(&main_table_name) {
774 let backend = self.backend.storage.backend();
775 let version = backend
776 .get_table_version(&main_table_name)
777 .await
778 .map_err(UniError::Internal)?;
779 self.initial_table_versions
780 .insert(main_table_name.clone(), version);
781 }
782
783 self.persist_active_intent().await?;
787
788 let ds = self
789 .backend
790 .storage
791 .vertex_dataset(label)
792 .map_err(UniError::Internal)?;
793 let schema = self.backend.schema.schema();
794
795 let deleted = vec![false; vertices.len()];
796 let versions = vec![1; vertices.len()]; let now = Self::get_current_timestamp_micros();
800 let mut created_at: HashMap<Vid, i64> = HashMap::new();
801 let mut updated_at: HashMap<Vid, i64> = HashMap::new();
802 for (vid, _) in &vertices {
803 created_at.insert(*vid, now);
804 updated_at.insert(*vid, now);
805 }
806
807 let labels = vec![label.to_string()];
810 let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
811 .iter()
812 .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
813 .collect();
814
815 let batch = ds
816 .build_record_batch_with_timestamps(
817 &vertices_with_labels,
818 &deleted,
819 &versions,
820 &schema,
821 Some(&created_at),
822 Some(&updated_at),
823 )
824 .map_err(UniError::Internal)?;
825
826 let backend = self.backend.storage.backend();
828 ds.write_batch(backend, batch, &schema)
829 .await
830 .map_err(UniError::Internal)?;
831
832 ds.ensure_default_indexes(backend)
834 .await
835 .map_err(UniError::Internal)?;
836
837 if FAIL_AFTER_PERLABEL_WRITE.load(std::sync::atomic::Ordering::SeqCst) {
841 return Err(anyhow!("injected failure after per-label vertex commit"));
842 }
843
844 let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
846 vertices_with_labels
847 .into_iter()
848 .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
849 .collect();
850
851 if !main_vertices.is_empty() {
852 let main_batch = MainVertexDataset::build_record_batch(
853 &main_vertices,
854 Some(&created_at),
855 Some(&updated_at),
856 )
857 .map_err(UniError::Internal)?;
858
859 MainVertexDataset::write_batch(backend, main_batch)
860 .await
861 .map_err(UniError::Internal)?;
862
863 MainVertexDataset::ensure_default_indexes(backend)
864 .await
865 .map_err(UniError::Internal)?;
866 }
867 }
868 Ok(())
869 }
870
871 pub async fn insert_edges(
882 &mut self,
883 edge_type: &str,
884 edges: Vec<EdgeData>,
885 ) -> Result<Vec<Eid>> {
886 let schema = self.backend.schema.schema();
887 schema
890 .edge_types
891 .get(edge_type)
892 .ok_or_else(|| UniError::EdgeTypeNotFound {
893 edge_type: edge_type.to_string(),
894 })?;
895
896 let eids = {
898 let writer = self.backend.writer.as_ref().unwrap();
899 writer
900 .allocate_eids(edges.len())
901 .await
902 .map_err(UniError::Internal)?
903 };
904
905 let now = Self::get_current_timestamp_micros();
907 let mut added_size = 0usize;
908 let entries: Vec<L1Entry> = edges
909 .into_iter()
910 .enumerate()
911 .map(|(i, edge)| {
912 added_size += 32 + Self::estimate_properties_size(&edge.properties);
914 L1Entry {
915 src_vid: edge.src_vid,
916 dst_vid: edge.dst_vid,
917 eid: eids[i],
918 op: Op::Insert,
919 version: 1,
920 properties: edge.properties,
921 created_at: Some(now),
922 updated_at: Some(now),
923 }
924 })
925 .collect();
926 self.buffer_size_bytes += added_size;
927 self.pending_edges
928 .entry(edge_type.to_string())
929 .or_default()
930 .extend(entries);
931
932 self.touched_edge_types.insert(edge_type.to_string());
933
934 if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
936 self.checkpoint().await?;
937 } else {
938 self.check_flush_edges(edge_type).await?;
939 }
940
941 self.stats.edges_inserted += eids.len();
942 self.report_progress(
943 BulkPhase::Inserting,
944 self.stats.vertices_inserted + self.stats.edges_inserted,
945 Some(edge_type.to_string()),
946 );
947
948 Ok(eids)
949 }
950
951 async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
953 let should_flush = self
954 .pending_edges
955 .get(edge_type)
956 .is_some_and(|buf| buf.len() >= self.config.batch_size);
957
958 if should_flush {
959 self.flush_edges_buffer(edge_type).await?;
960 }
961 Ok(())
962 }
963
964 #[expect(
969 clippy::map_entry,
970 reason = "async code between contains_key and insert"
971 )]
972 async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
973 if let Some(entries) = self.pending_edges.remove(edge_type) {
974 if entries.is_empty() {
975 return Ok(());
976 }
977
978 let schema = self.backend.schema.schema();
979 let backend = self.backend.storage.backend();
980
981 let fwd_table_name =
983 uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
984 if !self.initial_table_versions.contains_key(&fwd_table_name) {
985 let version = backend
986 .get_table_version(&fwd_table_name)
987 .await
988 .map_err(UniError::Internal)?;
989 self.initial_table_versions.insert(fwd_table_name, version);
990 }
991 let bwd_table_name =
992 uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
993 if !self.initial_table_versions.contains_key(&bwd_table_name) {
994 let version = backend
995 .get_table_version(&bwd_table_name)
996 .await
997 .map_err(UniError::Internal)?;
998 self.initial_table_versions.insert(bwd_table_name, version);
999 }
1000
1001 let main_edge_table_name =
1003 uni_store::backend::table_names::main_edge_table_name().to_string();
1004 if !self
1005 .initial_table_versions
1006 .contains_key(&main_edge_table_name)
1007 {
1008 let version = backend
1009 .get_table_version(&main_edge_table_name)
1010 .await
1011 .map_err(UniError::Internal)?;
1012 self.initial_table_versions
1013 .insert(main_edge_table_name.clone(), version);
1014 }
1015
1016 self.persist_active_intent().await?;
1019
1020 let mut fwd_entries = entries.clone();
1022 fwd_entries.sort_by_key(|e| e.src_vid);
1023 let fwd_ds = self
1024 .backend
1025 .storage
1026 .delta_dataset(edge_type, "fwd")
1027 .map_err(UniError::Internal)?;
1028 let fwd_batch = fwd_ds
1029 .build_record_batch(&fwd_entries, &schema)
1030 .map_err(UniError::Internal)?;
1031 let backend = self.backend.storage.backend();
1032 fwd_ds
1033 .write_run(backend, fwd_batch)
1034 .await
1035 .map_err(UniError::Internal)?;
1036 fwd_ds
1037 .ensure_eid_index(backend)
1038 .await
1039 .map_err(UniError::Internal)?;
1040
1041 let mut bwd_entries = entries.clone();
1043 bwd_entries.sort_by_key(|e| e.dst_vid);
1044 let bwd_ds = self
1045 .backend
1046 .storage
1047 .delta_dataset(edge_type, "bwd")
1048 .map_err(UniError::Internal)?;
1049 let bwd_batch = bwd_ds
1050 .build_record_batch(&bwd_entries, &schema)
1051 .map_err(UniError::Internal)?;
1052 bwd_ds
1053 .write_run(backend, bwd_batch)
1054 .await
1055 .map_err(UniError::Internal)?;
1056 bwd_ds
1057 .ensure_eid_index(backend)
1058 .await
1059 .map_err(UniError::Internal)?;
1060
1061 let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
1063 let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
1064 let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
1065 .iter()
1066 .map(|e| {
1067 let deleted = matches!(e.op, Op::Delete);
1068 if let Some(ts) = e.created_at {
1069 edge_created_at.insert(e.eid, ts);
1070 }
1071 if let Some(ts) = e.updated_at {
1072 edge_updated_at.insert(e.eid, ts);
1073 }
1074 (
1075 e.eid,
1076 e.src_vid,
1077 e.dst_vid,
1078 edge_type.to_string(),
1079 e.properties.clone(),
1080 deleted,
1081 e.version,
1082 )
1083 })
1084 .collect();
1085
1086 if !main_edges.is_empty() {
1087 let main_batch = MainEdgeDataset::build_record_batch(
1088 &main_edges,
1089 Some(&edge_created_at),
1090 Some(&edge_updated_at),
1091 )
1092 .map_err(UniError::Internal)?;
1093
1094 MainEdgeDataset::write_batch(self.backend.storage.backend(), main_batch)
1095 .await
1096 .map_err(UniError::Internal)?;
1097
1098 MainEdgeDataset::ensure_default_indexes(self.backend.storage.backend())
1099 .await
1100 .map_err(UniError::Internal)?;
1101 }
1102 }
1103 Ok(())
1104 }
1105
1106 pub async fn commit(mut self) -> Result<BulkStats> {
1115 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
1117 for label in labels {
1118 self.flush_vertices_buffer(&label).await?;
1119 }
1120
1121 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
1123 for edge_type in edge_types {
1124 self.flush_edges_buffer(&edge_type).await?;
1125 }
1126
1127 let index_start = Instant::now();
1128
1129 if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
1131 let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
1132
1133 if self.config.async_indexes && !labels_to_rebuild.is_empty() {
1134 let schema = self.backend.schema.schema();
1136 for label in &labels_to_rebuild {
1137 for idx in &schema.indexes {
1138 if idx.label() == label.as_str() {
1139 let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1140 m.status = uni_common::core::schema::IndexStatus::Stale;
1141 });
1142 }
1143 }
1144 }
1145
1146 let rebuild_manager = IndexRebuildManager::new(
1147 self.backend.storage.clone(),
1148 self.backend.schema.clone(),
1149 self.backend.config.index_rebuild.clone(),
1150 )
1151 .await
1152 .map_err(UniError::Internal)?;
1153
1154 let task_ids = rebuild_manager
1155 .schedule(labels_to_rebuild)
1156 .await
1157 .map_err(UniError::Internal)?;
1158
1159 self.stats.index_task_ids = task_ids;
1160 self.stats.indexes_pending = true;
1161
1162 let manager = Arc::new(rebuild_manager);
1163 let handle = manager.start_background_worker(self.backend.shutdown.subscribe());
1164 self.backend.shutdown.track_task(handle);
1165 } else {
1166 for label in &labels_to_rebuild {
1168 self.report_progress(
1169 BulkPhase::RebuildingIndexes {
1170 label: label.clone(),
1171 },
1172 self.stats.vertices_inserted + self.stats.edges_inserted,
1173 Some(label.clone()),
1174 );
1175 let idx_mgr = IndexManager::new(
1176 self.backend.storage.base_path(),
1177 self.backend.storage.schema_manager_arc(),
1178 );
1179 idx_mgr
1180 .rebuild_indexes_for_label(label)
1181 .await
1182 .map_err(UniError::Internal)?;
1183 self.stats.indexes_rebuilt += 1;
1184
1185 let now = Utc::now();
1187 let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1188 let row_count = self
1189 .backend
1190 .storage
1191 .backend()
1192 .count_rows(&vtable_name, None)
1193 .await
1194 .ok()
1195 .map(|c| c as u64);
1196
1197 let schema = self.backend.schema.schema();
1198 for idx in &schema.indexes {
1199 if idx.label() == label.as_str() {
1200 let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
1201 m.status = uni_common::core::schema::IndexStatus::Online;
1202 m.last_built_at = Some(now);
1203 if let Some(count) = row_count {
1204 m.row_count_at_build = Some(count);
1205 }
1206 });
1207 }
1208 }
1209 }
1210 }
1211 }
1212
1213 self.stats.index_build_duration = index_start.elapsed();
1214
1215 self.report_progress(
1217 BulkPhase::Finalizing,
1218 self.stats.vertices_inserted + self.stats.edges_inserted,
1219 None,
1220 );
1221
1222 let mut manifest = self
1224 .backend
1225 .storage
1226 .snapshot_manager()
1227 .load_latest_snapshot()
1228 .await
1229 .map_err(UniError::Internal)?
1230 .unwrap_or_else(|| {
1231 SnapshotManifest::new(
1232 Uuid::new_v4().to_string(),
1233 self.backend.schema.schema().schema_version,
1234 )
1235 });
1236
1237 let parent_id = manifest.snapshot_id.clone();
1239 manifest.parent_snapshot = Some(parent_id);
1240 manifest.snapshot_id = Uuid::new_v4().to_string();
1241 manifest.created_at = Utc::now();
1242
1243 let backend = self.backend.storage.backend();
1245 for label in &self.touched_labels {
1246 let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
1247 let count = backend
1248 .count_rows(&vtable_name, None)
1249 .await
1250 .map_err(UniError::Internal)?;
1251
1252 let current_snap =
1253 manifest
1254 .vertices
1255 .entry(label.to_string())
1256 .or_insert(LabelSnapshot {
1257 version: 0,
1258 count: 0,
1259 lance_version: 0,
1260 });
1261 current_snap.count = count as u64;
1262 current_snap.lance_version = 0;
1264 }
1265
1266 for edge_type in &self.touched_edge_types {
1268 let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
1269 if let Ok(count) = backend.count_rows(&delta_name, None).await {
1270 let current_snap =
1271 manifest
1272 .edges
1273 .entry(edge_type.to_string())
1274 .or_insert(EdgeSnapshot {
1275 version: 0,
1276 count: 0,
1277 lance_version: 0,
1278 });
1279 current_snap.count = count as u64;
1280 current_snap.lance_version = 0;
1282 }
1283 }
1284
1285 self.backend
1291 .storage
1292 .snapshot_manager()
1293 .save_snapshot(&manifest)
1294 .await
1295 .map_err(UniError::Internal)?;
1296 if !self.initial_table_versions.is_empty() {
1297 let store = self.backend.storage.store();
1298 flush_intent::write_committed(
1299 &store,
1300 &self.initial_table_versions,
1301 &manifest.snapshot_id,
1302 )
1303 .await
1304 .map_err(UniError::Internal)?;
1305 }
1306 self.backend
1307 .storage
1308 .snapshot_manager()
1309 .set_latest_snapshot(&manifest.snapshot_id)
1310 .await
1311 .map_err(UniError::Internal)?;
1312
1313 self.backend
1315 .schema
1316 .save()
1317 .await
1318 .map_err(UniError::Internal)?;
1319
1320 let schema = self.backend.storage.schema_manager().schema();
1323 for edge_type_name in &self.touched_edge_types {
1324 if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
1325 let type_id = meta.id;
1326 for &dir in uni_store::storage::direction::Direction::Both.expand() {
1327 let _ = self
1328 .backend
1329 .storage
1330 .warm_adjacency(type_id, dir, None)
1331 .await;
1332 }
1333 }
1334 }
1335
1336 if !self.initial_table_versions.is_empty() {
1341 let store = self.backend.storage.store();
1342 flush_intent::clear(&store)
1343 .await
1344 .map_err(UniError::Internal)?;
1345 }
1346
1347 self.committed = true;
1348 self.stats.duration = self.start_time.elapsed();
1349 Ok(self.stats.clone())
1350 }
1351
1352 pub async fn abort(mut self) -> Result<()> {
1363 if self.committed {
1364 return Err(anyhow!("Cannot abort: bulk load already committed"));
1365 }
1366
1367 self.pending_vertices.clear();
1369 self.pending_edges.clear();
1370 self.buffer_size_bytes = 0;
1371
1372 let backend = self.backend.storage.backend();
1374 let mut rollback_errors = Vec::new();
1375 let mut rolled_back_count = 0;
1376 let mut dropped_count = 0;
1377
1378 for (table_name, initial_version) in &self.initial_table_versions {
1379 match initial_version {
1380 Some(version) => {
1381 match backend.rollback_table(table_name, *version).await {
1383 Ok(()) => {
1384 log::info!("Rolled back table '{}' to version {}", table_name, version);
1385 rolled_back_count += 1;
1386 }
1387 Err(e) => {
1388 rollback_errors.push(format!("{}: {}", table_name, e));
1389 }
1390 }
1391 }
1392 None => {
1393 match backend.drop_table(table_name).await {
1395 Ok(()) => {
1396 log::info!("Dropped table '{}' (created during bulk load)", table_name);
1397 dropped_count += 1;
1398 }
1399 Err(e) => {
1400 rollback_errors.push(format!("{}: {}", table_name, e));
1401 }
1402 }
1403 }
1404 }
1405 }
1406
1407 self.backend.storage.backend().clear_cache();
1409
1410 let store = self.backend.storage.store();
1413 if let Err(e) = flush_intent::clear(&store).await {
1414 rollback_errors.push(format!("intent marker: {e}"));
1415 }
1416
1417 if rollback_errors.is_empty() {
1418 log::info!(
1419 "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
1420 rolled_back_count,
1421 dropped_count
1422 );
1423 Ok(())
1424 } else {
1425 Err(anyhow!(
1426 "Bulk load abort had {} rollback errors: {}",
1427 rollback_errors.len(),
1428 rollback_errors.join("; ")
1429 ))
1430 }
1431 }
1432
1433 async fn persist_active_intent(&self) -> Result<()> {
1437 let store = self.backend.storage.store();
1438 flush_intent::write_active(&store, &self.initial_table_versions)
1439 .await
1440 .map_err(UniError::Internal)?;
1441 Ok(())
1442 }
1443
1444 fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1445 if let Some(cb) = &self.progress_callback {
1446 cb(BulkProgress {
1447 phase,
1448 rows_processed: rows,
1449 total_rows: None,
1450 current_label: label,
1451 elapsed: self.start_time.elapsed(),
1452 });
1453 }
1454 }
1455}