1use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use futures::TryStreamExt;
18use metrics;
19use parking_lot::RwLock;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{debug, info, instrument};
23use uni_common::Properties;
24use uni_common::Value;
25use uni_common::config::UniConfig;
26use uni_common::core::id::{Eid, Vid};
27use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
28use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
29use uni_xervo::runtime::ModelRuntime;
30use uuid::Uuid;
31
/// Tunables controlling the [`Writer`]'s buffering behaviour.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Upper bound on buffered mutations before a flush is considered.
    pub max_mutations: usize,
}

impl Default for WriterConfig {
    /// Default configuration: up to 10,000 buffered mutations.
    fn default() -> Self {
        Self { max_mutations: 10_000 }
    }
}
44
/// Single-writer entry point for all mutations: routes inserts/deletes through
/// an in-memory L0 buffer (optionally a per-transaction buffer), enforces
/// schema constraints, and appends to the WAL before merging into shared state.
pub struct Writer {
    /// Owns the shared (committed) L0 buffer and any pending-flush buffers.
    pub l0_manager: Arc<L0Manager>,
    /// Durable storage backend (vertex/edge datasets, compaction status).
    pub storage: Arc<StorageManager>,
    /// Source of truth for labels, property metadata, and constraints.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocates Vids/Eids, backed by an object-store checkpoint file.
    pub allocator: Arc<IdAllocator>,
    pub config: UniConfig,
    /// Optional embedding-model runtime; set after construction when available.
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    /// When `Some`, an explicit transaction is open and mutations go here
    /// instead of the shared L0 buffer (see `active_l0`).
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
    /// Used for CRDT-aware property merging on upsert; `None` disables it.
    pub property_manager: Option<Arc<PropertyManager>>,
    // Receives adjacency updates/tombstones when a transaction commits.
    adjacency_manager: Arc<AdjacencyManager>,
    // NOTE(review): set at construction; no visible reader in this chunk.
    last_flush_time: std::time::Instant,
    // Handle to a background compaction task, if one has been spawned.
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    // Late-bound via `set_index_rebuild_manager`.
    index_rebuild_manager: Option<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
}
64
65impl Writer {
66 pub async fn new(
67 storage: Arc<StorageManager>,
68 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
69 start_version: u64,
70 ) -> Result<Self> {
71 Self::new_with_config(
72 storage,
73 schema_manager,
74 start_version,
75 UniConfig::default(),
76 None,
77 None,
78 )
79 .await
80 }
81
    /// Fully-parameterised constructor.
    ///
    /// * `start_version` — initial version for the L0 buffer.
    /// * `wal` — optional write-ahead log handed to the L0 manager.
    /// * `allocator` — reuse an existing id allocator; when `None`, one is
    ///   created from the object store at `id_allocator.json`.
    ///
    /// # Errors
    /// Fails if a fresh `IdAllocator` cannot be initialised from the store.
    pub async fn new_with_config(
        storage: Arc<StorageManager>,
        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
        start_version: u64,
        config: UniConfig,
        wal: Option<Arc<WriteAheadLog>>,
        allocator: Option<Arc<IdAllocator>>,
    ) -> Result<Self> {
        let allocator = if let Some(a) = allocator {
            a
        } else {
            // Default allocator persists its high-water marks in the object
            // store; 1000 is the allocation batch size.
            let store = storage.store();
            let path = object_store::path::Path::from("id_allocator.json");
            Arc::new(IdAllocator::new(store, path, 1000).await?)
        };

        let l0_manager = Arc::new(L0Manager::new(start_version, wal));

        // Property manager is always enabled here (cache size 1000); other
        // construction paths may leave it as None.
        let property_manager = Some(Arc::new(PropertyManager::new(
            storage.clone(),
            schema_manager.clone(),
            1000,
        )));

        let adjacency_manager = storage.adjacency_manager();

        Ok(Self {
            l0_manager,
            storage,
            schema_manager,
            allocator,
            config,
            xervo_runtime: None,
            transaction_l0: None,
            property_manager,
            adjacency_manager,
            last_flush_time: std::time::Instant::now(),
            compaction_handle: Arc::new(RwLock::new(None)),
            index_rebuild_manager: None,
        })
    }
123
124 pub fn set_index_rebuild_manager(
126 &mut self,
127 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
128 ) {
129 self.index_rebuild_manager = Some(manager);
130 }
131
132 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
134 let l0 = self.l0_manager.get_current();
135 let wal = l0.read().wal.clone();
136
137 if let Some(wal) = wal {
138 wal.initialize().await?;
139 let mutations = wal.replay_since(wal_high_water_mark).await?;
140 let count = mutations.len();
141
142 if count > 0 {
143 log::info!(
144 "Replaying {} mutations from WAL (LSN > {})",
145 count,
146 wal_high_water_mark
147 );
148 let mut l0_guard = l0.write();
149 l0_guard.replay_mutations(mutations)?;
150 }
151
152 Ok(count)
153 } else {
154 Ok(0)
155 }
156 }
157
158 pub async fn next_vid(&self) -> Result<Vid> {
160 self.allocator.allocate_vid().await
161 }
162
163 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
166 self.allocator.allocate_vids(count).await
167 }
168
169 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
171 self.allocator.allocate_eid().await
172 }
173
174 pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
175 self.xervo_runtime = Some(runtime);
176 }
177
178 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
179 self.xervo_runtime.clone()
180 }
181
182 pub fn begin_transaction(&mut self) -> Result<()> {
183 if self.transaction_l0.is_some() {
184 return Err(anyhow!("Transaction already active"));
185 }
186 let current_version = self.l0_manager.get_current().read().current_version;
187 self.transaction_l0 = Some(Arc::new(RwLock::new(L0Buffer::new(current_version, None))));
189 Ok(())
190 }
191
192 fn active_l0(&self) -> Arc<RwLock<L0Buffer>> {
195 self.transaction_l0
196 .clone()
197 .unwrap_or_else(|| self.l0_manager.get_current())
198 }
199
200 fn update_metrics(&self) {
201 let l0 = self.l0_manager.get_current();
202 let size = l0.read().estimated_size;
203 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
204
205 if let Some(tx_l0) = &self.transaction_l0 {
206 metrics::gauge!("active_transactions").set(1.0);
207 let tx_size = tx_l0.read().estimated_size;
208 metrics::gauge!("transaction_l0_size_bytes").set(tx_size as f64);
209 } else {
210 metrics::gauge!("active_transactions").set(0.0);
211 metrics::gauge!("transaction_l0_size_bytes").set(0.0);
212 }
213 }
214
    /// Commits the open transaction in three ordered phases:
    /// 1. append every staged mutation to the WAL (durability intent),
    /// 2. flush the WAL to disk,
    /// 3. merge the transaction buffer into the shared L0 buffer and update
    ///    the adjacency index.
    ///
    /// Only after all three succeed is the transaction buffer dropped.
    ///
    /// # Errors
    /// Fails if no transaction is active, or on WAL append/flush or merge
    /// errors — in which case the transaction remains open.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        let tx_l0_arc = self
            .transaction_l0
            .as_ref()
            .ok_or_else(|| anyhow!("No active transaction"))?
            .clone();

        // Phase 1: write WAL records. Scoped so both read guards are released
        // before the async flush below.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // The tx buffer was created without a WAL; the shared buffer's
            // WAL (if any) receives the records.
            if let Some(wal) = main_l0.wal.as_ref() {
                // Vertex inserts — skip vertices deleted within the same tx.
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions.
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edge mutations: a tombstone entry means delete, otherwise insert.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }
            }
        }

        // Phase 2: make the WAL records durable before publishing the merge.
        self.flush_wal().await?;

        // Phase 3: merge into the shared buffer and mirror edge changes into
        // the adjacency index, all under the shared buffer's write lock.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                // Fall back to the shared buffer's current version when the
                // tx recorded no explicit edge version.
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }
        }

        self.update_metrics();

        // The transaction is committed; drop its buffer.
        self.transaction_l0 = None;

        // Best effort: a failed flush check must not fail the commit itself.
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(())
    }
328
329 pub async fn flush_wal(&self) -> Result<()> {
331 let l0 = self.l0_manager.get_current();
332 let wal = l0.read().wal.clone();
333
334 if let Some(wal) = wal {
335 wal.flush().await?;
336 }
337 Ok(())
338 }
339
340 pub fn rollback_transaction(&mut self) -> Result<()> {
341 self.transaction_l0 = None;
343 Ok(())
344 }
345
346 pub fn force_rollback(&mut self) {
349 if self.transaction_l0.take().is_some() {
350 tracing::warn!("Force-rolled back leaked transaction");
351 }
352 }
353
    /// Validates `properties` for vertex `vid` against a single label's
    /// schema: non-nullable property metadata, then every enabled constraint
    /// targeting that label (UNIQUE, EXISTS, CHECK).
    ///
    /// # Errors
    /// Returns the first constraint violation found, or an error for
    /// constraint types this writer does not support.
    async fn validate_vertex_constraints_for_label(
        &self,
        vid: Vid,
        properties: &Properties,
        label: &str,
    ) -> Result<()> {
        let schema = self.schema_manager.schema();

        {
            // Nullability check from per-label property metadata: absent or
            // explicitly-null values both violate a non-nullable property.
            if let Some(props_meta) = schema.properties.get(label) {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        log::warn!(
                            "Constraint violation: Property '{}' cannot be null for label '{}'",
                            prop_name,
                            label
                        );
                        return Err(anyhow!(
                            "Constraint violation: Property '{}' cannot be null",
                            prop_name
                        ));
                    }
                }
            }

            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                // Only constraints targeting exactly this label apply.
                match &constraint.target {
                    ConstraintTarget::Label(l) if l == label => {}
                    _ => continue,
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        if !unique_props.is_empty() {
                            // Build the composite key; if any component is
                            // missing the uniqueness check is skipped.
                            let mut key_values = Vec::new();
                            let mut missing = false;
                            for prop in unique_props {
                                if let Some(val) = properties.get(prop) {
                                    key_values.push((prop.clone(), val.clone()));
                                } else {
                                    missing = true;
                                }
                            }

                            if !missing {
                                self.check_unique_constraint_multi(label, &key_values, vid)
                                    .await?;
                            }
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            log::warn!(
                                "Constraint violation: Property '{}' must exist for label '{}'",
                                property,
                                label
                            );
                            return Err(anyhow!(
                                "Constraint violation: Property '{}' must exist",
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
                                constraint.name,
                                expression
                            ));
                        }
                    }
                    _ => {
                        // Any other constraint variant is rejected outright.
                        return Err(anyhow!("Unsupported constraint type"));
                    }
                }
            }
        }
        Ok(())
    }
447
448 async fn validate_vertex_constraints(
452 &self,
453 vid: Vid,
454 properties: &Properties,
455 labels: &[String],
456 ) -> Result<()> {
457 let schema = self.schema_manager.schema();
458
459 for label in labels {
461 if schema.get_label_case_insensitive(label).is_none() {
463 continue;
464 }
465 self.validate_vertex_constraints_for_label(vid, properties, label)
466 .await?;
467 }
468
469 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
471 self.check_extid_globally_unique(ext_id, vid).await?;
472 }
473
474 Ok(())
475 }
476
477 fn collect_constraint_keys_from_properties<'a>(
481 properties_iter: impl Iterator<Item = &'a Properties>,
482 label: &str,
483 constraints: &[uni_common::core::schema::Constraint],
484 existing_keys: &mut HashMap<String, HashSet<String>>,
485 existing_extids: &mut HashSet<String>,
486 ) {
487 for props in properties_iter {
488 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
489 existing_extids.insert(ext_id.to_string());
490 }
491
492 for constraint in constraints {
493 if !constraint.enabled {
494 continue;
495 }
496 if let ConstraintTarget::Label(l) = &constraint.target {
497 if l != label {
498 continue;
499 }
500 } else {
501 continue;
502 }
503
504 if let ConstraintType::Unique {
505 properties: unique_props,
506 } = &constraint.constraint_type
507 {
508 let mut key_parts = Vec::new();
509 let mut all_present = true;
510 for prop in unique_props {
511 if let Some(val) = props.get(prop) {
512 key_parts.push(format!("{}:{}", prop, val));
513 } else {
514 all_present = false;
515 break;
516 }
517 }
518 if all_present {
519 let key = key_parts.join("|");
520 existing_keys
521 .entry(constraint.name.clone())
522 .or_default()
523 .insert(key);
524 }
525 }
526 }
527 }
528 }
529
    /// Batch counterpart of vertex constraint validation. Checks, in order:
    /// 1. nullability metadata for every row;
    /// 2. UNIQUE / EXISTS / CHECK constraints against keys already present in
    ///    the shared L0 buffer and open transaction, AND against duplicates
    ///    *within* the batch itself;
    /// 3. UNIQUE constraints against persisted storage via a single filtered
    ///    `count_rows` query per constraint (excluding the batch's own vids).
    ///
    /// # Errors
    /// Fails on length mismatch or the first violation found; batch-internal
    /// duplicate errors report both conflicting indices.
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // Phase 1: non-nullable property metadata, per row.
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // Gather keys already present in memory (shared L0 + open tx) so the
        // batch can be checked against them without per-row lookups.
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Phase 2: per-row checks. `batch_*` maps remember the first index at
        // which a key appeared, to report intra-batch duplicates precisely.
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                // Only constraints targeting this label apply.
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        // Same key format as collect_constraint_keys_from_properties.
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = key_parts.join("|");

                            // Conflict with an in-memory (L0/tx) vertex?
                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            // Conflict with an earlier row of this batch?
                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "Constraint violation at index {}: Property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "Constraint violation at index {}: CHECK constraint '{}' violated",
                                idx,
                                constraint.name
                            ));
                        }
                    }
                    // Other constraint kinds are not enforced on this path.
                    _ => {}
                }
            }
        }

        // Phase 3: check persisted storage. One OR-of-ANDs filter per UNIQUE
        // constraint, excluding the batch's own vids and deleted rows.
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            // Only scalar values can be rendered into the
                            // filter; strings get single-quote escaping.
                            let val_str = match val {
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                if !or_filters.is_empty() {
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    // A missing/unopenable dataset means nothing persisted
                    // yet for this label, so nothing to conflict with.
                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
777
    /// Verifies that `ext_id` is not used by any vertex other than
    /// `current_vid`, checking every in-memory layer (shared L0, open
    /// transaction, pending-flush buffers) and then persisted storage.
    ///
    /// # Errors
    /// Returns a constraint-violation error naming the conflicting vertex.
    async fn check_extid_globally_unique(&self, ext_id: &str, current_vid: Vid) -> Result<()> {
        // Snapshot the buffer handles first so locks are taken one at a time
        // inside the loop, not held across it.
        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
            let mut buffers = vec![self.l0_manager.get_current()];
            if let Some(tx_l0) = &self.transaction_l0 {
                buffers.push(tx_l0.clone());
            }
            buffers.extend(self.l0_manager.get_pending_flush());
            buffers
        };

        for l0 in &l0_buffers_to_check {
            if let Some(vid) =
                Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
            {
                return Err(anyhow!(
                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
                    ext_id,
                    vid
                ));
            }
        }

        // Storage lookup errors are treated as "not found" (best effort);
        // a hit for a different vid is a violation.
        let lancedb = self.storage.lancedb_store();
        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(lancedb, ext_id, None).await
            && found_vid != current_vid
        {
            return Err(anyhow!(
                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
                ext_id,
                found_vid
            ));
        }

        Ok(())
    }
824
825 fn find_extid_in_properties(
827 vertex_properties: &HashMap<Vid, Properties>,
828 ext_id: &str,
829 current_vid: Vid,
830 ) -> Option<Vid> {
831 vertex_properties.iter().find_map(|(&vid, props)| {
832 if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
833 Some(vid)
834 } else {
835 None
836 }
837 })
838 }
839
840 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
842 let l0 = self.l0_manager.get_current();
843 let l0_guard = l0.read();
844 if l0_guard.vertex_tombstones.contains(&vid) {
846 return None;
847 }
848 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
849 }
850
851 pub async fn get_vertex_labels(&self, vid: Vid) -> Option<Vec<String>> {
855 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
857 return Some(labels);
858 }
859
860 if let Some(tx_l0) = &self.transaction_l0 {
862 let guard = tx_l0.read();
863 if guard.vertex_tombstones.contains(&vid) {
864 return None;
865 }
866 if let Some(labels) = guard.get_vertex_labels(vid) {
867 return Some(labels.to_vec());
868 }
869 }
870
871 for pending_l0 in self.l0_manager.get_pending_flush() {
873 let guard = pending_l0.read();
874 if guard.vertex_tombstones.contains(&vid) {
875 return None;
876 }
877 if let Some(labels) = guard.get_vertex_labels(vid) {
878 return Some(labels.to_vec());
879 }
880 }
881
882 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
884 }
885
886 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
888 let l0 = self.l0_manager.get_current();
889 let l0_guard = l0.read();
890 l0_guard.get_edge_type(eid).map(|s| s.to_string())
891 }
892
893 pub fn get_edge_type_id_from_l0(&self, eid: Eid) -> Option<u32> {
896 if let Some(tx_l0) = &self.transaction_l0 {
898 let guard = tx_l0.read();
899 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
900 return Some(etype);
901 }
902 }
903 let l0 = self.l0_manager.get_current();
905 let l0_guard = l0.read();
906 l0_guard
907 .get_edge_endpoint_full(eid)
908 .map(|(_, _, etype)| etype)
909 }
910
911 pub fn set_edge_type(&self, eid: Eid, type_name: String) {
914 self.active_l0().write().set_edge_type(eid, type_name);
915 }
916
    /// Evaluates a simple CHECK expression of the form `prop OP literal`
    /// (exactly three whitespace-separated tokens, optionally wrapped in
    /// parentheses, with an optional `alias.` prefix on the property).
    ///
    /// Deliberately permissive: complex expressions, missing properties, and
    /// unknown operators all return `Ok(true)` (allow the write) rather than
    /// rejecting data a richer evaluator would accept.
    ///
    /// # Errors
    /// Only from `compare_values` when the operand types are incomparable.
    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
        let parts: Vec<&str> = expression.split_whitespace().collect();
        if parts.len() != 3 {
            // Not a simple binary comparison — allow and warn.
            log::warn!(
                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
                expression
            );
            return Ok(true);
        }

        // Strip a leading '(' and any `alias.` qualifier from the property.
        let prop_part = parts[0].trim_start_matches('(');
        let prop_name = if let Some(idx) = prop_part.find('.') {
            &prop_part[idx + 1..]
        } else {
            prop_part
        };

        let op = parts[1];
        let val_str = parts[2].trim_end_matches(')');

        let prop_val = match properties.get(prop_name) {
            Some(v) => v,
            // Property absent: constraint is vacuously satisfied.
            None => return Ok(true),
        };

        // Parse the literal: quoted string, then i64, f64, bool, then a
        // `Number(..)` debug-style wrapper; anything else is a raw string.
        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
            || (val_str.starts_with('"') && val_str.ends_with('"'))
        {
            Value::String(val_str[1..val_str.len() - 1].to_string())
        } else if let Ok(n) = val_str.parse::<i64>() {
            Value::Int(n)
        } else if let Ok(n) = val_str.parse::<f64>() {
            Value::Float(n)
        } else if let Ok(b) = val_str.parse::<bool>() {
            Value::Bool(b)
        } else {
            if val_str.starts_with("Number(") && val_str.ends_with(')') {
                // Unwrap "Number(<n>)" and retry numeric parsing.
                let n_str = &val_str[7..val_str.len() - 1];
                if let Ok(n) = n_str.parse::<i64>() {
                    Value::Int(n)
                } else if let Ok(n) = n_str.parse::<f64>() {
                    Value::Float(n)
                } else {
                    Value::String(val_str.to_string())
                }
            } else {
                Value::String(val_str.to_string())
            }
        };

        match op {
            "=" | "==" => Ok(prop_val == &target_val),
            "!=" | "<>" => Ok(prop_val != &target_val),
            ">" => self
                .compare_values(prop_val, &target_val)
                .map(|o| o.is_gt()),
            "<" => self
                .compare_values(prop_val, &target_val)
                .map(|o| o.is_lt()),
            ">=" => self
                .compare_values(prop_val, &target_val)
                .map(|o| o.is_ge()),
            "<=" => self
                .compare_values(prop_val, &target_val)
                .map(|o| o.is_le()),
            _ => {
                // Unknown operator: allow the write, but leave a trace.
                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
                Ok(true)
            }
        }
    }
995
996 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
997 use std::cmp::Ordering;
998
999 fn cmp_f64(x: f64, y: f64) -> Ordering {
1000 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1001 }
1002
1003 match (a, b) {
1004 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1005 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1006 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1007 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1008 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1009 _ => Err(anyhow!(
1010 "Cannot compare incompatible types: {:?} vs {:?}",
1011 a,
1012 b
1013 )),
1014 }
1015 }
1016
    /// Enforces a (possibly composite) UNIQUE constraint for one candidate
    /// vertex: checks the serialized key against the shared L0 buffer, the
    /// open transaction, and finally persisted storage via a filtered count.
    /// `current_vid` is always excluded so upserts don't conflict with
    /// themselves.
    ///
    /// # Errors
    /// Returns a violation error on the first layer where the key is found.
    async fn check_unique_constraint_multi(
        &self,
        label: &str,
        key_values: &[(String, Value)],
        current_vid: Vid,
    ) -> Result<()> {
        let key = serialize_constraint_key(label, key_values);

        // Shared L0 buffer (scoped so the read lock drops before awaiting).
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            if l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}'",
                    label
                ));
            }
        }

        // Open transaction buffer, if any.
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            if tx_l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
                    label
                ));
            }
        }

        // Build the storage-side filter; strings are single-quote escaped,
        // non-scalar values degrade to NULL (which can never match).
        let filters: Vec<String> = key_values
            .iter()
            .map(|(prop, val)| {
                let val_str = match val {
                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    Value::Int(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Bool(b) => b.to_string(),
                    _ => "NULL".to_string(),
                };
                format!("{} = {}", prop, val_str)
            })
            .collect();

        let mut filter = filters.join(" AND ");
        filter.push_str(&format!(
            " AND _deleted = false AND _vid != {}",
            current_vid.as_u64()
        ));

        // A missing/unopenable dataset means no persisted rows for this
        // label, hence nothing to conflict with.
        if let Ok(ds) = self.storage.vertex_dataset(label)
            && let Ok(lance_ds) = ds.open_raw().await
        {
            let count = lance_ds.count_rows(Some(filter.clone())).await?;
            if count > 0 {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
                    label,
                    filter
                ));
            }
        }

        Ok(())
    }
1085
    /// Backpressure gate run before every write. Behaviour by L1 run count:
    /// * `>= hard_limit` — stall, polling compaction status every 100ms until
    ///   compaction drains below the limit (no timeout: relies on the
    ///   background compactor making progress);
    /// * `>= soft_limit` — sleep with exponential backoff in the excess,
    ///   exponent capped at 31 so `2^excess` fits in `u32`.
    ///
    /// # Errors
    /// Fails only if the compaction status itself cannot be read.
    async fn check_write_pressure(&self) -> Result<()> {
        let status = self
            .storage
            .compaction_status()
            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
        let l1_runs = status.l1_runs;
        let throttle = &self.config.throttle;

        if l1_runs >= throttle.hard_limit {
            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
            // Busy-poll (100ms interval) until compaction catches up.
            while self
                .storage
                .compaction_status()
                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
                .l1_runs
                >= throttle.hard_limit
            {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
        } else if l1_runs >= throttle.soft_limit {
            let excess = l1_runs - throttle.soft_limit;
            // Cap the exponent so the shift below cannot overflow u32.
            let excess = std::cmp::min(excess, 31);
            let multiplier = 2_u32.pow(excess as u32);
            let delay = throttle.base_delay * multiplier;
            tokio::time::sleep(delay).await;
        }
        Ok(())
    }
1116
1117 fn check_transaction_memory(&self) -> Result<()> {
1120 if let Some(tx_l0) = &self.transaction_l0 {
1121 let size = tx_l0.read().estimated_size;
1122 if size > self.config.max_transaction_memory {
1123 return Err(anyhow!(
1124 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1125 Roll back or commit the current transaction.",
1126 size,
1127 self.config.max_transaction_memory
1128 ));
1129 }
1130 }
1131 Ok(())
1132 }
1133
1134 async fn get_query_context(&self) -> Option<QueryContext> {
1135 Some(QueryContext::new_with_pending(
1136 self.l0_manager.get_current(),
1137 self.transaction_l0.clone(),
1138 self.l0_manager.get_pending_flush(),
1139 ))
1140 }
1141
    /// Pre-upsert hook: for any incoming property whose schema type is a
    /// CRDT, merges the incoming value with the vertex's existing value
    /// (in place, via `properties.get_mut`). No-op when the property manager
    /// is disabled, the label cannot be determined, or no CRDT fields exist.
    ///
    /// # Errors
    /// Propagates property lookup and CRDT merge failures.
    async fn prepare_vertex_upsert(
        &self,
        vid: Vid,
        properties: &mut Properties,
        label: Option<&str>,
    ) -> Result<()> {
        let Some(pm) = &self.property_manager else {
            return Ok(());
        };

        let schema = self.schema_manager.schema();

        // Use the caller-supplied label, or fall back to the vertex's first
        // known label. `discovered_labels` keeps the Vec alive for the
        // borrowed `&str` below.
        let discovered_labels;
        let label_name = if let Some(l) = label {
            Some(l)
        } else {
            discovered_labels = self.get_vertex_labels(vid).await;
            discovered_labels
                .as_ref()
                .and_then(|l| l.first().map(|s| s.as_str()))
        };

        let Some(label_str) = label_name else {
            return Ok(());
        };
        let Some(props_meta) = schema.properties.get(label_str) else {
            return Ok(());
        };

        // Incoming properties whose declared schema type is a CRDT.
        let crdt_keys: Vec<String> = properties
            .keys()
            .filter(|key| {
                props_meta.get(*key).is_some_and(|meta| {
                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                })
            })
            .cloned()
            .collect();

        if crdt_keys.is_empty() {
            return Ok(());
        }

        // Merge against the existing value as seen across all buffers.
        let ctx = self.get_query_context().await;
        for key in crdt_keys {
            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
            if !existing.is_null()
                && let Some(val) = properties.get_mut(&key)
            {
                *val = pm.merge_crdt_values(&existing, val)?;
            }
        }

        Ok(())
    }
1207
    /// Edge counterpart of `prepare_vertex_upsert`: merges incoming CRDT-typed
    /// edge properties with their existing values in place. No-op when the
    /// property manager is disabled or the edge's type name (and hence its
    /// property metadata) cannot be resolved from the L0 buffers.
    ///
    /// # Errors
    /// Propagates property lookup and CRDT merge failures.
    async fn prepare_edge_upsert(&self, eid: Eid, properties: &mut Properties) -> Result<()> {
        if let Some(pm) = &self.property_manager {
            let schema = self.schema_manager.schema();
            // Resolved from in-memory buffers only; an edge known solely in
            // storage yields None and skips CRDT handling.
            let type_name = self.get_edge_type_from_l0(eid);

            if let Some(ref t_name) = type_name
                && let Some(props_meta) = schema.properties.get(t_name)
            {
                // Collect incoming keys whose declared type is a CRDT.
                let mut crdt_keys = Vec::new();
                for (key, _) in properties.iter() {
                    if let Some(meta) = props_meta.get(key)
                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                    {
                        crdt_keys.push(key.clone());
                    }
                }

                if !crdt_keys.is_empty() {
                    let ctx = self.get_query_context().await;
                    for key in crdt_keys {
                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;

                        // Merge only when there is a non-null existing value.
                        if !existing.is_null()
                            && let Some(val) = properties.get_mut(&key)
                        {
                            *val = pm.merge_crdt_values(&existing, val)?;
                        }
                    }
                }
            }
        }
        Ok(())
    }
1242
1243 #[instrument(skip(self, properties), level = "trace")]
1244 pub async fn insert_vertex(&mut self, vid: Vid, properties: Properties) -> Result<()> {
1245 self.insert_vertex_with_labels(vid, properties, &[]).await?;
1246 Ok(())
1247 }
1248
    /// Full single-vertex insert path: backpressure → tx memory check →
    /// embedding generation → constraint validation → CRDT merge → write into
    /// the active L0 buffer (registering UNIQUE constraint keys under the
    /// same lock) → metrics → flush check (outside transactions only).
    ///
    /// Returns the final properties (after embeddings and CRDT merging) so
    /// callers can observe what was actually written.
    ///
    /// # Errors
    /// Propagates any validation, merge, or flush failure.
    #[instrument(skip(self, properties, labels), level = "trace")]
    pub async fn insert_vertex_with_labels(
        &mut self,
        vid: Vid,
        mut properties: Properties,
        labels: &[String],
    ) -> Result<Properties> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        // May add embedding vectors to `properties` in place.
        self.process_embeddings_for_labels(labels, &mut properties)
            .await?;
        self.validate_vertex_constraints(vid, &properties, labels)
            .await?;
        // CRDT merge keyed off the first label (if any).
        self.prepare_vertex_upsert(vid, &mut properties, labels.first().map(|s| s.as_str()))
            .await?;

        // Clones survive the move of `properties` into the L0 buffer below
        // and provide the return value.
        let properties_copy = properties.clone();
        let labels_copy = labels.to_vec();

        {
            let l0 = self.active_l0();
            let mut l0_guard = l0.write();
            l0_guard.insert_vertex_with_labels(vid, properties, labels);

            // Register UNIQUE constraint keys under the same write lock so a
            // concurrent check can never observe the vertex without its keys.
            let schema = self.schema_manager.schema();
            for label in &labels_copy {
                if schema.get_label_case_insensitive(label).is_none() {
                    continue;
                }

                for constraint in &schema.constraints {
                    if !constraint.enabled {
                        continue;
                    }
                    if let ConstraintTarget::Label(l) = &constraint.target {
                        if l != label {
                            continue;
                        }
                    } else {
                        continue;
                    }

                    if let ConstraintType::Unique {
                        properties: unique_props,
                    } = &constraint.constraint_type
                    {
                        // Only register when every key component is present.
                        let mut key_values = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties_copy.get(prop) {
                                key_values.push((prop.clone(), val.clone()));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = serialize_constraint_key(label, &key_values);
                            l0_guard.insert_constraint_key(key, vid);
                        }
                    }
                }
            }
        }

        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Inside a transaction, flushing is deferred until commit.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(properties_copy)
    }
1331
1332 pub async fn insert_vertices_batch(
1358 &mut self,
1359 vids: Vec<Vid>,
1360 mut properties_batch: Vec<Properties>,
1361 labels: Vec<String>,
1362 ) -> Result<Vec<Properties>> {
1363 let start = std::time::Instant::now();
1364
1365 if vids.len() != properties_batch.len() {
1367 return Err(anyhow!(
1368 "VID/properties size mismatch: {} vids, {} properties",
1369 vids.len(),
1370 properties_batch.len()
1371 ));
1372 }
1373
1374 if vids.is_empty() {
1375 return Ok(Vec::new());
1376 }
1377
1378 let is_nested = self.transaction_l0.is_some();
1380 if !is_nested {
1381 self.begin_transaction()?;
1382 }
1383
1384 let result = async {
1386 self.check_write_pressure().await?;
1387 self.check_transaction_memory()?;
1388
1389 self.process_embeddings_for_batch(&labels, &mut properties_batch)
1391 .await?;
1392
1393 let label = labels
1395 .first()
1396 .ok_or_else(|| anyhow!("No labels provided"))?;
1397 self.validate_vertex_batch_constraints(&vids, &properties_batch, label)
1398 .await?;
1399
1400 let has_crdt_fields = {
1405 let schema = self.schema_manager.schema();
1406 schema
1407 .properties
1408 .get(label.as_str())
1409 .is_some_and(|props_meta| {
1410 props_meta.values().any(|meta| {
1411 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1412 })
1413 })
1414 };
1415
1416 if has_crdt_fields {
1417 let schema = self.schema_manager.schema();
1420 let crdt_keys: Vec<String> = schema
1421 .properties
1422 .get(label.as_str())
1423 .map(|props_meta| {
1424 props_meta
1425 .iter()
1426 .filter(|(_, meta)| {
1427 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1428 })
1429 .map(|(key, _)| key.clone())
1430 .collect()
1431 })
1432 .unwrap_or_default();
1433
1434 if let Some(pm) = &self.property_manager {
1435 let ctx = self.get_query_context().await;
1436 for (vid, props) in vids.iter().zip(&mut properties_batch) {
1437 for key in &crdt_keys {
1438 if props.contains_key(key) {
1439 let existing =
1440 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
1441 if !existing.is_null()
1442 && let Some(val) = props.get_mut(key)
1443 {
1444 *val = pm.merge_crdt_values(&existing, val)?;
1445 }
1446 }
1447 }
1448 }
1449 }
1450 }
1451
1452 let tx_l0 = self
1454 .transaction_l0
1455 .as_ref()
1456 .ok_or_else(|| anyhow!("Transaction L0 missing"))?;
1457
1458 let properties_result = properties_batch.clone();
1459 {
1460 let mut l0_guard = tx_l0.write();
1461 for (vid, props) in vids.iter().zip(properties_batch) {
1462 l0_guard.insert_vertex_with_labels(*vid, props, &labels);
1463 }
1464 }
1465
1466 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
1468 self.update_metrics();
1469
1470 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
1471 }
1472 .await;
1473
1474 match result {
1476 Ok(props) => {
1477 if !is_nested {
1479 self.commit_transaction().await?;
1480 }
1481
1482 if start.elapsed().as_millis() > 100 {
1483 log::warn!(
1484 "Slow insert_vertices_batch ({} vertices): {}ms",
1485 vids.len(),
1486 start.elapsed().as_millis()
1487 );
1488 }
1489
1490 Ok(props)
1491 }
1492 Err(e) => {
1493 if !is_nested {
1495 self.rollback_transaction()?;
1496 }
1497 Err(e)
1498 }
1499 }
1500 }
1501
    /// Deletes a vertex from the active L0 buffer.
    ///
    /// The vertex's labels must be known at flush time so the tombstone is
    /// routed to the correct per-label dataset. If the active buffer doesn't
    /// already know them, they are resolved in order from: the caller-
    /// provided `labels` hint, the buffers still pending flush, and finally
    /// a storage scan.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(&mut self, vid: Vid, labels: Option<Vec<String>>) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        let l0 = self.active_l0();

        // Does the active buffer already know this vertex's labels?
        let has_labels = {
            let l0_guard = l0.read();
            l0_guard.vertex_labels.contains_key(&vid)
        };

        if !has_labels {
            let resolved_labels = if let Some(provided) = labels {
                Some(provided)
            } else {
                // Check buffers awaiting flush before the (more expensive)
                // storage fallback.
                let mut found = None;
                for pending_l0 in self.l0_manager.get_pending_flush() {
                    let pending_guard = pending_l0.read();
                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
                        found = Some(l.to_vec());
                        break;
                    }
                }
                if found.is_none() {
                    found = self.find_vertex_labels_in_storage(vid).await?;
                }
                found
            };

            // Cache resolved labels in the active buffer so the flush can
            // route the tombstone.
            if let Some(found_labels) = resolved_labels {
                let mut l0_guard = l0.write();
                l0_guard.vertex_labels.insert(vid, found_labels);
            }
        }

        l0.write().delete_vertex(vid)?;
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Auto-flush is suppressed inside explicit transactions.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
1564
1565 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1568 use arrow_array::Array;
1569 use arrow_array::cast::AsArray;
1570 use lancedb::query::{ExecutableQuery, QueryBase, Select};
1571
1572 let lancedb_store = self.storage.lancedb_store();
1573 let table_name = MainVertexDataset::table_name();
1574
1575 if !lancedb_store.table_exists(table_name).await? {
1577 return Ok(None);
1578 }
1579
1580 let table = lancedb_store.open_table(table_name).await?;
1581
1582 let filter = format!("_vid = {}", vid.as_u64());
1584 let query = table.query().only_if(filter).select(Select::Columns(vec![
1585 "_vid".to_string(),
1586 "labels".to_string(),
1587 "_version".to_string(),
1588 "_deleted".to_string(),
1589 ]));
1590
1591 let stream = query.execute().await?;
1592 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
1593
1594 let mut max_version: Option<u64> = None;
1596 let mut labels: Option<Vec<String>> = None;
1597 let mut is_deleted = false;
1598
1599 for batch in batches {
1600 if batch.num_rows() == 0 {
1601 continue;
1602 }
1603
1604 let version_array = batch
1605 .column_by_name("_version")
1606 .unwrap()
1607 .as_primitive::<arrow_array::types::UInt64Type>();
1608
1609 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
1610
1611 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
1612
1613 for row_idx in 0..batch.num_rows() {
1614 let version = version_array.value(row_idx);
1615
1616 if max_version.is_none_or(|mv| version > mv) {
1617 is_deleted = deleted_array.value(row_idx);
1618
1619 let labels_list = labels_array.value(row_idx);
1620 let string_array = labels_list.as_string::<i32>();
1621 let vertex_labels: Vec<String> = (0..string_array.len())
1622 .filter(|&i| !string_array.is_null(i))
1623 .map(|i| string_array.value(i).to_string())
1624 .collect();
1625
1626 max_version = Some(version);
1627 labels = Some(vertex_labels);
1628 }
1629 }
1630 }
1631
1632 if is_deleted { Ok(None) } else { Ok(labels) }
1634 }
1635
1636 #[instrument(skip(self, properties), level = "trace")]
1637 pub async fn insert_edge(
1638 &mut self,
1639 src_vid: Vid,
1640 dst_vid: Vid,
1641 edge_type: u32,
1642 eid: Eid,
1643 mut properties: Properties,
1644 edge_type_name: Option<String>,
1645 ) -> Result<()> {
1646 let start = std::time::Instant::now();
1647 self.check_write_pressure().await?;
1648 self.check_transaction_memory()?;
1649 self.prepare_edge_upsert(eid, &mut properties).await?;
1650
1651 let l0 = self.active_l0();
1652 l0.write()
1653 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1654
1655 if self.transaction_l0.is_none() {
1658 let version = l0.read().current_version;
1659 self.adjacency_manager
1660 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1661 }
1662
1663 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1664 self.update_metrics();
1665
1666 if self.transaction_l0.is_none() {
1667 self.check_flush().await?;
1668 }
1669 if start.elapsed().as_millis() > 100 {
1670 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1671 }
1672 Ok(())
1673 }
1674
1675 #[instrument(skip(self), level = "trace")]
1676 pub async fn delete_edge(
1677 &mut self,
1678 eid: Eid,
1679 src_vid: Vid,
1680 dst_vid: Vid,
1681 edge_type: u32,
1682 ) -> Result<()> {
1683 let start = std::time::Instant::now();
1684 self.check_write_pressure().await?;
1685 self.check_transaction_memory()?;
1686 let l0 = self.active_l0();
1687
1688 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1689
1690 if self.transaction_l0.is_none() {
1692 let version = l0.read().current_version;
1693 self.adjacency_manager
1694 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1695 }
1696 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1697 self.update_metrics();
1698
1699 if self.transaction_l0.is_none() {
1700 self.check_flush().await?;
1701 }
1702 if start.elapsed().as_millis() > 100 {
1703 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1704 }
1705 Ok(())
1706 }
1707
1708 pub async fn check_flush(&mut self) -> Result<()> {
1712 let count = self.l0_manager.get_current().read().mutation_count;
1713
1714 if count == 0 {
1716 return Ok(());
1717 }
1718
1719 if count >= self.config.auto_flush_threshold {
1721 self.flush_to_l1(None).await?;
1722 return Ok(());
1723 }
1724
1725 if let Some(interval) = self.config.auto_flush_interval
1727 && self.last_flush_time.elapsed() >= interval
1728 && count >= self.config.auto_flush_min_mutations
1729 {
1730 self.flush_to_l1(None).await?;
1731 }
1732
1733 Ok(())
1734 }
1735
1736 async fn process_embeddings_for_labels(
1739 &self,
1740 labels: &[String],
1741 properties: &mut Properties,
1742 ) -> Result<()> {
1743 let label_name = labels.first().map(|s| s.as_str());
1744 self.process_embeddings_impl(label_name, properties).await
1745 }
1746
    /// Batch variant of auto-embedding: one model invocation per
    /// (vector-index, batch) pair instead of one per vertex.
    ///
    /// For each embedding-enabled vector index on the primary label, gathers
    /// the concatenated source text of every vertex that doesn't already
    /// carry the target property, embeds all texts in a single call, and
    /// writes the vectors back into the matching property maps.
    async fn process_embeddings_for_batch(
        &self,
        labels: &[String],
        properties_batch: &mut [Properties],
    ) -> Result<()> {
        let label_name = labels.first().map(|s| s.as_str());
        let schema = self.schema_manager.schema();

        if let Some(label) = label_name {
            // (target property, embedding config) pairs for this label.
            let mut configs = Vec::new();
            for idx in &schema.indexes {
                if let IndexDefinition::Vector(v_config) = idx
                    && v_config.label == label
                    && let Some(emb_config) = &v_config.embedding_config
                {
                    configs.push((v_config.property.clone(), emb_config.clone()));
                }
            }

            if configs.is_empty() {
                return Ok(());
            }

            for (target_prop, emb_config) in configs {
                // Texts to embed, and the batch index each text belongs to
                // (parallel vectors: input_texts[i] came from
                // properties_batch[needs_embedding[i]]).
                let mut input_texts: Vec<String> = Vec::new();
                let mut needs_embedding: Vec<usize> = Vec::new();

                for (idx, properties) in properties_batch.iter().enumerate() {
                    // Caller-supplied embeddings win; skip those vertices.
                    if properties.contains_key(&target_prop) {
                        continue;
                    }

                    // Concatenate all configured string-valued source props.
                    let mut inputs = Vec::new();
                    for src_prop in &emb_config.source_properties {
                        if let Some(val) = properties.get(src_prop)
                            && let Some(s) = val.as_str()
                        {
                            inputs.push(s.to_string());
                        }
                    }

                    if !inputs.is_empty() {
                        let input_text = inputs.join(" ");
                        input_texts.push(input_text);
                        needs_embedding.push(idx);
                    }
                }

                if input_texts.is_empty() {
                    continue;
                }

                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
                })?;
                let embedder = runtime.embedding(&emb_config.alias).await?;

                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
                let embeddings = embedder.embed(input_refs).await?;

                // Scatter the results back to the vertices that needed them.
                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
                    if let Some(vec) = embeddings.get(embedding_idx) {
                        let vals: Vec<Value> =
                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
                    }
                }
            }
        }

        Ok(())
    }
1834
1835 async fn process_embeddings_impl(
1836 &self,
1837 label_name: Option<&str>,
1838 properties: &mut Properties,
1839 ) -> Result<()> {
1840 let schema = self.schema_manager.schema();
1841
1842 if let Some(label) = label_name {
1843 let mut configs = Vec::new();
1845 for idx in &schema.indexes {
1846 if let IndexDefinition::Vector(v_config) = idx
1847 && v_config.label == label
1848 && let Some(emb_config) = &v_config.embedding_config
1849 {
1850 configs.push((v_config.property.clone(), emb_config.clone()));
1851 }
1852 }
1853
1854 if configs.is_empty() {
1855 log::info!("No embedding config found for label {}", label);
1856 }
1857
1858 for (target_prop, emb_config) in configs {
1859 if properties.contains_key(&target_prop) {
1861 continue;
1862 }
1863
1864 let mut inputs = Vec::new();
1866 for src_prop in &emb_config.source_properties {
1867 if let Some(val) = properties.get(src_prop)
1868 && let Some(s) = val.as_str()
1869 {
1870 inputs.push(s.to_string());
1871 }
1872 }
1873
1874 if inputs.is_empty() {
1875 continue;
1876 }
1877
1878 let input_text = inputs.join(" "); let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1881 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1882 })?;
1883 let embedder = runtime.embedding(&emb_config.alias).await?;
1884
1885 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1887 if let Some(vec) = embeddings.first() {
1888 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1890 properties.insert(target_prop.clone(), Value::List(vals));
1891 }
1892 }
1893 }
1894 Ok(())
1895 }
1896
    /// Flushes the active L0 buffer to L1 (LanceDB) and publishes a new
    /// snapshot manifest; returns the new snapshot id.
    ///
    /// Rotation protocol: the WAL is flushed first so its LSN can be
    /// recorded, the current L0 is swapped out via `begin_flush`, version
    /// counter and WAL handle are carried over to the fresh buffer, the old
    /// buffer's contents are written out per edge-type and per label, the
    /// manifest is chained and saved, the flushed buffer is retired, and
    /// finally the WAL is truncated up to the oldest LSN still needed by any
    /// pending (unflushed) buffer. Statement order here is load-bearing.
    #[instrument(
        skip(self),
        fields(snapshot_id, mutations_count, size_bytes),
        level = "info"
    )]
    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
        let start = std::time::Instant::now();
        let schema = self.schema_manager.schema();

        // Captured before rotation for span fields and metrics.
        let (initial_size, initial_count) = {
            let l0_arc = self.l0_manager.get_current();
            let l0 = l0_arc.read();
            (l0.estimated_size, l0.mutation_count)
        };
        tracing::Span::current().record("size_bytes", initial_size);
        tracing::Span::current().record("mutations_count", initial_count);

        debug!("Starting L0 flush to L1");

        // Clone the WAL handle now: after rotation it is moved to the new
        // buffer, but truncation at the end still needs it.
        let wal_for_truncate = {
            let current_l0 = self.l0_manager.get_current();
            let l0_guard = current_l0.read();
            l0_guard.wal.clone()
        };

        // Flush the WAL BEFORE rotating, so wal_lsn covers every mutation in
        // the buffer being flushed.
        let wal_lsn = if let Some(ref w) = wal_for_truncate {
            w.flush().await?
        } else {
            0
        };

        // Swap in a fresh active buffer; `old_l0_arc` is the one we drain.
        let old_l0_arc = self.l0_manager.begin_flush(0, None);
        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);

        let current_version;
        {
            // Hand the WAL and the version counter over to the new buffer so
            // versions stay monotonic across rotations.
            let mut old_l0_guard = old_l0_arc.write();
            current_version = old_l0_guard.current_version;

            old_l0_guard.wal_lsn_at_flush = wal_lsn;

            let wal = old_l0_guard.wal.take();

            let new_l0_arc = self.l0_manager.get_current();
            let mut new_l0_guard = new_l0_arc.write();
            new_l0_guard.wal = wal;
            new_l0_guard.current_version = current_version;
        }

        // Drain the old buffer into per-edge-type and per-label staging maps.
        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
        // Tombstoned vertices whose labels aren't in L0; resolved from
        // storage below (can't await while holding the read lock).
        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();

        {
            let old_l0 = old_l0_arc.read();

            // Live edges -> insert entries.
            for edge in old_l0.graph.edges() {
                let properties = old_l0
                    .edge_properties
                    .get(&edge.eid)
                    .cloned()
                    .unwrap_or_default();
                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);

                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();

                entries_by_type
                    .entry(edge.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: edge.src_vid,
                        dst_vid: edge.dst_vid,
                        eid: edge.eid,
                        op: Op::Insert,
                        version,
                        properties,
                        created_at,
                        updated_at,
                    });
            }

            // Edge tombstones -> delete entries.
            for tombstone in old_l0.tombstones.values() {
                let version = old_l0
                    .edge_versions
                    .get(&tombstone.eid)
                    .copied()
                    .unwrap_or(0);
                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();

                entries_by_type
                    .entry(tombstone.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: tombstone.src_vid,
                        dst_vid: tombstone.dst_vid,
                        eid: tombstone.eid,
                        op: Op::Delete,
                        version,
                        properties: HashMap::new(),
                        created_at,
                        updated_at,
                    });
            }

            // A vertex is written once per schema-known label so every
            // per-label dataset sees it.
            let push_vertex_to_labels =
                |vid: Vid,
                 all_labels: &[String],
                 props: Properties,
                 deleted: bool,
                 version: u64,
                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
                    for label in all_labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            out.entry(label_id).or_default().push((
                                vid,
                                all_labels.to_vec(),
                                props.clone(),
                                deleted,
                                version,
                            ));
                        }
                    }
                };

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
                    vertex_created_at.insert(*vid, ts);
                }
                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
                    vertex_updated_at.insert(*vid, ts);
                }
                if let Some(labels) = old_l0.vertex_labels.get(vid) {
                    push_vertex_to_labels(
                        *vid,
                        labels,
                        props.clone(),
                        false,
                        version,
                        &mut vertices_by_label,
                    );
                }
            }
            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
                    push_vertex_to_labels(
                        vid,
                        labels,
                        HashMap::new(),
                        true,
                        version,
                        &mut vertices_by_label,
                    );
                } else {
                    orphaned_tombstones.push((vid, version));
                }
            }
        }

        // Label resolution for orphaned tombstones — best-effort storage
        // lookup (errors/empty results leave the tombstone unrouted).
        if !orphaned_tombstones.is_empty() {
            tracing::warn!(
                count = orphaned_tombstones.len(),
                "Tombstones missing labels in L0, querying storage as fallback"
            );
            for (vid, version) in orphaned_tombstones {
                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
                    && !labels.is_empty()
                {
                    for label in &labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            vertices_by_label.entry(label_id).or_default().push((
                                vid,
                                labels.clone(),
                                HashMap::new(),
                                true,
                                version,
                            ));
                        }
                    }
                }
            }
        }

        // Chain a new snapshot off the latest one (or start a fresh lineage).
        let mut manifest = self
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await?
            .unwrap_or_else(|| {
                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
            });

        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.name = name;
        manifest.created_at = Utc::now();
        manifest.version_high_water_mark = current_version;
        manifest.wal_high_water_mark = wal_lsn;
        let snapshot_id = manifest.snapshot_id.clone();

        tracing::Span::current().record("snapshot_id", &snapshot_id);

        let lancedb_store = self.storage.lancedb_store();

        // Edges: one forward-sorted and one backward-sorted delta run per
        // edge type, each with an EID index.
        for (&edge_type_id, entries) in entries_by_type.iter() {
            let edge_type_name = self
                .storage
                .schema_manager()
                .edge_type_name_by_id_unified(edge_type_id)
                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;

            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;

            let table = fwd_ds.write_run_lancedb(lancedb_store, fwd_batch).await?;
            fwd_ds.ensure_eid_index_lancedb(&table).await?;

            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;

            let bwd_table = bwd_ds.write_run_lancedb(lancedb_store, bwd_batch).await?;
            bwd_ds.ensure_eid_index_lancedb(&bwd_table).await?;

            let current_snap =
                manifest
                    .edges
                    .entry(edge_type_name.to_string())
                    .or_insert(EdgeSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += entries.len() as u64;
            current_snap.lance_version = 0;
        }

        // Vertices: one batch per label, plus inverted-index deltas,
        // vid->labels index maintenance, and UID mappings.
        for (label_id, vertices) in vertices_by_label {
            let label_name = schema
                .label_name_by_id(label_id)
                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

            let ds = self.storage.vertex_dataset(label_name)?;

            // property -> (vid -> added terms, removed vids).
            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
            let mut inverted_updates: InvertedUpdateMap = HashMap::new();

            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                {
                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
                    let mut removed: HashSet<Vid> = HashSet::new();

                    for (vid, _labels, props, deleted, _version) in &vertices {
                        if *deleted {
                            removed.insert(*vid);
                        } else if let Some(prop_value) = props.get(&cfg.property) {
                            // Only list-of-string properties feed the index.
                            if let Some(arr) = prop_value.as_array() {
                                let terms: Vec<String> = arr
                                    .iter()
                                    .filter_map(|v| v.as_str().map(ToString::to_string))
                                    .collect();
                                if !terms.is_empty() {
                                    added.insert(*vid, terms);
                                }
                            }
                        }
                    }

                    if !added.is_empty() || !removed.is_empty() {
                        inverted_updates.insert(cfg.property.clone(), (added, removed));
                    }
                }
            }

            // Split the per-vertex tuples into the parallel vectors the
            // dataset builder expects.
            let mut v_data = Vec::new();
            let mut d_data = Vec::new();
            let mut ver_data = Vec::new();
            for (vid, labels, props, deleted, version) in vertices {
                v_data.push((vid, labels, props));
                d_data.push(deleted);
                ver_data.push(version);
            }

            let batch = ds.build_record_batch_with_timestamps(
                &v_data,
                &d_data,
                &ver_data,
                &schema,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;

            let table = ds
                .write_batch_lancedb(lancedb_store, batch, &schema)
                .await?;
            ds.ensure_default_indexes_lancedb(&table).await?;

            // Keep the in-memory vid->labels index in sync with storage.
            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
                if deleted {
                    self.storage.remove_from_vid_labels_index(*vid);
                } else {
                    self.storage.update_vid_labels_index(*vid, labels.clone());
                }
            }

            let current_snap =
                manifest
                    .vertices
                    .entry(label_name.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += v_data.len() as u64;
            current_snap.lance_version = 0;

            self.storage.invalidate_table_cache(label_name);

            // Apply the inverted-index deltas gathered above.
            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
                {
                    self.storage
                        .index_manager()
                        .update_inverted_index_incremental(cfg, added, removed)
                        .await?;
                }
            }

            // UID (content-derived id) -> VID mapping, with collision check.
            let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
            for (vid, _labels, props) in &v_data {
                let ext_id = props.get("ext_id").and_then(|v| v.as_str());
                let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
                    label_name, ext_id, props,
                );
                uid_mappings.push((uid, *vid));
            }

            if !uid_mappings.is_empty()
                && let Ok(uid_index) = self.storage.uid_index(label_name)
            {
                for (uid, vid) in &uid_mappings {
                    if let Ok(Some(existing_vid)) = uid_index.get_vid(uid).await
                        && existing_vid != *vid
                    {
                        anyhow::bail!(
                            "UID collision detected: UID {:?} maps to both VID {} and VID {}. \
                             This indicates either a hash collision (astronomically unlikely with SHA3-256) \
                             or data corruption. Cannot proceed with flush.",
                            uid,
                            existing_vid.as_u64(),
                            vid.as_u64()
                        );
                    }
                }

                uid_index.write_mapping(&uid_mappings).await?;
            }
        }

        // Denormalized main edge table (all edge types in one dataset).
        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
            let _old_l0 = old_l0_arc.read();
            let mut main_edges: Vec<(
                uni_common::core::id::Eid,
                Vid,
                Vid,
                String,
                Properties,
                bool,
                u64,
            )> = Vec::new();
            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();

            for (&edge_type_id, entries) in entries_by_type.iter() {
                for entry in entries {
                    let edge_type_name = self
                        .storage
                        .schema_manager()
                        .edge_type_name_by_id_unified(edge_type_id)
                        .unwrap_or_else(|| "unknown".to_string());

                    let deleted = matches!(entry.op, Op::Delete);
                    main_edges.push((
                        entry.eid,
                        entry.src_vid,
                        entry.dst_vid,
                        edge_type_name,
                        entry.properties.clone(),
                        deleted,
                        entry.version,
                    ));

                    if let Some(ts) = entry.created_at {
                        edge_created_at_map.insert(entry.eid, ts);
                    }
                    if let Some(ts) = entry.updated_at {
                        edge_updated_at_map.insert(entry.eid, ts);
                    }
                }
            }

            (main_edges, edge_created_at_map, edge_updated_at_map)
        };

        if !main_edges.is_empty() {
            let main_edge_batch = MainEdgeDataset::build_record_batch(
                &main_edges,
                Some(&edge_created_at_map),
                Some(&edge_updated_at_map),
            )?;
            let main_edge_table =
                MainEdgeDataset::write_batch_lancedb(lancedb_store, main_edge_batch).await?;
            MainEdgeDataset::ensure_default_indexes_lancedb(&main_edge_table).await?;
        }

        // Denormalized main vertex table (all labels in one dataset).
        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
            let old_l0 = old_l0_arc.read();
            let mut vertices = Vec::new();

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                vertices.push((*vid, labels, props.clone(), false, version));
            }

            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
                vertices.push((vid, labels, HashMap::new(), true, version));
            }

            vertices
        };

        if !main_vertices.is_empty() {
            let main_vertex_batch = MainVertexDataset::build_record_batch(
                &main_vertices,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;
            let main_vertex_table =
                MainVertexDataset::write_batch_lancedb(lancedb_store, main_vertex_batch).await?;
            MainVertexDataset::ensure_default_indexes_lancedb(&main_vertex_table).await?;
        }

        // Publish the manifest only after all data writes succeeded.
        self.storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        self.storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        self.l0_manager.complete_flush(&old_l0_arc);

        // Truncate the WAL only up to the oldest LSN still needed by any
        // buffer that hasn't finished flushing.
        if let Some(w) = wal_for_truncate {
            let safe_lsn = self
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // Cached property reads may now be stale.
        if let Some(ref pm) = self.property_manager {
            pm.clear_cache().await;
        }

        self.last_flush_time = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // Opportunistic background adjacency compaction; at most one runs at
        // a time.
        let am = self.adjacency_manager.clone();
        if am.should_compact(4) {
            let previous_still_running = {
                let guard = self.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };

            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *self.compaction_handle.write() = Some(handle);
            }
        }

        // Kick off background index rebuilds when enabled and warranted.
        if let Some(ref rebuild_mgr) = self.index_rebuild_manager
            && self.config.index_rebuild.auto_rebuild_enabled
        {
            self.schedule_index_rebuilds_if_needed(&manifest, rebuild_mgr.clone());
        }

        Ok(snapshot_id)
    }
2506
2507 fn schedule_index_rebuilds_if_needed(
2511 &self,
2512 manifest: &SnapshotManifest,
2513 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
2514 ) {
2515 let checker = crate::storage::index_rebuild::RebuildTriggerChecker::new(
2516 self.config.index_rebuild.clone(),
2517 );
2518 let schema = self.schema_manager.schema();
2519 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
2520
2521 if labels.is_empty() {
2522 return;
2523 }
2524
2525 for label in &labels {
2527 for idx in &schema.indexes {
2528 if idx.label() == label {
2529 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
2530 m.status = uni_common::core::schema::IndexStatus::Stale;
2531 });
2532 }
2533 }
2534 }
2535
2536 tokio::spawn(async move {
2537 if let Err(e) = rebuild_mgr.schedule(labels).await {
2538 tracing::warn!("Failed to schedule index rebuild: {e}");
2539 }
2540 });
2541 }
2542
    /// Late-binds the property manager used for CRDT merges and post-flush
    /// cache invalidation; intended to be called once during engine wiring.
    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
        self.property_manager = Some(pm);
    }
2547}
2548
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Commit must append the transaction's mutations to the main L0's WAL,
    /// and in dependency order: both endpoint vertices are logged before the
    /// edge that connects them, so a WAL replay can always resolve edge
    /// endpoints.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fresh temp-dir backed object store with a single "Test" label.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        writer.begin_transaction()?;

        // Two vertices and one edge between them, all inside the transaction.
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()])
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(vid_a, vid_b, 1, eid, std::collections::HashMap::new(), None)
            .await?;

        // Snapshot the WAL length BEFORE commit so we can isolate exactly the
        // mutations appended by this transaction. The read guard is released
        // immediately after cloning the WAL handle.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        writer.commit_transaction().await?;

        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Keep only the mutations appended by the commit.
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Walk the new mutations in WAL order, checking the vertex-before-edge
        // invariant as we go.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // No vertex record may appear after the edge record.
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Both endpoints must already have been logged.
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // After commit, the transaction's data must also have been merged into
        // the main L0 buffer.
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }

    /// Rolling back a transaction must leave the main L0 untouched:
    /// pre-transaction ("baseline") data stays, and data written inside the
    /// transaction never reaches the main buffer.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Temp-dir backed store; separate labels distinguish baseline data
        // from data written inside the transaction.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Baseline vertex written OUTSIDE any transaction — must survive the
        // rollback below.
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
            )
            .await?;

        writer.begin_transaction()?;

        // Vertex written INSIDE the transaction — must be discarded on rollback.
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
            )
            .await?;

        // Capture main L0 vertex count before rollback for comparison.
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        writer.rollback_transaction()?;

        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }

    /// Inserting many vertices that share the same label slice must record the
    /// correct labels for every vertex (guards against label-sharing/aliasing
    /// bugs in the batch path).
    #[tokio::test]
    async fn test_batch_insert_shared_labels() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        // A single shared label slice reused across all 100 insertions.
        let labels = &["Person".to_string()];

        let mut vids = Vec::new();
        for i in 0..100 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("id".to_string(), Value::Int(i));
            writer.insert_vertex_with_labels(vid, props, labels).await?;
            vids.push(vid);
        }

        // Every inserted vertex must carry exactly the "Person" label.
        let l0 = writer.l0_manager.get_current();
        for vid in vids {
            let l0_guard = l0.read();
            let vertex_labels = l0_guard.vertex_labels.get(&vid);
            assert!(vertex_labels.is_some(), "Vertex should have labels");
            assert_eq!(
                vertex_labels.unwrap(),
                &vec!["Person".to_string()],
                "Labels should match"
            );
        }

        Ok(())
    }

    /// The incrementally maintained `estimated_size` must track the exactly
    /// computed `size_bytes()` within a 2x factor as vertices and edges are
    /// inserted, and both must start at zero.
    #[tokio::test]
    async fn test_estimated_size_tracks_mutations() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        let l0 = writer.l0_manager.get_current();

        // An empty buffer reports zero for both the estimate and the exact size.
        let initial_estimated = l0.read().estimated_size;
        let initial_actual = l0.read().size_bytes();
        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");

        // Insert 10 vertices with string + int properties.
        let mut vids = Vec::new();
        for i in 0..10 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
            props.insert("index".to_string(), Value::Int(i));
            writer.insert_vertex_with_labels(vid, props, &[]).await?;
            vids.push(vid);
        }

        let after_vertices_estimated = l0.read().estimated_size;
        let after_vertices_actual = l0.read().size_bytes();
        assert!(
            after_vertices_estimated > 0,
            "estimated_size should grow after insertions"
        );

        // The estimate must stay within a factor of 2 of the exact size.
        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_vertices_estimated,
            after_vertices_actual,
            ratio
        );

        // Chain the vertices together with 9 "NEXT" edges.
        let edge_type = 1u32;
        for i in 0..9 {
            let eid = writer.next_eid(edge_type).await?;
            writer
                .insert_edge(
                    vids[i],
                    vids[i + 1],
                    edge_type,
                    eid,
                    std::collections::HashMap::new(),
                    Some("NEXT".to_string()),
                )
                .await?;
        }

        // The estimate must grow with edges and remain within 2x of exact.
        let after_edges_estimated = l0.read().estimated_size;
        let after_edges_actual = l0.read().size_bytes();
        assert!(
            after_edges_estimated > after_vertices_estimated,
            "estimated_size should grow after edge insertions"
        );

        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_edges_estimated,
            after_edges_actual,
            ratio
        );

        Ok(())
    }
}