1use crate::api::Uni;
30use anyhow::{Result, anyhow};
31use chrono::Utc;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use uni_common::Value;
36use uni_common::core::id::{Eid, Vid};
37use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
38use uni_common::{Properties, UniError};
39use uni_store::storage::delta::{L1Entry, Op};
40use uni_store::storage::main_edge::MainEdgeDataset;
41use uni_store::storage::main_vertex::MainVertexDataset;
42use uni_store::storage::{IndexManager, IndexRebuildManager};
43use uuid::Uuid;
44
/// Fluent builder for [`BulkWriter`]: configure options, then call `build()`.
pub struct BulkWriterBuilder<'a> {
    /// Database handle; must have a writer for `build()` to succeed.
    db: &'a Uni,
    /// Accumulated configuration, starting from `BulkConfig::default()`.
    config: BulkConfig,
    /// Optional observer invoked with progress updates during the load.
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
}
51
52impl<'a> BulkWriterBuilder<'a> {
53 pub fn new(db: &'a Uni) -> Self {
55 Self {
56 db,
57 config: BulkConfig::default(),
58 progress_callback: None,
59 }
60 }
61
62 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
64 self.config.defer_vector_indexes = defer;
65 self
66 }
67
68 pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
70 self.config.defer_scalar_indexes = defer;
71 self
72 }
73
74 pub fn batch_size(mut self, size: usize) -> Self {
76 self.config.batch_size = size;
77 self
78 }
79
80 pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
82 self.progress_callback = Some(Box::new(f));
83 self
84 }
85
86 pub fn async_indexes(mut self, async_: bool) -> Self {
96 self.config.async_indexes = async_;
97 self
98 }
99
100 pub fn validate_constraints(mut self, validate: bool) -> Self {
108 self.config.validate_constraints = validate;
109 self
110 }
111
112 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
120 self.config.max_buffer_size_bytes = size;
121 self
122 }
123
124 pub fn build(self) -> Result<BulkWriter<'a>> {
130 if self.db.writer.is_none() {
131 return Err(anyhow!("BulkWriter requires a writable database instance"));
132 }
133
134 Ok(BulkWriter {
135 db: self.db,
136 config: self.config,
137 progress_callback: self.progress_callback,
138 stats: BulkStats::default(),
139 start_time: Instant::now(),
140 pending_vertices: HashMap::new(),
141 pending_edges: HashMap::new(),
142 touched_labels: HashSet::new(),
143 touched_edge_types: HashSet::new(),
144 initial_table_versions: HashMap::new(),
145 buffer_size_bytes: 0,
146 committed: false,
147 })
148 }
149}
150
/// Tuning knobs for a bulk-load session.
pub struct BulkConfig {
    /// Defer vector-index maintenance until commit.
    pub defer_vector_indexes: bool,
    /// Defer scalar-index maintenance until commit.
    pub defer_scalar_indexes: bool,
    /// Rows buffered per label/edge type before an automatic flush.
    pub batch_size: usize,
    /// Schedule index rebuilds on a background task at commit instead of
    /// rebuilding inline.
    pub async_indexes: bool,
    /// Validate schema constraints on each inserted batch.
    pub validate_constraints: bool,
    /// Total estimated buffered bytes that trigger a full checkpoint flush.
    pub max_buffer_size_bytes: usize,
}
172
173impl Default for BulkConfig {
174 fn default() -> Self {
175 Self {
176 defer_vector_indexes: true,
177 defer_scalar_indexes: true,
178 batch_size: 10_000,
179 async_indexes: false,
180 validate_constraints: true,
181 max_buffer_size_bytes: 1_073_741_824, }
183 }
184}
185
/// Point-in-time progress report delivered to the `on_progress` callback.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Current phase of the bulk load.
    pub phase: BulkPhase,
    /// Rows (vertices + edges) processed so far.
    pub rows_processed: usize,
    /// Total expected rows when known; currently always `None` as reported
    /// by `report_progress`.
    pub total_rows: Option<usize>,
    /// Label or edge type being processed, when applicable.
    pub current_label: Option<String>,
    /// Time elapsed since the `BulkWriter` was built.
    pub elapsed: Duration,
}
194
/// Phases of a bulk-load session, in the order they occur.
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Buffering and flushing vertex/edge batches.
    Inserting,
    /// Synchronously rebuilding indexes for a label at commit time.
    RebuildingIndexes { label: String },
    /// Writing the snapshot manifest and finishing the commit.
    Finalizing,
}
201
/// Summary statistics returned by `BulkWriter::commit`.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    /// Number of vertices accepted into the session.
    pub vertices_inserted: usize,
    /// Number of edges accepted into the session.
    pub edges_inserted: usize,
    /// Labels whose indexes were rebuilt synchronously at commit.
    pub indexes_rebuilt: usize,
    /// Total wall-clock duration of the session.
    pub duration: Duration,
    /// Portion of the session spent on index rebuilding.
    pub index_build_duration: Duration,
    /// Task ids of background index rebuilds (async mode only).
    pub index_task_ids: Vec<String>,
    /// True when index rebuilds were scheduled asynchronously and may still
    /// be running after commit returns.
    pub indexes_pending: bool,
}
214
/// Input record for `BulkWriter::insert_edges`: one edge between two
/// already-known vertices.
#[derive(Debug, Clone)]
pub struct EdgeData {
    /// Source vertex id.
    pub src_vid: Vid,
    /// Destination vertex id.
    pub dst_vid: Vid,
    /// Edge properties.
    pub properties: Properties,
}
227
228impl EdgeData {
229 pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
231 Self {
232 src_vid,
233 dst_vid,
234 properties,
235 }
236 }
237}
238
/// Buffered bulk-load session over a writable [`Uni`] database.
///
/// Accumulates vertices and edges in memory, flushes them in batches, and
/// finishes with `commit()` (snapshot + index handling) or `abort()`
/// (rollback of every touched table).
pub struct BulkWriter<'a> {
    db: &'a Uni,
    config: BulkConfig,
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    /// Running counters returned from `commit()`.
    stats: BulkStats,
    /// Session start; used for `elapsed` in progress reports and `duration`.
    start_time: Instant,
    /// Buffered vertex rows keyed by label.
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    /// Buffered edge delta entries keyed by edge type.
    pending_edges: HashMap<String, Vec<L1Entry>>,
    /// Labels written this session; drive index rebuild and snapshot updates.
    touched_labels: HashSet<String>,
    /// Edge types written this session; drive snapshot updates and warming.
    touched_edge_types: HashSet<String>,
    /// Table versions captured before the first write; `None` means the
    /// table did not exist — `abort()` rolls back or drops accordingly.
    initial_table_versions: HashMap<String, Option<u64>>,
    /// Estimated buffered bytes; a checkpoint fires when this exceeds the
    /// configured cap.
    buffer_size_bytes: usize,
    /// Set once `commit()` succeeds; guards against abort-after-commit.
    committed: bool,
}
266
267impl<'a> BulkWriter<'a> {
268 fn get_current_timestamp_micros(&self) -> i64 {
270 use std::time::{SystemTime, UNIX_EPOCH};
271 SystemTime::now()
272 .duration_since(UNIX_EPOCH)
273 .map(|d| d.as_micros() as i64)
274 .unwrap_or(0)
275 }
276
    /// Buffers a batch of vertices under `label`, allocating a VID for each.
    ///
    /// Validates constraints first (when enabled), then either checkpoints
    /// the whole buffer (byte budget exceeded) or flushes just this label's
    /// buffer (batch size reached). Returns the allocated VIDs in input
    /// order.
    ///
    /// # Errors
    /// Fails when the label is unknown, constraint validation rejects the
    /// batch, or a flush to storage fails.
    pub async fn insert_vertices(
        &mut self,
        label: &str,
        vertices: Vec<HashMap<String, Value>>,
    ) -> Result<Vec<Vid>> {
        let schema = self.db.schema.schema();
        // Reject unknown labels up front.
        schema
            .labels
            .get(label)
            .ok_or_else(|| UniError::LabelNotFound {
                label: label.to_string(),
            })?;
        if self.config.validate_constraints {
            self.validate_vertex_batch_constraints(label, &vertices)
                .await?;
        }

        // Allocate one VID per vertex in a single call.
        // NOTE(review): `unwrap()` relies on the builder having verified a
        // writer exists at build time.
        let vids = {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            writer
                .allocate_vids(vertices.len())
                .await
                .map_err(UniError::Internal)?
        };

        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
        for (i, props) in vertices.into_iter().enumerate() {
            // Track an estimated footprint for checkpoint accounting.
            self.buffer_size_bytes += Self::estimate_properties_size(&props);
            buffer.push((vids[i], props));
        }

        self.touched_labels.insert(label.to_string());

        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte budget exceeded: flush everything.
            self.checkpoint().await?;
        } else {
            // Otherwise flush only this label if its batch is full.
            self.check_flush_vertices(label).await?;
        }

        self.stats.vertices_inserted += vids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted,
            Some(label.to_string()),
        );

        Ok(vids)
    }
343
344 fn estimate_properties_size(props: &Properties) -> usize {
346 let mut size = 0;
347 for (key, value) in props {
348 size += key.len();
349 size += Self::estimate_value_size(value);
350 }
351 size
352 }
353
354 fn estimate_value_size(value: &Value) -> usize {
356 match value {
357 Value::Null => 1,
358 Value::Bool(_) => 1,
359 Value::Int(_) | Value::Float(_) => 8,
360 Value::String(s) => s.len(),
361 Value::Bytes(b) => b.len(),
362 Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
363 Value::Map(obj) => {
364 obj.iter()
365 .map(|(k, v)| k.len() + Self::estimate_value_size(v))
366 .sum::<usize>()
367 + 8
368 }
369 Value::Vector(v) => v.len() * 4,
370 _ => 16, }
372 }
373
374 async fn validate_vertex_batch_constraints(
379 &self,
380 label: &str,
381 vertices: &[Properties],
382 ) -> Result<()> {
383 let schema = self.db.schema.schema();
384
385 if let Some(props_meta) = schema.properties.get(label) {
387 for (idx, props) in vertices.iter().enumerate() {
388 for (prop_name, meta) in props_meta {
390 if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
391 return Err(anyhow!(
392 "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
393 idx,
394 prop_name,
395 label
396 ));
397 }
398 }
399 }
400 }
401
402 for constraint in &schema.constraints {
404 if !constraint.enabled {
405 continue;
406 }
407 match &constraint.target {
408 uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
409 _ => continue,
410 }
411
412 match &constraint.constraint_type {
413 uni_common::core::schema::ConstraintType::Unique {
414 properties: unique_props,
415 } => {
416 let mut seen_keys: HashSet<String> = HashSet::new();
418 for (idx, props) in vertices.iter().enumerate() {
419 let key = self.compute_unique_key(unique_props, props);
420 if let Some(k) = key
421 && !seen_keys.insert(k.clone())
422 {
423 return Err(anyhow!(
424 "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
425 idx,
426 k
427 ));
428 }
429 }
430
431 if let Some(buffered) = self.pending_vertices.get(label) {
433 for (idx, props) in vertices.iter().enumerate() {
434 let key = self.compute_unique_key(unique_props, props);
435 if let Some(k) = key {
436 for (_, buffered_props) in buffered {
437 let buffered_key =
438 self.compute_unique_key(unique_props, buffered_props);
439 if buffered_key.as_ref() == Some(&k) {
440 return Err(anyhow!(
441 "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
442 idx,
443 k
444 ));
445 }
446 }
447 }
448 }
449 }
450 }
451 uni_common::core::schema::ConstraintType::Exists { property } => {
452 for (idx, props) in vertices.iter().enumerate() {
453 if props.get(property).is_none_or(|v| v.is_null()) {
454 return Err(anyhow!(
455 "EXISTS constraint violation at row {}: property '{}' must exist",
456 idx,
457 property
458 ));
459 }
460 }
461 }
462 uni_common::core::schema::ConstraintType::Check { expression } => {
463 for (idx, props) in vertices.iter().enumerate() {
464 if !self.evaluate_check_expression(expression, props)? {
465 return Err(anyhow!(
466 "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
467 constraint.name,
468 idx,
469 expression
470 ));
471 }
472 }
473 }
474 _ => {}
475 }
476 }
477
478 Ok(())
479 }
480
481 fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
483 let mut parts = Vec::new();
484 for prop in unique_props {
485 match props.get(prop) {
486 Some(v) if !v.is_null() => parts.push(v.to_string()),
487 _ => return None, }
489 }
490 Some(parts.join(":"))
491 }
492
493 fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
495 let parts: Vec<&str> = expression.split_whitespace().collect();
496 if parts.len() != 3 {
497 return Ok(true);
499 }
500
501 let prop_part = parts[0].trim_start_matches('(');
502 let prop_name = if let Some(idx) = prop_part.find('.') {
503 &prop_part[idx + 1..]
504 } else {
505 prop_part
506 };
507
508 let op = parts[1];
509 let val_str = parts[2].trim_end_matches(')');
510
511 let prop_val = match properties.get(prop_name) {
512 Some(v) => v,
513 None => return Ok(true), };
515
516 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
518 || (val_str.starts_with('"') && val_str.ends_with('"'))
519 {
520 Value::String(val_str[1..val_str.len() - 1].to_string())
521 } else if let Ok(n) = val_str.parse::<i64>() {
522 Value::Int(n)
523 } else if let Ok(n) = val_str.parse::<f64>() {
524 Value::Float(n)
525 } else if let Ok(b) = val_str.parse::<bool>() {
526 Value::Bool(b)
527 } else {
528 Value::String(val_str.to_string())
529 };
530
531 match op {
532 "=" | "==" => Ok(prop_val == &target_val),
533 "!=" | "<>" => Ok(prop_val != &target_val),
534 ">" => self
535 .compare_json_values(prop_val, &target_val)
536 .map(|c| c > 0),
537 "<" => self
538 .compare_json_values(prop_val, &target_val)
539 .map(|c| c < 0),
540 ">=" => self
541 .compare_json_values(prop_val, &target_val)
542 .map(|c| c >= 0),
543 "<=" => self
544 .compare_json_values(prop_val, &target_val)
545 .map(|c| c <= 0),
546 _ => Ok(true), }
548 }
549
550 fn compare_json_values(&self, a: &Value, b: &Value) -> Result<i8> {
552 match (a, b) {
553 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2) as i8),
554 (Value::Float(f1), Value::Float(f2)) => {
555 if f1 < f2 {
556 Ok(-1)
557 } else if f1 > f2 {
558 Ok(1)
559 } else {
560 Ok(0)
561 }
562 }
563 (Value::Int(n), Value::Float(f)) => {
564 let nf = *n as f64;
565 if nf < *f {
566 Ok(-1)
567 } else if nf > *f {
568 Ok(1)
569 } else {
570 Ok(0)
571 }
572 }
573 (Value::Float(f), Value::Int(n)) => {
574 let nf = *n as f64;
575 if *f < nf {
576 Ok(-1)
577 } else if *f > nf {
578 Ok(1)
579 } else {
580 Ok(0)
581 }
582 }
583 (Value::String(s1), Value::String(s2)) => match s1.cmp(s2) {
584 std::cmp::Ordering::Less => Ok(-1),
585 std::cmp::Ordering::Greater => Ok(1),
586 std::cmp::Ordering::Equal => Ok(0),
587 },
588 _ => Err(anyhow!(
589 "Cannot compare incompatible types: {:?} vs {:?}",
590 a,
591 b
592 )),
593 }
594 }
595
596 async fn checkpoint(&mut self) -> Result<()> {
601 log::debug!(
602 "Checkpoint triggered at {} bytes (limit: {})",
603 self.buffer_size_bytes,
604 self.config.max_buffer_size_bytes
605 );
606
607 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
609 for label in labels {
610 self.flush_vertices_buffer(&label).await?;
611 }
612
613 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
615 for edge_type in edge_types {
616 self.flush_edges_buffer(&edge_type).await?;
617 }
618
619 self.buffer_size_bytes = 0;
621
622 Ok(())
623 }
624
625 async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
627 let should_flush = {
628 if let Some(buf) = self.pending_vertices.get(label) {
629 buf.len() >= self.config.batch_size
630 } else {
631 false
632 }
633 };
634
635 if should_flush {
636 self.flush_vertices_buffer(label).await?;
637 }
638 Ok(())
639 }
640
    /// Writes the buffered vertices for `label` to storage and removes them
    /// from the in-memory buffer.
    ///
    /// Captures pre-write table versions on first contact (for `abort()`),
    /// then writes both the per-label vertex table and the consolidated main
    /// vertex table with matching timestamps, ensuring default indexes on
    /// each.
    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
        if let Some(vertices) = self.pending_vertices.remove(label) {
            if vertices.is_empty() {
                return Ok(());
            }

            // Record the per-label table's pre-write version on first contact
            // (None = table did not exist yet) so abort() can restore it.
            let table_name = uni_store::lancedb::LanceDbStore::vertex_table_name(label);
            if !self.initial_table_versions.contains_key(&table_name) {
                let lancedb_store = self.db.storage.lancedb_store();
                let version = lancedb_store
                    .get_table_version(&table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(table_name, version);
            }

            // Same bookkeeping for the consolidated main vertex table.
            let main_table_name =
                uni_store::lancedb::LanceDbStore::main_vertex_table_name().to_string();
            if !self.initial_table_versions.contains_key(&main_table_name) {
                let lancedb_store = self.db.storage.lancedb_store();
                let version = lancedb_store
                    .get_table_version(&main_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_table_name.clone(), version);
            }

            let ds = self
                .db
                .storage
                .vertex_dataset(label)
                .map_err(UniError::Internal)?;
            let schema = self.db.schema.schema();

            // Fresh bulk-loaded rows: not deleted, version 1, timestamps now.
            let deleted = vec![false; vertices.len()];
            let versions = vec![1; vertices.len()];
            let now = self.get_current_timestamp_micros();
            let mut created_at: HashMap<Vid, i64> = HashMap::new();
            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
            for (vid, _) in &vertices {
                created_at.insert(*vid, now);
                updated_at.insert(*vid, now);
            }

            // Every row in this buffer carries the single label being
            // flushed.
            let labels = vec![label.to_string()];
            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
                .iter()
                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
                .collect();

            let batch = ds
                .build_record_batch_with_timestamps(
                    &vertices_with_labels,
                    &deleted,
                    &versions,
                    &schema,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

            let lancedb_store = self.db.storage.lancedb_store();
            let table = ds
                .write_batch_lancedb(lancedb_store, batch, &schema)
                .await
                .map_err(UniError::Internal)?;

            ds.ensure_default_indexes_lancedb(&table)
                .await
                .map_err(UniError::Internal)?;

            // Mirror the rows into the consolidated main vertex table.
            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
                vertices_with_labels
                    .into_iter()
                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
                    .collect();

            if !main_vertices.is_empty() {
                let main_batch = MainVertexDataset::build_record_batch(
                    &main_vertices,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

                let main_table = MainVertexDataset::write_batch_lancedb(lancedb_store, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainVertexDataset::ensure_default_indexes_lancedb(&main_table)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
751
    /// Buffers a batch of edges of `edge_type`, allocating an EID for each.
    ///
    /// Edges are staged as `L1Entry` insert deltas with creation/update
    /// timestamps set to now. Flushing follows the same policy as vertices:
    /// full checkpoint when the byte budget is exceeded, otherwise a
    /// per-type flush when the batch size is reached. Returns the allocated
    /// EIDs in input order.
    ///
    /// # Errors
    /// Fails when the edge type is unknown, EID allocation fails, or a
    /// flush to storage fails.
    pub async fn insert_edges(
        &mut self,
        edge_type: &str,
        edges: Vec<EdgeData>,
    ) -> Result<Vec<Eid>> {
        let schema = self.db.schema.schema();
        let edge_meta =
            schema
                .edge_types
                .get(edge_type)
                .ok_or_else(|| UniError::EdgeTypeNotFound {
                    edge_type: edge_type.to_string(),
                })?;
        let type_id = edge_meta.id;

        // Allocate EIDs one at a time under a single reader guard.
        // NOTE(review): `unwrap()` relies on the builder having verified a
        // writer exists at build time.
        let mut eids = Vec::with_capacity(edges.len());
        {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            for _ in 0..edges.len() {
                eids.push(writer.next_eid(type_id).await.map_err(UniError::Internal)?);
            }
        }

        let now = self.get_current_timestamp_micros();
        // Build delta entries; the 32-byte term approximates per-entry fixed
        // overhead on top of the property payload for buffer accounting.
        let mut added_size = 0usize;
        let entries: Vec<L1Entry> = edges
            .into_iter()
            .enumerate()
            .map(|(i, edge)| {
                added_size += 32 + Self::estimate_properties_size(&edge.properties);
                L1Entry {
                    src_vid: edge.src_vid,
                    dst_vid: edge.dst_vid,
                    eid: eids[i],
                    op: Op::Insert,
                    version: 1,
                    properties: edge.properties,
                    created_at: Some(now),
                    updated_at: Some(now),
                }
            })
            .collect();
        self.buffer_size_bytes += added_size;
        self.pending_edges
            .entry(edge_type.to_string())
            .or_default()
            .extend(entries);

        self.touched_edge_types.insert(edge_type.to_string());

        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte budget exceeded: flush everything.
            self.checkpoint().await?;
        } else {
            // Otherwise flush only this edge type if its batch is full.
            self.check_flush_edges(edge_type).await?;
        }

        self.stats.edges_inserted += eids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            Some(edge_type.to_string()),
        );

        Ok(eids)
    }
831
832 async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
834 let should_flush = self
835 .pending_edges
836 .get(edge_type)
837 .is_some_and(|buf| buf.len() >= self.config.batch_size);
838
839 if should_flush {
840 self.flush_edges_buffer(edge_type).await?;
841 }
842 Ok(())
843 }
844
    /// Writes the buffered edge deltas for `edge_type` to storage and
    /// removes them from the in-memory buffer.
    ///
    /// Captures pre-write table versions on first contact (for `abort()`),
    /// then writes the entries three ways: forward deltas sorted by source
    /// VID, backward deltas sorted by destination VID, and rows in the
    /// consolidated main edge table — ensuring indexes on each.
    #[expect(
        clippy::map_entry,
        reason = "async code between contains_key and insert"
    )]
    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
        if let Some(entries) = self.pending_edges.remove(edge_type) {
            if entries.is_empty() {
                return Ok(());
            }

            let schema = self.db.schema.schema();
            let lancedb_store = self.db.storage.lancedb_store();

            // Record pre-write versions of the three tables about to be
            // touched (None = table did not exist yet) for abort().
            let fwd_table_name =
                uni_store::lancedb::LanceDbStore::delta_table_name(edge_type, "fwd");
            if !self.initial_table_versions.contains_key(&fwd_table_name) {
                let version = lancedb_store
                    .get_table_version(&fwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(fwd_table_name, version);
            }
            let bwd_table_name =
                uni_store::lancedb::LanceDbStore::delta_table_name(edge_type, "bwd");
            if !self.initial_table_versions.contains_key(&bwd_table_name) {
                let version = lancedb_store
                    .get_table_version(&bwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(bwd_table_name, version);
            }

            let main_edge_table_name =
                uni_store::lancedb::LanceDbStore::main_edge_table_name().to_string();
            if !self
                .initial_table_versions
                .contains_key(&main_edge_table_name)
            {
                let version = lancedb_store
                    .get_table_version(&main_edge_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_edge_table_name.clone(), version);
            }

            // Forward deltas: sorted by source VID.
            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "fwd")
                .map_err(UniError::Internal)?;
            let fwd_batch = fwd_ds
                .build_record_batch(&fwd_entries, &schema)
                .map_err(UniError::Internal)?;
            let fwd_table = fwd_ds
                .write_run_lancedb(lancedb_store, fwd_batch)
                .await
                .map_err(UniError::Internal)?;
            fwd_ds
                .ensure_eid_index_lancedb(&fwd_table)
                .await
                .map_err(UniError::Internal)?;

            // Backward deltas: same entries, sorted by destination VID.
            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "bwd")
                .map_err(UniError::Internal)?;
            let bwd_batch = bwd_ds
                .build_record_batch(&bwd_entries, &schema)
                .map_err(UniError::Internal)?;
            let bwd_table = bwd_ds
                .write_run_lancedb(lancedb_store, bwd_batch)
                .await
                .map_err(UniError::Internal)?;
            bwd_ds
                .ensure_eid_index_lancedb(&bwd_table)
                .await
                .map_err(UniError::Internal)?;

            // Rows for the consolidated main edge table, collecting the
            // per-edge timestamps as a side effect of the map.
            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
                .iter()
                .map(|e| {
                    let deleted = matches!(e.op, Op::Delete);
                    if let Some(ts) = e.created_at {
                        edge_created_at.insert(e.eid, ts);
                    }
                    if let Some(ts) = e.updated_at {
                        edge_updated_at.insert(e.eid, ts);
                    }
                    (
                        e.eid,
                        e.src_vid,
                        e.dst_vid,
                        edge_type.to_string(),
                        e.properties.clone(),
                        deleted,
                        e.version,
                    )
                })
                .collect();

            if !main_edges.is_empty() {
                let main_batch = MainEdgeDataset::build_record_batch(
                    &main_edges,
                    Some(&edge_created_at),
                    Some(&edge_updated_at),
                )
                .map_err(UniError::Internal)?;

                let main_table = MainEdgeDataset::write_batch_lancedb(lancedb_store, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainEdgeDataset::ensure_default_indexes_lancedb(&main_table)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
981
    /// Finalizes the bulk load: flushes all buffers, rebuilds (or schedules)
    /// indexes for touched labels, writes a new snapshot manifest, persists
    /// the schema, and warms adjacency caches for touched edge types.
    ///
    /// Returns the accumulated [`BulkStats`]. When `async_indexes` is set,
    /// the returned stats have `indexes_pending == true` and the rebuild
    /// runs in a background task tied to database shutdown.
    ///
    /// # Errors
    /// Fails when any flush, index rebuild, or snapshot/schema persistence
    /// step fails; steps completed before the failure are NOT rolled back.
    pub async fn commit(mut self) -> Result<BulkStats> {
        // Drain any remaining buffered vertices and edges.
        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
        for label in labels {
            self.flush_vertices_buffer(&label).await?;
        }

        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
        for edge_type in edge_types {
            self.flush_edges_buffer(&edge_type).await?;
        }

        let index_start = Instant::now();

        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();

            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
                // Async path: mark affected indexes stale, then hand the
                // rebuild to a background worker — commit does not wait.
                let schema = self.db.schema.schema();
                for label in &labels_to_rebuild {
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Stale;
                            });
                        }
                    }
                }

                let rebuild_manager = IndexRebuildManager::new(
                    self.db.storage.clone(),
                    self.db.schema.clone(),
                    self.db.config.index_rebuild.clone(),
                )
                .await
                .map_err(UniError::Internal)?;

                let task_ids = rebuild_manager
                    .schedule(labels_to_rebuild)
                    .await
                    .map_err(UniError::Internal)?;

                self.stats.index_task_ids = task_ids;
                self.stats.indexes_pending = true;

                // Keep the worker alive past this call; tie it to the
                // database's shutdown handle.
                let manager = Arc::new(rebuild_manager);
                let handle = manager.start_background_worker(self.db.shutdown_handle.subscribe());
                self.db.shutdown_handle.track_task(handle);
            } else {
                // Synchronous path: rebuild one label at a time, reporting
                // progress per label.
                for label in &labels_to_rebuild {
                    self.report_progress(
                        BulkPhase::RebuildingIndexes {
                            label: label.clone(),
                        },
                        self.stats.vertices_inserted + self.stats.edges_inserted,
                        Some(label.clone()),
                    );
                    let idx_mgr = IndexManager::new(
                        self.db.storage.base_path(),
                        self.db.storage.schema_manager_arc(),
                        self.db.storage.lancedb_store_arc(),
                    );
                    idx_mgr
                        .rebuild_indexes_for_label(label)
                        .await
                        .map_err(UniError::Internal)?;
                    self.stats.indexes_rebuilt += 1;

                    // Best-effort row count for index metadata bookkeeping;
                    // failures simply leave the count unset.
                    let now = chrono::Utc::now();
                    let row_count = self
                        .db
                        .storage
                        .lancedb_store()
                        .open_vertex_table(label)
                        .await
                        .ok()
                        .map(|t| async move { t.count_rows(None).await.ok().map(|c| c as u64) });
                    let row_count = match row_count {
                        Some(fut) => fut.await,
                        None => None,
                    };

                    // Mark the label's indexes online with build metadata.
                    let schema = self.db.schema.schema();
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Online;
                                m.last_built_at = Some(now);
                                if let Some(count) = row_count {
                                    m.row_count_at_build = Some(count);
                                }
                            });
                        }
                    }
                }
            }
        }

        self.stats.index_build_duration = index_start.elapsed();

        self.report_progress(
            BulkPhase::Finalizing,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            None,
        );

        // Derive a new snapshot from the latest one, or start fresh when
        // none exists yet.
        let mut manifest = self
            .db
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await
            .map_err(UniError::Internal)?
            .unwrap_or_else(|| {
                SnapshotManifest::new(
                    Uuid::new_v4().to_string(),
                    self.db.schema.schema().schema_version,
                )
            });

        // Chain the new snapshot to its parent and restamp it.
        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.created_at = Utc::now();

        // Refresh per-label vertex counts in the manifest.
        let lancedb_store = self.db.storage.lancedb_store();
        for label in &self.touched_labels {
            let table = lancedb_store
                .open_vertex_table(label)
                .await
                .map_err(UniError::Internal)?;
            let count = table
                .count_rows(None)
                .await
                .map_err(|e| UniError::Internal(anyhow::anyhow!("Count rows failed: {}", e)))?;

            let current_snap =
                manifest
                    .vertices
                    .entry(label.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.count = count as u64;
            current_snap.lance_version = 0;
        }

        // Refresh per-edge-type counts from the forward delta tables
        // (missing tables are skipped).
        for edge_type in &self.touched_edge_types {
            if let Ok(table) = lancedb_store.open_delta_table(edge_type, "fwd").await {
                let count = table
                    .count_rows(None)
                    .await
                    .map_err(|e| UniError::Internal(anyhow::anyhow!("Count rows failed: {}", e)))?;

                let current_snap =
                    manifest
                        .edges
                        .entry(edge_type.to_string())
                        .or_insert(EdgeSnapshot {
                            version: 0,
                            count: 0,
                            lance_version: 0,
                        });
                current_snap.count = count as u64;
                current_snap.lance_version = 0;
            }
        }

        // Persist and activate the new snapshot, then save the schema.
        self.db
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await
            .map_err(UniError::Internal)?;
        self.db
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await
            .map_err(UniError::Internal)?;

        self.db.schema.save().await.map_err(UniError::Internal)?;

        // Best-effort warm of adjacency caches for touched edge types;
        // errors are deliberately ignored.
        let schema = self.db.storage.schema_manager().schema();
        for edge_type_name in &self.touched_edge_types {
            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
                let type_id = meta.id;
                for &dir in uni_store::storage::direction::Direction::Both.expand() {
                    let _ = self.db.storage.warm_adjacency(type_id, dir, None).await;
                }
            }
        }

        self.committed = true;
        self.stats.duration = self.start_time.elapsed();
        Ok(self.stats)
    }
1205
    /// Aborts the bulk load, discarding buffered data and undoing every
    /// table write made during the session.
    ///
    /// Tables that existed before the session are rolled back to their
    /// recorded version; tables first created during the session are
    /// dropped. Per-table rollback failures are collected and reported
    /// together rather than stopping at the first one.
    ///
    /// # Errors
    /// Fails when the session was already committed, or when one or more
    /// table rollbacks/drops failed.
    pub async fn abort(mut self) -> Result<()> {
        if self.committed {
            return Err(anyhow!("Cannot abort: bulk load already committed"));
        }

        // Drop everything still buffered in memory.
        self.pending_vertices.clear();
        self.pending_edges.clear();
        self.buffer_size_bytes = 0;

        let lancedb_store = self.db.storage.lancedb_store();
        let mut rollback_errors = Vec::new();
        let mut rolled_back_count = 0;
        let mut dropped_count = 0;

        for (table_name, initial_version) in &self.initial_table_versions {
            match initial_version {
                Some(version) => {
                    // Table pre-existed: restore its pre-session version.
                    match lancedb_store.rollback_table(table_name, *version).await {
                        Ok(()) => {
                            log::info!("Rolled back table '{}' to version {}", table_name, version);
                            rolled_back_count += 1;
                        }
                        Err(e) => {
                            rollback_errors.push(format!("{}: {}", table_name, e));
                        }
                    }
                }
                None => {
                    // Table was created by this session: remove it entirely.
                    match lancedb_store.drop_table(table_name).await {
                        Ok(()) => {
                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
                            dropped_count += 1;
                        }
                        Err(e) => {
                            rollback_errors.push(format!("{}: {}", table_name, e));
                        }
                    }
                }
            }
        }

        // Cached handles may now point at rolled-back or dropped tables.
        self.db.storage.clear_table_cache();

        if rollback_errors.is_empty() {
            log::info!(
                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
                rolled_back_count,
                dropped_count
            );
            Ok(())
        } else {
            Err(anyhow!(
                "Bulk load abort had {} rollback errors: {}",
                rollback_errors.len(),
                rollback_errors.join("; ")
            ))
        }
    }
1279
1280 fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1281 if let Some(cb) = &self.progress_callback {
1282 cb(BulkProgress {
1283 phase,
1284 rows_processed: rows,
1285 total_rows: None,
1286 current_label: label,
1287 elapsed: self.start_time.elapsed(),
1288 });
1289 }
1290 }
1291}