1use crate::api::UniInner;
30use anyhow::{Result, anyhow};
31use chrono::Utc;
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::{Duration, Instant};
36use uni_common::Value;
37use uni_common::core::id::{Eid, Vid};
38use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
39use uni_common::{Properties, UniError};
40use uni_store::storage::delta::{L1Entry, Op};
41use uni_store::storage::main_edge::MainEdgeDataset;
42use uni_store::storage::main_vertex::MainVertexDataset;
43use uni_store::storage::{IndexManager, IndexRebuildManager};
44use uuid::Uuid;
45
/// Input adapter for bulk vertex inserts: converts a row-oriented source
/// (a list of property maps, or an Arrow `RecordBatch`) into per-row maps.
pub trait IntoArrow {
    /// Consumes the source and yields one property map per row.
    fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
}
54
impl IntoArrow for Vec<HashMap<String, Value>> {
    /// Identity conversion: the input is already a list of property maps.
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        self
    }
}
60
61impl IntoArrow for arrow_array::RecordBatch {
62 fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
63 let schema = self.schema();
64 let num_rows = self.num_rows();
65 let mut rows = Vec::with_capacity(num_rows);
66 for row_idx in 0..num_rows {
67 let mut props = HashMap::with_capacity(schema.fields().len());
68 for (col_idx, field) in schema.fields().iter().enumerate() {
69 let col = self.column(col_idx);
70 let value =
71 uni_store::storage::arrow_convert::arrow_to_value(col.as_ref(), row_idx, None);
72 if !value.is_null() {
73 props.insert(field.name().clone(), value);
74 }
75 }
76 rows.push(props);
77 }
78 rows
79 }
80}
81
/// Builder for [`BulkWriter`]: configures batching, index deferral, progress
/// reporting, and session write-guard handling before the writer is built.
pub struct BulkWriterBuilder {
    db: Arc<UniInner>,
    config: BulkConfig,
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    // Per-session flag ensuring only one write context is active at a time.
    session_write_guard: Option<Arc<AtomicBool>>,
    // When false, `build()` must CAS-acquire `session_write_guard` itself.
    guard_pre_acquired: bool,
    // When true, `build()` fails with `UniError::ReadOnly`.
    is_pinned: bool,
    // Included in the error when guard acquisition fails.
    session_id: String,
}
96
97impl BulkWriterBuilder {
    /// Creates a builder whose session write guard was already acquired by the
    /// caller, so `build()` skips the compare-and-swap acquisition step.
    pub(crate) fn new_with_guard(db: Arc<UniInner>, guard: Arc<AtomicBool>) -> Self {
        Self {
            db,
            config: BulkConfig::default(),
            progress_callback: None,
            session_write_guard: Some(guard),
            guard_pre_acquired: true,
            is_pinned: false,
            session_id: String::new(),
        }
    }
112
    /// Creates a builder with no session write guard at all; `build()` then
    /// performs no acquisition and no release on failure.
    pub(crate) fn new_unguarded(db: Arc<UniInner>) -> Self {
        Self {
            db,
            config: BulkConfig::default(),
            progress_callback: None,
            session_write_guard: None,
            // No guard exists, so nothing needs to be acquired in `build()`.
            guard_pre_acquired: true,
            is_pinned: false,
            session_id: String::new(),
        }
    }
128
    /// Creates a builder that defers guard acquisition until `build()`:
    /// the guard is CAS-acquired there, and `session_id`/`is_pinned` feed
    /// the error paths (`WriteContextAlreadyActive` / `ReadOnly`).
    #[allow(dead_code)]
    pub(crate) fn new_deferred(
        db: Arc<UniInner>,
        guard: Arc<AtomicBool>,
        session_id: String,
        is_pinned: bool,
    ) -> Self {
        Self {
            db,
            config: BulkConfig::default(),
            progress_callback: None,
            session_write_guard: Some(guard),
            guard_pre_acquired: false,
            is_pinned,
            session_id,
        }
    }
150
151 pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
153 self.config.defer_vector_indexes = defer;
154 self
155 }
156
157 pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
159 self.config.defer_scalar_indexes = defer;
160 self
161 }
162
163 pub fn batch_size(mut self, size: usize) -> Self {
165 self.config.batch_size = size;
166 self
167 }
168
169 pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
171 self.progress_callback = Some(Box::new(f));
172 self
173 }
174
175 pub fn async_indexes(mut self, async_: bool) -> Self {
185 self.config.async_indexes = async_;
186 self
187 }
188
189 pub fn validate_constraints(mut self, validate: bool) -> Self {
197 self.config.validate_constraints = validate;
198 self
199 }
200
201 pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
209 self.config.max_buffer_size_bytes = size;
210 self
211 }
212
    /// Consumes the builder and constructs the [`BulkWriter`].
    ///
    /// # Errors
    /// - [`UniError::ReadOnly`] when the session is pinned.
    /// - [`UniError::WriteContextAlreadyActive`] when the session guard is
    ///   already held by another write context (checked only when the guard
    ///   was not pre-acquired).
    /// - A generic error when the database has no writer component.
    pub fn build(self) -> Result<BulkWriter> {
        if self.is_pinned {
            return Err(UniError::ReadOnly {
                operation: "bulk_writer".to_string(),
            }
            .into());
        }

        // Acquire the per-session write guard via CAS unless the caller
        // already holds it; a failed exchange means another context is active.
        if !self.guard_pre_acquired
            && let Some(guard) = &self.session_write_guard
            && guard
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                .is_err()
        {
            return Err(UniError::WriteContextAlreadyActive {
                session_id: self.session_id.clone(),
                hint: "Only one Transaction, BulkWriter, or Appender can be active per Session at a time. Commit or rollback the active one first, or create a separate Session for concurrent writes.",
            }.into());
        }

        if self.db.writer.is_none() {
            // Release the guard (whether pre-acquired or acquired above)
            // before bailing out, so the session is not left locked.
            if let Some(guard) = &self.session_write_guard {
                guard.store(false, Ordering::SeqCst);
            }
            return Err(anyhow!("BulkWriter requires a writable database instance"));
        }

        Ok(BulkWriter {
            db: self.db,
            config: self.config,
            progress_callback: self.progress_callback,
            stats: BulkStats::default(),
            start_time: Instant::now(),
            pending_vertices: HashMap::new(),
            pending_edges: HashMap::new(),
            touched_labels: HashSet::new(),
            touched_edge_types: HashSet::new(),
            initial_table_versions: HashMap::new(),
            buffer_size_bytes: 0,
            committed: false,
            session_write_guard: self.session_write_guard,
        })
    }
267}
268
/// Tuning knobs for a bulk load.
pub struct BulkConfig {
    /// Skip vector index updates during insert; rebuild them at commit.
    pub defer_vector_indexes: bool,
    /// Skip scalar index updates during insert; rebuild them at commit.
    pub defer_scalar_indexes: bool,
    /// Rows buffered per label/edge type before an automatic flush.
    pub batch_size: usize,
    /// Run deferred index rebuilds on a background worker instead of inline.
    pub async_indexes: bool,
    /// Validate NOT NULL / UNIQUE / EXISTS / CHECK constraints before buffering.
    pub validate_constraints: bool,
    /// Total in-memory buffer budget in bytes; exceeding it triggers a checkpoint.
    pub max_buffer_size_bytes: usize,
}
290
291impl Default for BulkConfig {
292 fn default() -> Self {
293 Self {
294 defer_vector_indexes: true,
295 defer_scalar_indexes: true,
296 batch_size: 10_000,
297 async_indexes: false,
298 validate_constraints: true,
299 max_buffer_size_bytes: 1_073_741_824, }
301 }
302}
303
/// Progress snapshot passed to the `on_progress` callback.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    /// Current phase of the bulk load.
    pub phase: BulkPhase,
    /// Rows processed so far (vertices, or vertices + edges, per call site).
    pub rows_processed: usize,
    /// Total expected rows when known (this writer always reports `None`).
    pub total_rows: Option<usize>,
    /// Label or edge type being processed, when applicable.
    pub current_label: Option<String>,
    /// Time elapsed since the writer was created.
    pub elapsed: Duration,
}
312
/// Phases reported through [`BulkProgress`].
#[derive(Debug, Clone)]
pub enum BulkPhase {
    /// Rows are being buffered and flushed.
    Inserting,
    /// Indexes for `label` are being rebuilt synchronously at commit.
    RebuildingIndexes { label: String },
    /// Snapshot and schema finalization after all data is written.
    Finalizing,
}
319
/// Summary statistics accumulated during the load and returned from
/// [`BulkWriter::commit`].
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
    pub vertices_inserted: usize,
    pub edges_inserted: usize,
    /// Number of labels whose indexes were rebuilt inline at commit.
    pub indexes_rebuilt: usize,
    /// Total wall-clock duration of the bulk load.
    pub duration: Duration,
    /// Time spent rebuilding (or scheduling) deferred indexes.
    pub index_build_duration: Duration,
    /// Task ids of asynchronously scheduled index rebuilds.
    pub index_task_ids: Vec<String>,
    /// True when rebuilds were scheduled asynchronously and may still be running.
    pub indexes_pending: bool,
}
332
/// Alias for [`BulkStats`] — the stats struct doubles as its own accumulator.
/// NOTE(review): presumably kept for source compatibility; confirm before removing.
pub type BulkStatsAccumulator = BulkStats;
335
/// Edge payload for [`BulkWriter::insert_edges`]: endpoints plus properties.
#[derive(Debug, Clone)]
pub struct EdgeData {
    /// Source vertex id.
    pub src_vid: Vid,
    /// Destination vertex id.
    pub dst_vid: Vid,
    /// Edge properties.
    pub properties: Properties,
}
348
349impl EdgeData {
350 pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
352 Self {
353 src_vid,
354 dst_vid,
355 properties,
356 }
357 }
358}
359
/// High-throughput writer that buffers vertices and edges in memory, flushes
/// them in large batches, and defers index maintenance until commit.
pub struct BulkWriter {
    db: Arc<UniInner>,
    config: BulkConfig,
    progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
    stats: BulkStats,
    start_time: Instant,
    /// Buffered vertex rows, keyed by label.
    pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
    /// Buffered edge delta entries, keyed by edge type.
    pending_edges: HashMap<String, Vec<L1Entry>>,
    /// Labels written by this load (drives index rebuild + snapshot update).
    touched_labels: HashSet<String>,
    /// Edge types written by this load.
    touched_edge_types: HashSet<String>,
    /// Table versions captured before each table's first write; `abort()`
    /// rolls back to these (`None` means the table was created by this load
    /// and is dropped instead).
    initial_table_versions: HashMap<String, Option<u64>>,
    /// Estimated bytes currently buffered across all labels/edge types.
    buffer_size_bytes: usize,
    /// Set on successful commit; `Drop` releases the guard only when false.
    committed: bool,
    /// Per-session write-exclusivity flag, released on commit/abort/drop.
    session_write_guard: Option<Arc<AtomicBool>>,
}
389
390impl BulkWriter {
    /// Returns the running statistics for this bulk load.
    pub fn stats(&self) -> &BulkStats {
        &self.stats
    }
396
397 pub fn touched_labels(&self) -> Vec<String> {
399 self.touched_labels.iter().cloned().collect()
400 }
401
402 pub fn touched_edge_types(&self) -> Vec<String> {
404 self.touched_edge_types.iter().cloned().collect()
405 }
406
407 fn get_current_timestamp_micros(&self) -> i64 {
409 use std::time::{SystemTime, UNIX_EPOCH};
410 SystemTime::now()
411 .duration_since(UNIX_EPOCH)
412 .map(|d| d.as_micros() as i64)
413 .unwrap_or(0)
414 }
415
    /// Buffers a batch of vertices for `label`, allocating fresh VIDs.
    ///
    /// Rows are converted via [`IntoArrow`], validated against schema
    /// constraints (when enabled), and staged in `pending_vertices`. The
    /// buffer is flushed when this label reaches `batch_size` rows, or a
    /// full checkpoint runs when the global byte budget is exceeded.
    ///
    /// Returns the VIDs assigned to the rows, in input order.
    pub async fn insert_vertices(
        &mut self,
        label: &str,
        vertices: impl IntoArrow,
    ) -> Result<Vec<Vid>> {
        let vertices = vertices.into_property_maps();
        let schema = self.db.schema.schema();
        // Fail fast if the label is not declared in the schema.
        schema
            .labels
            .get(label)
            .ok_or_else(|| UniError::LabelNotFound {
                label: label.to_string(),
            })?;
        if self.config.validate_constraints {
            self.validate_vertex_batch_constraints(label, &vertices)
                .await?;
        }

        // Allocate all VIDs up front so buffered rows are addressable
        // before they are flushed. `unwrap()` is safe: `build()` rejects
        // databases without a writer.
        let vids = {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            writer
                .allocate_vids(vertices.len())
                .await
                .map_err(UniError::Internal)?
        };

        let buffer = self.pending_vertices.entry(label.to_string()).or_default();
        for (i, props) in vertices.into_iter().enumerate() {
            self.buffer_size_bytes += Self::estimate_properties_size(&props);
            buffer.push((vids[i], props));
        }

        self.touched_labels.insert(label.to_string());

        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte budget hit: flush every buffered label and edge type.
            self.checkpoint().await?;
        } else {
            // Otherwise flush only this label, and only if it hit batch_size.
            self.check_flush_vertices(label).await?;
        }

        self.stats.vertices_inserted += vids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted,
            Some(label.to_string()),
        );

        Ok(vids)
    }
483
484 fn estimate_properties_size(props: &Properties) -> usize {
486 let mut size = 0;
487 for (key, value) in props {
488 size += key.len();
489 size += Self::estimate_value_size(value);
490 }
491 size
492 }
493
494 fn estimate_value_size(value: &Value) -> usize {
496 match value {
497 Value::Null => 1,
498 Value::Bool(_) => 1,
499 Value::Int(_) | Value::Float(_) => 8,
500 Value::String(s) => s.len(),
501 Value::Bytes(b) => b.len(),
502 Value::List(arr) => arr.iter().map(Self::estimate_value_size).sum::<usize>() + 8,
503 Value::Map(obj) => {
504 obj.iter()
505 .map(|(k, v)| k.len() + Self::estimate_value_size(v))
506 .sum::<usize>()
507 + 8
508 }
509 Value::Vector(v) => v.len() * 4,
510 _ => 16, }
512 }
513
    /// Validates a vertex batch against schema constraints before buffering:
    /// NOT NULL property metadata plus enabled UNIQUE / EXISTS / CHECK
    /// constraints targeting `label`.
    ///
    /// UNIQUE is checked within the incoming batch and against rows already
    /// buffered for the same label; it does NOT check committed data here.
    async fn validate_vertex_batch_constraints(
        &self,
        label: &str,
        vertices: &[Properties],
    ) -> Result<()> {
        let schema = self.db.schema.schema();

        // NOT NULL: every non-nullable declared property must be present and non-null.
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, props) in vertices.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
                            idx,
                            prop_name,
                            label
                        ));
                    }
                }
            }
        }

        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            // Only constraints targeting this label apply.
            match &constraint.target {
                uni_common::core::schema::ConstraintTarget::Label(l) if l == label => {}
                _ => continue,
            }

            match &constraint.constraint_type {
                uni_common::core::schema::ConstraintType::Unique {
                    properties: unique_props,
                } => {
                    // Duplicates within the incoming batch itself.
                    let mut seen_keys: HashSet<String> = HashSet::new();
                    for (idx, props) in vertices.iter().enumerate() {
                        let key = self.compute_unique_key(unique_props, props);
                        if let Some(k) = key
                            && !seen_keys.insert(k.clone())
                        {
                            return Err(anyhow!(
                                "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
                                idx,
                                k
                            ));
                        }
                    }

                    // Conflicts with rows buffered but not yet flushed.
                    // NOTE(review): this is O(batch * buffered) — acceptable
                    // presumably because buffers flush at batch_size; confirm.
                    if let Some(buffered) = self.pending_vertices.get(label) {
                        for (idx, props) in vertices.iter().enumerate() {
                            let key = self.compute_unique_key(unique_props, props);
                            if let Some(k) = key {
                                for (_, buffered_props) in buffered {
                                    let buffered_key =
                                        self.compute_unique_key(unique_props, buffered_props);
                                    if buffered_key.as_ref() == Some(&k) {
                                        return Err(anyhow!(
                                            "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
                                            idx,
                                            k
                                        ));
                                    }
                                }
                            }
                        }
                    }
                }
                uni_common::core::schema::ConstraintType::Exists { property } => {
                    for (idx, props) in vertices.iter().enumerate() {
                        if props.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "EXISTS constraint violation at row {}: property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                }
                uni_common::core::schema::ConstraintType::Check { expression } => {
                    for (idx, props) in vertices.iter().enumerate() {
                        if !self.evaluate_check_expression(expression, props)? {
                            return Err(anyhow!(
                                "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
                                constraint.name,
                                idx,
                                expression
                            ));
                        }
                    }
                }
                _ => {}
            }
        }

        Ok(())
    }
620
621 fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
623 let mut parts = Vec::new();
624 for prop in unique_props {
625 match props.get(prop) {
626 Some(v) if !v.is_null() => parts.push(v.to_string()),
627 _ => return None, }
629 }
630 Some(parts.join(":"))
631 }
632
633 fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
635 let parts: Vec<&str> = expression.split_whitespace().collect();
636 if parts.len() != 3 {
637 return Ok(true);
639 }
640
641 let prop_part = parts[0].trim_start_matches('(');
642 let prop_name = if let Some(idx) = prop_part.find('.') {
643 &prop_part[idx + 1..]
644 } else {
645 prop_part
646 };
647
648 let op = parts[1];
649 let val_str = parts[2].trim_end_matches(')');
650
651 let prop_val = match properties.get(prop_name) {
652 Some(v) => v,
653 None => return Ok(true), };
655
656 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
658 || (val_str.starts_with('"') && val_str.ends_with('"'))
659 {
660 Value::String(val_str[1..val_str.len() - 1].to_string())
661 } else if let Ok(n) = val_str.parse::<i64>() {
662 Value::Int(n)
663 } else if let Ok(n) = val_str.parse::<f64>() {
664 Value::Float(n)
665 } else if let Ok(b) = val_str.parse::<bool>() {
666 Value::Bool(b)
667 } else {
668 Value::String(val_str.to_string())
669 };
670
671 match op {
672 "=" | "==" => Ok(prop_val == &target_val),
673 "!=" | "<>" => Ok(prop_val != &target_val),
674 ">" => self
675 .compare_json_values(prop_val, &target_val)
676 .map(|c| c > 0),
677 "<" => self
678 .compare_json_values(prop_val, &target_val)
679 .map(|c| c < 0),
680 ">=" => self
681 .compare_json_values(prop_val, &target_val)
682 .map(|c| c >= 0),
683 "<=" => self
684 .compare_json_values(prop_val, &target_val)
685 .map(|c| c <= 0),
686 _ => Ok(true), }
688 }
689
690 fn compare_json_values(&self, a: &Value, b: &Value) -> Result<i8> {
692 match (a, b) {
693 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2) as i8),
694 (Value::Float(f1), Value::Float(f2)) => {
695 if f1 < f2 {
696 Ok(-1)
697 } else if f1 > f2 {
698 Ok(1)
699 } else {
700 Ok(0)
701 }
702 }
703 (Value::Int(n), Value::Float(f)) => {
704 let nf = *n as f64;
705 if nf < *f {
706 Ok(-1)
707 } else if nf > *f {
708 Ok(1)
709 } else {
710 Ok(0)
711 }
712 }
713 (Value::Float(f), Value::Int(n)) => {
714 let nf = *n as f64;
715 if *f < nf {
716 Ok(-1)
717 } else if *f > nf {
718 Ok(1)
719 } else {
720 Ok(0)
721 }
722 }
723 (Value::String(s1), Value::String(s2)) => match s1.cmp(s2) {
724 std::cmp::Ordering::Less => Ok(-1),
725 std::cmp::Ordering::Greater => Ok(1),
726 std::cmp::Ordering::Equal => Ok(0),
727 },
728 _ => Err(anyhow!(
729 "Cannot compare incompatible types: {:?} vs {:?}",
730 a,
731 b
732 )),
733 }
734 }
735
736 async fn checkpoint(&mut self) -> Result<()> {
741 log::debug!(
742 "Checkpoint triggered at {} bytes (limit: {})",
743 self.buffer_size_bytes,
744 self.config.max_buffer_size_bytes
745 );
746
747 let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
749 for label in labels {
750 self.flush_vertices_buffer(&label).await?;
751 }
752
753 let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
755 for edge_type in edge_types {
756 self.flush_edges_buffer(&edge_type).await?;
757 }
758
759 self.buffer_size_bytes = 0;
761
762 Ok(())
763 }
764
765 async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
767 let should_flush = {
768 if let Some(buf) = self.pending_vertices.get(label) {
769 buf.len() >= self.config.batch_size
770 } else {
771 false
772 }
773 };
774
775 if should_flush {
776 self.flush_vertices_buffer(label).await?;
777 }
778 Ok(())
779 }
780
    /// Writes the buffered vertices for `label` to storage: the per-label
    /// dataset plus the unified "main" vertex table. Pre-write table versions
    /// are captured on the first touch of each table so `abort()` can roll back.
    async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
        if let Some(vertices) = self.pending_vertices.remove(label) {
            if vertices.is_empty() {
                return Ok(());
            }

            // Capture the pre-write version once per table; `None` marks a
            // table that did not exist yet (dropped on abort).
            let table_name = uni_store::backend::table_names::vertex_table_name(label);
            if !self.initial_table_versions.contains_key(&table_name) {
                let backend = self.db.storage.backend();
                let version = backend
                    .get_table_version(&table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(table_name, version);
            }

            let main_table_name =
                uni_store::backend::table_names::main_vertex_table_name().to_string();
            if !self.initial_table_versions.contains_key(&main_table_name) {
                let backend = self.db.storage.backend();
                let version = backend
                    .get_table_version(&main_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_table_name.clone(), version);
            }

            let ds = self
                .db
                .storage
                .vertex_dataset(label)
                .map_err(UniError::Internal)?;
            let schema = self.db.schema.schema();

            // Fresh inserts: not deleted, version 1, identical created/updated
            // timestamps for every row in the batch.
            let deleted = vec![false; vertices.len()];
            let versions = vec![1; vertices.len()];
            let now = self.get_current_timestamp_micros();
            let mut created_at: HashMap<Vid, i64> = HashMap::new();
            let mut updated_at: HashMap<Vid, i64> = HashMap::new();
            for (vid, _) in &vertices {
                created_at.insert(*vid, now);
                updated_at.insert(*vid, now);
            }

            // Every row in this buffer carries exactly this one label.
            let labels = vec![label.to_string()];
            let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
                .iter()
                .map(|(vid, props)| (*vid, labels.clone(), props.clone()))
                .collect();

            let batch = ds
                .build_record_batch_with_timestamps(
                    &vertices_with_labels,
                    &deleted,
                    &versions,
                    &schema,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

            let backend = self.db.storage.backend();
            ds.write_batch(backend, batch, &schema)
                .await
                .map_err(UniError::Internal)?;

            ds.ensure_default_indexes(backend)
                .await
                .map_err(UniError::Internal)?;

            // Mirror the rows into the unified main vertex table
            // (deleted = false, version = 1).
            let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
                vertices_with_labels
                    .into_iter()
                    .map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
                    .collect();

            if !main_vertices.is_empty() {
                let main_batch = MainVertexDataset::build_record_batch(
                    &main_vertices,
                    Some(&created_at),
                    Some(&updated_at),
                )
                .map_err(UniError::Internal)?;

                MainVertexDataset::write_batch(backend, main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainVertexDataset::ensure_default_indexes(backend)
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
890
    /// Buffers a batch of edges of `edge_type`, allocating fresh EIDs.
    ///
    /// Each edge becomes an `Op::Insert` delta entry staged in `pending_edges`;
    /// the buffer is flushed at `batch_size` entries, or a full checkpoint runs
    /// when the global byte budget is exceeded. Returns the EIDs in input order.
    pub async fn insert_edges(
        &mut self,
        edge_type: &str,
        edges: Vec<EdgeData>,
    ) -> Result<Vec<Eid>> {
        let schema = self.db.schema.schema();
        let edge_meta =
            schema
                .edge_types
                .get(edge_type)
                .ok_or_else(|| UniError::EdgeTypeNotFound {
                    edge_type: edge_type.to_string(),
                })?;
        let type_id = edge_meta.id;

        // EIDs are allocated one at a time under a single read lock on the
        // writer. `unwrap()` is safe: `build()` rejects writer-less databases.
        let mut eids = Vec::with_capacity(edges.len());
        {
            let writer = self.db.writer.as_ref().unwrap().read().await;
            for _ in 0..edges.len() {
                eids.push(writer.next_eid(type_id).await.map_err(UniError::Internal)?);
            }
        }

        let now = self.get_current_timestamp_micros();
        // 32 bytes approximates the fixed per-entry overhead (ids, op, version).
        let mut added_size = 0usize;
        let entries: Vec<L1Entry> = edges
            .into_iter()
            .enumerate()
            .map(|(i, edge)| {
                added_size += 32 + Self::estimate_properties_size(&edge.properties);
                L1Entry {
                    src_vid: edge.src_vid,
                    dst_vid: edge.dst_vid,
                    eid: eids[i],
                    op: Op::Insert,
                    version: 1,
                    properties: edge.properties,
                    created_at: Some(now),
                    updated_at: Some(now),
                }
            })
            .collect();
        self.buffer_size_bytes += added_size;
        self.pending_edges
            .entry(edge_type.to_string())
            .or_default()
            .extend(entries);

        self.touched_edge_types.insert(edge_type.to_string());

        if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
            // Global byte budget hit: flush everything.
            self.checkpoint().await?;
        } else {
            self.check_flush_edges(edge_type).await?;
        }

        self.stats.edges_inserted += eids.len();
        self.report_progress(
            BulkPhase::Inserting,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            Some(edge_type.to_string()),
        );

        Ok(eids)
    }
970
971 async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
973 let should_flush = self
974 .pending_edges
975 .get(edge_type)
976 .is_some_and(|buf| buf.len() >= self.config.batch_size);
977
978 if should_flush {
979 self.flush_edges_buffer(edge_type).await?;
980 }
981 Ok(())
982 }
983
    /// Writes the buffered edge entries for `edge_type` to storage: the
    /// forward and backward delta tables (sorted by source / destination
    /// vertex respectively) plus the unified main edge table. Pre-write table
    /// versions are captured on first touch so `abort()` can roll back.
    #[expect(
        clippy::map_entry,
        reason = "async code between contains_key and insert"
    )]
    async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
        if let Some(entries) = self.pending_edges.remove(edge_type) {
            if entries.is_empty() {
                return Ok(());
            }

            let schema = self.db.schema.schema();
            let backend = self.db.storage.backend();

            // Capture pre-write versions (once per table) for abort rollback;
            // `None` marks a table created by this load (dropped on abort).
            let fwd_table_name =
                uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
            if !self.initial_table_versions.contains_key(&fwd_table_name) {
                let version = backend
                    .get_table_version(&fwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(fwd_table_name, version);
            }
            let bwd_table_name =
                uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
            if !self.initial_table_versions.contains_key(&bwd_table_name) {
                let version = backend
                    .get_table_version(&bwd_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions.insert(bwd_table_name, version);
            }

            let main_edge_table_name =
                uni_store::backend::table_names::main_edge_table_name().to_string();
            if !self
                .initial_table_versions
                .contains_key(&main_edge_table_name)
            {
                let version = backend
                    .get_table_version(&main_edge_table_name)
                    .await
                    .map_err(UniError::Internal)?;
                self.initial_table_versions
                    .insert(main_edge_table_name.clone(), version);
            }

            // Forward direction: entries sorted by source vertex.
            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "fwd")
                .map_err(UniError::Internal)?;
            let fwd_batch = fwd_ds
                .build_record_batch(&fwd_entries, &schema)
                .map_err(UniError::Internal)?;
            let backend = self.db.storage.backend();
            fwd_ds
                .write_run(backend, fwd_batch)
                .await
                .map_err(UniError::Internal)?;
            fwd_ds
                .ensure_eid_index(backend)
                .await
                .map_err(UniError::Internal)?;

            // Backward direction: entries sorted by destination vertex.
            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self
                .db
                .storage
                .delta_dataset(edge_type, "bwd")
                .map_err(UniError::Internal)?;
            let bwd_batch = bwd_ds
                .build_record_batch(&bwd_entries, &schema)
                .map_err(UniError::Internal)?;
            bwd_ds
                .write_run(backend, bwd_batch)
                .await
                .map_err(UniError::Internal)?;
            bwd_ds
                .ensure_eid_index(backend)
                .await
                .map_err(UniError::Internal)?;

            // Mirror the entries into the unified main edge table, collecting
            // per-edge timestamps along the way.
            let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
            let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
            let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
                .iter()
                .map(|e| {
                    let deleted = matches!(e.op, Op::Delete);
                    if let Some(ts) = e.created_at {
                        edge_created_at.insert(e.eid, ts);
                    }
                    if let Some(ts) = e.updated_at {
                        edge_updated_at.insert(e.eid, ts);
                    }
                    (
                        e.eid,
                        e.src_vid,
                        e.dst_vid,
                        edge_type.to_string(),
                        e.properties.clone(),
                        deleted,
                        e.version,
                    )
                })
                .collect();

            if !main_edges.is_empty() {
                let main_batch = MainEdgeDataset::build_record_batch(
                    &main_edges,
                    Some(&edge_created_at),
                    Some(&edge_updated_at),
                )
                .map_err(UniError::Internal)?;

                MainEdgeDataset::write_batch(self.db.storage.backend(), main_batch)
                    .await
                    .map_err(UniError::Internal)?;

                MainEdgeDataset::ensure_default_indexes(self.db.storage.backend())
                    .await
                    .map_err(UniError::Internal)?;
            }
        }
        Ok(())
    }
1121
    /// Finalizes the bulk load: flushes all buffers, rebuilds (or schedules)
    /// deferred indexes, writes a new snapshot manifest chained to the
    /// previous one, persists the schema, warms adjacency caches, and
    /// releases the session write guard.
    ///
    /// Returns the accumulated [`BulkStats`]; when `async_indexes` is set,
    /// `indexes_pending` is true and rebuilds may still be running.
    pub async fn commit(mut self) -> Result<BulkStats> {
        // Phase 1: flush everything still buffered.
        let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
        for label in labels {
            self.flush_vertices_buffer(&label).await?;
        }

        let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
        for edge_type in edge_types {
            self.flush_edges_buffer(&edge_type).await?;
        }

        let index_start = Instant::now();

        // Phase 2: deferred index maintenance for every touched label.
        if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
            let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();

            if self.config.async_indexes && !labels_to_rebuild.is_empty() {
                // Async path: mark affected indexes stale and hand the rebuild
                // work to a background worker; commit returns before it finishes.
                let schema = self.db.schema.schema();
                for label in &labels_to_rebuild {
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Stale;
                            });
                        }
                    }
                }

                let rebuild_manager = IndexRebuildManager::new(
                    self.db.storage.clone(),
                    self.db.schema.clone(),
                    self.db.config.index_rebuild.clone(),
                )
                .await
                .map_err(UniError::Internal)?;

                let task_ids = rebuild_manager
                    .schedule(labels_to_rebuild)
                    .await
                    .map_err(UniError::Internal)?;

                self.stats.index_task_ids = task_ids;
                self.stats.indexes_pending = true;

                // The worker is tied to the database shutdown handle so it is
                // cancelled/awaited on shutdown.
                let manager = Arc::new(rebuild_manager);
                let handle = manager.start_background_worker(self.db.shutdown_handle.subscribe());
                self.db.shutdown_handle.track_task(handle);
            } else {
                // Sync path: rebuild inline, then mark indexes online.
                for label in &labels_to_rebuild {
                    self.report_progress(
                        BulkPhase::RebuildingIndexes {
                            label: label.clone(),
                        },
                        self.stats.vertices_inserted + self.stats.edges_inserted,
                        Some(label.clone()),
                    );
                    let idx_mgr = IndexManager::new(
                        self.db.storage.base_path(),
                        self.db.storage.schema_manager_arc(),
                        self.db.storage.backend_arc(),
                    );
                    idx_mgr
                        .rebuild_indexes_for_label(label)
                        .await
                        .map_err(UniError::Internal)?;
                    self.stats.indexes_rebuilt += 1;

                    let now = chrono::Utc::now();
                    let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
                    // Row count is best-effort metadata; a count failure is ignored.
                    let row_count = self
                        .db
                        .storage
                        .backend()
                        .count_rows(&vtable_name, None)
                        .await
                        .ok()
                        .map(|c| c as u64);

                    let schema = self.db.schema.schema();
                    for idx in &schema.indexes {
                        if idx.label() == label.as_str() {
                            let _ = self.db.schema.update_index_metadata(idx.name(), |m| {
                                m.status = uni_common::core::schema::IndexStatus::Online;
                                m.last_built_at = Some(now);
                                if let Some(count) = row_count {
                                    m.row_count_at_build = Some(count);
                                }
                            });
                        }
                    }
                }
            }
        }

        self.stats.index_build_duration = index_start.elapsed();

        self.report_progress(
            BulkPhase::Finalizing,
            self.stats.vertices_inserted + self.stats.edges_inserted,
            None,
        );

        // Phase 3: write a new snapshot manifest chained to the previous one
        // (a fresh manifest is created when none exists yet).
        let mut manifest = self
            .db
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await
            .map_err(UniError::Internal)?
            .unwrap_or_else(|| {
                SnapshotManifest::new(
                    Uuid::new_v4().to_string(),
                    self.db.schema.schema().schema_version,
                )
            });

        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.created_at = Utc::now();

        // Refresh per-label / per-edge-type counts for everything we touched.
        let backend = self.db.storage.backend();
        for label in &self.touched_labels {
            let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
            let count = backend
                .count_rows(&vtable_name, None)
                .await
                .map_err(UniError::Internal)?;

            let current_snap =
                manifest
                    .vertices
                    .entry(label.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.count = count as u64;
            current_snap.lance_version = 0;
        }

        for edge_type in &self.touched_edge_types {
            let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
            if let Ok(count) = backend.count_rows(&delta_name, None).await {
                let current_snap =
                    manifest
                        .edges
                        .entry(edge_type.to_string())
                        .or_insert(EdgeSnapshot {
                            version: 0,
                            count: 0,
                            lance_version: 0,
                        });
                current_snap.count = count as u64;
                current_snap.lance_version = 0;
            }
        }

        self.db
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await
            .map_err(UniError::Internal)?;
        self.db
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await
            .map_err(UniError::Internal)?;

        self.db.schema.save().await.map_err(UniError::Internal)?;

        // Phase 4: best-effort warm of adjacency caches for touched edge
        // types (errors deliberately ignored).
        let schema = self.db.storage.schema_manager().schema();
        for edge_type_name in &self.touched_edge_types {
            if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
                let type_id = meta.id;
                for &dir in uni_store::storage::direction::Direction::Both.expand() {
                    let _ = self.db.storage.warm_adjacency(type_id, dir, None).await;
                }
            }
        }

        self.committed = true;
        self.release_guard();
        self.stats.duration = self.start_time.elapsed();
        Ok(self.stats.clone())
    }
1336
    /// Aborts the bulk load: discards buffered data and attempts to undo all
    /// flushed writes by rolling each touched table back to its pre-load
    /// version, or dropping tables that were created by this load.
    ///
    /// # Errors
    /// Fails if the load already committed, or if any per-table rollback/drop
    /// failed (failures are collected and reported together; the guard is
    /// still released either way).
    pub async fn abort(mut self) -> Result<()> {
        if self.committed {
            return Err(anyhow!("Cannot abort: bulk load already committed"));
        }

        // Drop anything still buffered in memory.
        self.pending_vertices.clear();
        self.pending_edges.clear();
        self.buffer_size_bytes = 0;

        let backend = self.db.storage.backend();
        let mut rollback_errors = Vec::new();
        let mut rolled_back_count = 0;
        let mut dropped_count = 0;

        for (table_name, initial_version) in &self.initial_table_versions {
            match initial_version {
                Some(version) => {
                    // Table existed before the load: restore its old version.
                    match backend.rollback_table(table_name, *version).await {
                        Ok(()) => {
                            log::info!("Rolled back table '{}' to version {}", table_name, version);
                            rolled_back_count += 1;
                        }
                        Err(e) => {
                            rollback_errors.push(format!("{}: {}", table_name, e));
                        }
                    }
                }
                None => {
                    // Table was created by this load: remove it entirely.
                    match backend.drop_table(table_name).await {
                        Ok(()) => {
                            log::info!("Dropped table '{}' (created during bulk load)", table_name);
                            dropped_count += 1;
                        }
                        Err(e) => {
                            rollback_errors.push(format!("{}: {}", table_name, e));
                        }
                    }
                }
            }
        }

        // Invalidate backend caches that may reference rolled-back data.
        self.db.storage.backend().clear_cache();

        self.release_guard();

        if rollback_errors.is_empty() {
            log::info!(
                "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
                rolled_back_count,
                dropped_count
            );
            Ok(())
        } else {
            Err(anyhow!(
                "Bulk load abort had {} rollback errors: {}",
                rollback_errors.len(),
                rollback_errors.join("; ")
            ))
        }
    }
1413
1414 fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
1415 if let Some(cb) = &self.progress_callback {
1416 cb(BulkProgress {
1417 phase,
1418 rows_processed: rows,
1419 total_rows: None,
1420 current_label: label,
1421 elapsed: self.start_time.elapsed(),
1422 });
1423 }
1424 }
1425
1426 fn release_guard(&self) {
1428 if let Some(guard) = &self.session_write_guard {
1429 guard.store(false, Ordering::SeqCst);
1430 }
1431 }
1432}
1433
impl Drop for BulkWriter {
    fn drop(&mut self) {
        // If the writer is dropped without committing, release the session
        // write guard so the session is not left locked.
        // NOTE(review): already-flushed batches are NOT rolled back here —
        // callers must use `abort()` for that.
        if !self.committed {
            self.release_guard();
        }
    }
}