1use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use futures::TryStreamExt;
18use metrics;
19use parking_lot::RwLock;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{debug, info, instrument};
23use uni_common::Properties;
24use uni_common::Value;
25use uni_common::config::UniConfig;
26use uni_common::core::id::{Eid, Vid};
27use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
28use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
29use uni_xervo::runtime::ModelRuntime;
30use uuid::Uuid;
31
/// Tunable limits for the [`Writer`] write path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    // Maximum number of mutations to buffer; default is 10_000 (see `Default`).
    pub max_mutations: usize,
}
36
37impl Default for WriterConfig {
38 fn default() -> Self {
39 Self {
40 max_mutations: 10_000,
41 }
42 }
43}
44
/// Write path of the engine: stages mutations in L0 buffers, journals them to
/// the WAL, enforces schema constraints, and hands data to storage on flush.
pub struct Writer {
    /// Owns the main L0 buffer plus any buffers pending flush.
    pub l0_manager: Arc<L0Manager>,
    /// Durable storage (vertex/edge datasets, compaction status, adjacency).
    pub storage: Arc<StorageManager>,
    /// Source of truth for labels, property metadata, and constraints.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocates vertex and edge ids, persisted through the object store.
    pub allocator: Arc<IdAllocator>,
    /// Engine-wide configuration (throttle limits, transaction memory cap, …).
    pub config: UniConfig,
    /// Optional model runtime used for embedding generation.
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Staging buffer for the currently open transaction, if any.
    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
    /// Optional helper for property reads and CRDT merges.
    pub property_manager: Option<Arc<PropertyManager>>,
    // Keeps the in-memory adjacency index in sync with edge mutations.
    adjacency_manager: Arc<AdjacencyManager>,
    // Set at construction; presumably consulted by flush heuristics defined
    // outside this excerpt — TODO confirm.
    last_flush_time: std::time::Instant,
    // Handle to a background compaction task, if one has been spawned.
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}
62
63impl Writer {
64 pub async fn new(
65 storage: Arc<StorageManager>,
66 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
67 start_version: u64,
68 ) -> Result<Self> {
69 Self::new_with_config(
70 storage,
71 schema_manager,
72 start_version,
73 UniConfig::default(),
74 None,
75 None,
76 )
77 .await
78 }
79
80 pub async fn new_with_config(
81 storage: Arc<StorageManager>,
82 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
83 start_version: u64,
84 config: UniConfig,
85 wal: Option<Arc<WriteAheadLog>>,
86 allocator: Option<Arc<IdAllocator>>,
87 ) -> Result<Self> {
88 let allocator = if let Some(a) = allocator {
89 a
90 } else {
91 let store = storage.store();
92 let path = object_store::path::Path::from("id_allocator.json");
93 Arc::new(IdAllocator::new(store, path, 1000).await?)
94 };
95
96 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
97
98 let property_manager = Some(Arc::new(PropertyManager::new(
99 storage.clone(),
100 schema_manager.clone(),
101 1000,
102 )));
103
104 let adjacency_manager = storage.adjacency_manager();
105
106 Ok(Self {
107 l0_manager,
108 storage,
109 schema_manager,
110 allocator,
111 config,
112 xervo_runtime: None,
113 transaction_l0: None,
114 property_manager,
115 adjacency_manager,
116 last_flush_time: std::time::Instant::now(),
117 compaction_handle: Arc::new(RwLock::new(None)),
118 })
119 }
120
121 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
123 let l0 = self.l0_manager.get_current();
124 let wal = l0.read().wal.clone();
125
126 if let Some(wal) = wal {
127 wal.initialize().await?;
128 let mutations = wal.replay_since(wal_high_water_mark).await?;
129 let count = mutations.len();
130
131 if count > 0 {
132 log::info!(
133 "Replaying {} mutations from WAL (LSN > {})",
134 count,
135 wal_high_water_mark
136 );
137 let mut l0_guard = l0.write();
138 l0_guard.replay_mutations(mutations)?;
139 }
140
141 Ok(count)
142 } else {
143 Ok(0)
144 }
145 }
146
    /// Allocates the next vertex id from the shared allocator.
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
151
    /// Allocates `count` vertex ids in one allocator round-trip.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
157
    /// Allocates the next edge id. `_type_id` is currently ignored: edge ids
    /// are drawn from a single global sequence regardless of edge type.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
162
    /// Attaches a model runtime used for embedding generation.
    pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
        self.xervo_runtime = Some(runtime);
    }
166
    /// Returns a clone of the attached model runtime handle, if any.
    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
        self.xervo_runtime.clone()
    }
170
171 pub fn begin_transaction(&mut self) -> Result<()> {
172 if self.transaction_l0.is_some() {
173 return Err(anyhow!("Transaction already active"));
174 }
175 let current_version = self.l0_manager.get_current().read().current_version;
176 self.transaction_l0 = Some(Arc::new(RwLock::new(L0Buffer::new(current_version, None))));
178 Ok(())
179 }
180
181 fn active_l0(&self) -> Arc<RwLock<L0Buffer>> {
184 self.transaction_l0
185 .clone()
186 .unwrap_or_else(|| self.l0_manager.get_current())
187 }
188
189 fn update_metrics(&self) {
190 let l0 = self.l0_manager.get_current();
191 let size = l0.read().estimated_size;
192 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
193
194 if let Some(tx_l0) = &self.transaction_l0 {
195 metrics::gauge!("active_transactions").set(1.0);
196 let tx_size = tx_l0.read().estimated_size;
197 metrics::gauge!("transaction_l0_size_bytes").set(tx_size as f64);
198 } else {
199 metrics::gauge!("active_transactions").set(0.0);
200 metrics::gauge!("transaction_l0_size_bytes").set(0.0);
201 }
202 }
203
    /// Commits the active transaction. Ordering is deliberate and must be
    /// preserved: (1) journal every staged mutation to the WAL, (2) flush the
    /// WAL to make them durable, (3) merge the staging buffer into the main
    /// L0 buffer and update the adjacency index, (4) drop the staging buffer.
    ///
    /// Errors if no transaction is active or if any WAL/merge step fails.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        let tx_l0_arc = self
            .transaction_l0
            .as_ref()
            .ok_or_else(|| anyhow!("No active transaction"))?
            .clone();

        // Phase 1: append all staged mutations to the main buffer's WAL.
        // Read guards are scoped to this block so they drop before the await
        // in flush_wal below.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            if let Some(wal) = main_l0.wal.as_ref() {
                // Vertex upserts (skip ones also tombstoned in this tx).
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions.
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edge inserts/deletes, decided per-eid by the tombstone map.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }
            }
        }

        // Phase 2: make the journaled mutations durable before merging.
        self.flush_wal().await?;

        // Phase 3: merge staged data into the main buffer and mirror edge
        // changes into the adjacency index.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                // Fall back to the main buffer's current version when the tx
                // did not record an explicit edge version.
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }
        }

        self.update_metrics();

        // Phase 4: the transaction is done; drop its staging buffer.
        self.transaction_l0 = None;

        // Best-effort flush check — a failure here does not fail the commit.
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(())
    }
317
318 pub async fn flush_wal(&self) -> Result<()> {
320 let l0 = self.l0_manager.get_current();
321 let wal = l0.read().wal.clone();
322
323 if let Some(wal) = wal {
324 wal.flush().await?;
325 }
326 Ok(())
327 }
328
    /// Discards the active transaction's staging buffer; all staged mutations
    /// are dropped without touching the main L0 buffer or the WAL.
    ///
    /// NOTE(review): silently succeeds when no transaction is active.
    pub fn rollback_transaction(&mut self) -> Result<()> {
        self.transaction_l0 = None;
        Ok(())
    }
334
335 pub fn force_rollback(&mut self) {
338 if self.transaction_l0.take().is_some() {
339 tracing::warn!("Force-rolled back leaked transaction");
340 }
341 }
342
343 async fn validate_vertex_constraints_for_label(
346 &self,
347 vid: Vid,
348 properties: &Properties,
349 label: &str,
350 ) -> Result<()> {
351 let schema = self.schema_manager.schema();
352
353 {
354 if let Some(props_meta) = schema.properties.get(label) {
356 for (prop_name, meta) in props_meta {
357 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
358 log::warn!(
359 "Constraint violation: Property '{}' cannot be null for label '{}'",
360 prop_name,
361 label
362 );
363 return Err(anyhow!(
364 "Constraint violation: Property '{}' cannot be null",
365 prop_name
366 ));
367 }
368 }
369 }
370
371 for constraint in &schema.constraints {
373 if !constraint.enabled {
374 continue;
375 }
376 match &constraint.target {
377 ConstraintTarget::Label(l) if l == label => {}
378 _ => continue,
379 }
380
381 match &constraint.constraint_type {
382 ConstraintType::Unique {
383 properties: unique_props,
384 } => {
385 if !unique_props.is_empty() {
387 let mut key_values = Vec::new();
388 let mut missing = false;
389 for prop in unique_props {
390 if let Some(val) = properties.get(prop) {
391 key_values.push((prop.clone(), val.clone()));
392 } else {
393 missing = true; }
398 }
399
400 if !missing {
401 self.check_unique_constraint_multi(label, &key_values, vid)
402 .await?;
403 }
404 }
405 }
406 ConstraintType::Exists { property } => {
407 if properties.get(property).is_none_or(|v| v.is_null()) {
408 log::warn!(
409 "Constraint violation: Property '{}' must exist for label '{}'",
410 property,
411 label
412 );
413 return Err(anyhow!(
414 "Constraint violation: Property '{}' must exist",
415 property
416 ));
417 }
418 }
419 ConstraintType::Check { expression } => {
420 if !self.evaluate_check_constraint(expression, properties)? {
421 return Err(anyhow!(
422 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
423 constraint.name,
424 expression
425 ));
426 }
427 }
428 _ => {
429 return Err(anyhow!("Unsupported constraint type"));
430 }
431 }
432 }
433 }
434 Ok(())
435 }
436
437 async fn validate_vertex_constraints(
441 &self,
442 vid: Vid,
443 properties: &Properties,
444 labels: &[String],
445 ) -> Result<()> {
446 let schema = self.schema_manager.schema();
447
448 for label in labels {
450 if schema.get_label_case_insensitive(label).is_none() {
452 continue;
453 }
454 self.validate_vertex_constraints_for_label(vid, properties, label)
455 .await?;
456 }
457
458 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
460 self.check_extid_globally_unique(ext_id, vid).await?;
461 }
462
463 Ok(())
464 }
465
466 fn collect_constraint_keys_from_properties<'a>(
470 properties_iter: impl Iterator<Item = &'a Properties>,
471 label: &str,
472 constraints: &[uni_common::core::schema::Constraint],
473 existing_keys: &mut HashMap<String, HashSet<String>>,
474 existing_extids: &mut HashSet<String>,
475 ) {
476 for props in properties_iter {
477 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
478 existing_extids.insert(ext_id.to_string());
479 }
480
481 for constraint in constraints {
482 if !constraint.enabled {
483 continue;
484 }
485 if let ConstraintTarget::Label(l) = &constraint.target {
486 if l != label {
487 continue;
488 }
489 } else {
490 continue;
491 }
492
493 if let ConstraintType::Unique {
494 properties: unique_props,
495 } = &constraint.constraint_type
496 {
497 let mut key_parts = Vec::new();
498 let mut all_present = true;
499 for prop in unique_props {
500 if let Some(val) = props.get(prop) {
501 key_parts.push(format!("{}:{}", prop, val));
502 } else {
503 all_present = false;
504 break;
505 }
506 }
507 if all_present {
508 let key = key_parts.join("|");
509 existing_keys
510 .entry(constraint.name.clone())
511 .or_default()
512 .insert(key);
513 }
514 }
515 }
516 }
517 }
518
    /// Validates a whole batch of same-label vertices before any is written:
    /// nullability, `ext_id` uniqueness, and UNIQUE / EXISTS / CHECK
    /// constraints — both against already-buffered data (main L0 + open
    /// transaction) and *within* the batch itself, then finally against
    /// durable storage via a single filtered count query per constraint.
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // Pass 1: nullability for every row, reported with its batch index.
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // Snapshot the unique keys / ext_ids already present in memory.
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        // Main L0 buffer (guard scoped so it drops before any await).
        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Open transaction buffer, if any.
        if let Some(tx_l0) = &self.transaction_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Track keys/ext_ids seen earlier in this batch (value = first index)
        // so intra-batch duplicates are reported with both positions.
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        // Pass 2: per-row checks against buffers and the batch itself.
        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                // Only constraints targeting this label apply.
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        // Build the composite key; skip when any part is absent.
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = key_parts.join("|");

                            // Conflict with already-buffered data?
                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            // Conflict with an earlier row of this batch?
                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "Constraint violation at index {}: Property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "Constraint violation at index {}: CHECK constraint '{}' violated",
                                idx,
                                constraint.name
                            ));
                        }
                    }
                    _ => {}
                }
            }
        }

        // Pass 3: check UNIQUE constraints against durable storage with one
        // OR-of-ANDs filter per constraint, excluding the batch's own vids.
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            // Only scalar values can be rendered into the
                            // filter; other types skip this row's predicate.
                            let val_str = match val {
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                if !or_filters.is_empty() {
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    // Best-effort: a missing/unopenable dataset skips the check.
                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
766
767 async fn check_extid_globally_unique(&self, ext_id: &str, current_vid: Vid) -> Result<()> {
776 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
778 let mut buffers = vec![self.l0_manager.get_current()];
779 if let Some(tx_l0) = &self.transaction_l0 {
780 buffers.push(tx_l0.clone());
781 }
782 buffers.extend(self.l0_manager.get_pending_flush());
783 buffers
784 };
785
786 for l0 in &l0_buffers_to_check {
787 if let Some(vid) =
788 Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
789 {
790 return Err(anyhow!(
791 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
792 ext_id,
793 vid
794 ));
795 }
796 }
797
798 let lancedb = self.storage.lancedb_store();
801 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(lancedb, ext_id, None).await
802 && found_vid != current_vid
803 {
804 return Err(anyhow!(
805 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
806 ext_id,
807 found_vid
808 ));
809 }
810
811 Ok(())
812 }
813
814 fn find_extid_in_properties(
816 vertex_properties: &HashMap<Vid, Properties>,
817 ext_id: &str,
818 current_vid: Vid,
819 ) -> Option<Vid> {
820 vertex_properties.iter().find_map(|(&vid, props)| {
821 if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
822 Some(vid)
823 } else {
824 None
825 }
826 })
827 }
828
829 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
831 let l0 = self.l0_manager.get_current();
832 let l0_guard = l0.read();
833 if l0_guard.vertex_tombstones.contains(&vid) {
835 return None;
836 }
837 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
838 }
839
    /// Resolves the labels of `vid` by consulting, in order: the main L0
    /// buffer, the open transaction buffer, buffers pending flush, and
    /// finally durable storage. A tombstone in the buffer being consulted
    /// yields `None` (vertex deleted).
    ///
    /// NOTE(review): the main L0 buffer is consulted *before* the transaction
    /// buffer, so a label found in main L0 wins even if the current
    /// transaction has since deleted the vertex — confirm this ordering is
    /// intentional.
    pub async fn get_vertex_labels(&self, vid: Vid) -> Option<Vec<String>> {
        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
            return Some(labels);
        }

        if let Some(tx_l0) = &self.transaction_l0 {
            let guard = tx_l0.read();
            if guard.vertex_tombstones.contains(&vid) {
                return None;
            }
            if let Some(labels) = guard.get_vertex_labels(vid) {
                return Some(labels.to_vec());
            }
        }

        for pending_l0 in self.l0_manager.get_pending_flush() {
            let guard = pending_l0.read();
            if guard.vertex_tombstones.contains(&vid) {
                return None;
            }
            if let Some(labels) = guard.get_vertex_labels(vid) {
                return Some(labels.to_vec());
            }
        }

        // Storage lookup errors are treated the same as "not found".
        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
    }
874
875 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
877 let l0 = self.l0_manager.get_current();
878 let l0_guard = l0.read();
879 l0_guard.get_edge_type(eid).map(|s| s.to_string())
880 }
881
882 pub fn get_edge_type_id_from_l0(&self, eid: Eid) -> Option<u32> {
885 if let Some(tx_l0) = &self.transaction_l0 {
887 let guard = tx_l0.read();
888 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
889 return Some(etype);
890 }
891 }
892 let l0 = self.l0_manager.get_current();
894 let l0_guard = l0.read();
895 l0_guard
896 .get_edge_endpoint_full(eid)
897 .map(|(_, _, etype)| etype)
898 }
899
    /// Records the type *name* for edge `eid` in the currently active L0
    /// buffer (transaction buffer if a transaction is open, else main).
    pub fn set_edge_type(&self, eid: Eid, type_name: String) {
        self.active_l0().write().set_edge_type(eid, type_name);
    }
905
906 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
909 let parts: Vec<&str> = expression.split_whitespace().collect();
910 if parts.len() != 3 {
911 log::warn!(
914 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
915 expression
916 );
917 return Ok(true);
918 }
919
920 let prop_part = parts[0].trim_start_matches('(');
921 let prop_name = if let Some(idx) = prop_part.find('.') {
923 &prop_part[idx + 1..]
924 } else {
925 prop_part
926 };
927
928 let op = parts[1];
929 let val_str = parts[2].trim_end_matches(')');
930
931 let prop_val = match properties.get(prop_name) {
932 Some(v) => v,
933 None => return Ok(true), };
935
936 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
938 || (val_str.starts_with('"') && val_str.ends_with('"'))
939 {
940 Value::String(val_str[1..val_str.len() - 1].to_string())
941 } else if let Ok(n) = val_str.parse::<i64>() {
942 Value::Int(n)
943 } else if let Ok(n) = val_str.parse::<f64>() {
944 Value::Float(n)
945 } else if let Ok(b) = val_str.parse::<bool>() {
946 Value::Bool(b)
947 } else {
948 if val_str.starts_with("Number(") && val_str.ends_with(')') {
950 let n_str = &val_str[7..val_str.len() - 1];
951 if let Ok(n) = n_str.parse::<i64>() {
952 Value::Int(n)
953 } else if let Ok(n) = n_str.parse::<f64>() {
954 Value::Float(n)
955 } else {
956 Value::String(val_str.to_string())
957 }
958 } else {
959 Value::String(val_str.to_string())
960 }
961 };
962
963 match op {
964 "=" | "==" => Ok(prop_val == &target_val),
965 "!=" | "<>" => Ok(prop_val != &target_val),
966 ">" => self
967 .compare_values(prop_val, &target_val)
968 .map(|o| o.is_gt()),
969 "<" => self
970 .compare_values(prop_val, &target_val)
971 .map(|o| o.is_lt()),
972 ">=" => self
973 .compare_values(prop_val, &target_val)
974 .map(|o| o.is_ge()),
975 "<=" => self
976 .compare_values(prop_val, &target_val)
977 .map(|o| o.is_le()),
978 _ => {
979 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
980 Ok(true)
981 }
982 }
983 }
984
985 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
986 use std::cmp::Ordering;
987
988 fn cmp_f64(x: f64, y: f64) -> Ordering {
989 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
990 }
991
992 match (a, b) {
993 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
994 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
995 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
996 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
997 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
998 _ => Err(anyhow!(
999 "Cannot compare incompatible types: {:?} vs {:?}",
1000 a,
1001 b
1002 )),
1003 }
1004 }
1005
1006 async fn check_unique_constraint_multi(
1007 &self,
1008 label: &str,
1009 key_values: &[(String, Value)],
1010 current_vid: Vid,
1011 ) -> Result<()> {
1012 let key = serialize_constraint_key(label, key_values);
1014
1015 {
1017 let l0 = self.l0_manager.get_current();
1018 let l0_guard = l0.read();
1019 if l0_guard.has_constraint_key(&key, current_vid) {
1020 return Err(anyhow!(
1021 "Constraint violation: Duplicate composite key for label '{}'",
1022 label
1023 ));
1024 }
1025 }
1026
1027 if let Some(tx_l0) = &self.transaction_l0 {
1029 let tx_l0_guard = tx_l0.read();
1030 if tx_l0_guard.has_constraint_key(&key, current_vid) {
1031 return Err(anyhow!(
1032 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1033 label
1034 ));
1035 }
1036 }
1037
1038 let filters: Vec<String> = key_values
1040 .iter()
1041 .map(|(prop, val)| {
1042 let val_str = match val {
1043 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1044 Value::Int(n) => n.to_string(),
1045 Value::Float(f) => f.to_string(),
1046 Value::Bool(b) => b.to_string(),
1047 _ => "NULL".to_string(),
1048 };
1049 format!("{} = {}", prop, val_str)
1050 })
1051 .collect();
1052
1053 let mut filter = filters.join(" AND ");
1054 filter.push_str(&format!(
1055 " AND _deleted = false AND _vid != {}",
1056 current_vid.as_u64()
1057 ));
1058
1059 if let Ok(ds) = self.storage.vertex_dataset(label)
1060 && let Ok(lance_ds) = ds.open_raw().await
1061 {
1062 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1063 if count > 0 {
1064 return Err(anyhow!(
1065 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
1066 label,
1067 filter
1068 ));
1069 }
1070 }
1071
1072 Ok(())
1073 }
1074
1075 async fn check_write_pressure(&self) -> Result<()> {
1076 let status = self
1077 .storage
1078 .compaction_status()
1079 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1080 let l1_runs = status.l1_runs;
1081 let throttle = &self.config.throttle;
1082
1083 if l1_runs >= throttle.hard_limit {
1084 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1085 while self
1087 .storage
1088 .compaction_status()
1089 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1090 .l1_runs
1091 >= throttle.hard_limit
1092 {
1093 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1094 }
1095 } else if l1_runs >= throttle.soft_limit {
1096 let excess = l1_runs - throttle.soft_limit;
1097 let excess = std::cmp::min(excess, 31);
1099 let multiplier = 2_u32.pow(excess as u32);
1100 let delay = throttle.base_delay * multiplier;
1101 tokio::time::sleep(delay).await;
1102 }
1103 Ok(())
1104 }
1105
1106 fn check_transaction_memory(&self) -> Result<()> {
1109 if let Some(tx_l0) = &self.transaction_l0 {
1110 let size = tx_l0.read().estimated_size;
1111 if size > self.config.max_transaction_memory {
1112 return Err(anyhow!(
1113 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1114 Roll back or commit the current transaction.",
1115 size,
1116 self.config.max_transaction_memory
1117 ));
1118 }
1119 }
1120 Ok(())
1121 }
1122
1123 async fn get_query_context(&self) -> Option<QueryContext> {
1124 Some(QueryContext::new_with_pending(
1125 self.l0_manager.get_current(),
1126 self.transaction_l0.clone(),
1127 self.l0_manager.get_pending_flush(),
1128 ))
1129 }
1130
    /// Pre-merges CRDT-typed vertex properties with their currently stored
    /// values before an upsert, so concurrent writers converge instead of
    /// overwriting each other. A no-op when there is no property manager, no
    /// resolvable label, no property metadata, or no CRDT-typed keys in the
    /// incoming `properties`.
    async fn prepare_vertex_upsert(
        &self,
        vid: Vid,
        properties: &mut Properties,
        label: Option<&str>,
    ) -> Result<()> {
        let Some(pm) = &self.property_manager else {
            return Ok(());
        };

        let schema = self.schema_manager.schema();

        // `discovered_labels` is declared in the outer scope so the &str
        // borrowed from it in the else-branch lives long enough.
        let discovered_labels;
        let label_name = if let Some(l) = label {
            Some(l)
        } else {
            // No label supplied: fall back to the first label already known
            // for this vertex (buffers first, then storage).
            discovered_labels = self.get_vertex_labels(vid).await;
            discovered_labels
                .as_ref()
                .and_then(|l| l.first().map(|s| s.as_str()))
        };

        let Some(label_str) = label_name else {
            return Ok(());
        };
        let Some(props_meta) = schema.properties.get(label_str) else {
            return Ok(());
        };

        // Incoming keys whose schema-declared type is a CRDT.
        let crdt_keys: Vec<String> = properties
            .keys()
            .filter(|key| {
                props_meta.get(*key).is_some_and(|meta| {
                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                })
            })
            .cloned()
            .collect();

        if crdt_keys.is_empty() {
            return Ok(());
        }

        // Merge each incoming CRDT value with its existing (non-null) value.
        let ctx = self.get_query_context().await;
        for key in crdt_keys {
            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
            if !existing.is_null()
                && let Some(val) = properties.get_mut(&key)
            {
                *val = pm.merge_crdt_values(&existing, val)?;
            }
        }

        Ok(())
    }
1196
1197 async fn prepare_edge_upsert(&self, eid: Eid, properties: &mut Properties) -> Result<()> {
1198 if let Some(pm) = &self.property_manager {
1199 let schema = self.schema_manager.schema();
1200 let type_name = self.get_edge_type_from_l0(eid);
1202
1203 if let Some(ref t_name) = type_name
1204 && let Some(props_meta) = schema.properties.get(t_name)
1205 {
1206 let mut crdt_keys = Vec::new();
1207 for (key, _) in properties.iter() {
1208 if let Some(meta) = props_meta.get(key)
1209 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1210 {
1211 crdt_keys.push(key.clone());
1212 }
1213 }
1214
1215 if !crdt_keys.is_empty() {
1216 let ctx = self.get_query_context().await;
1217 for key in crdt_keys {
1218 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1219
1220 if !existing.is_null()
1221 && let Some(val) = properties.get_mut(&key)
1222 {
1223 *val = pm.merge_crdt_values(&existing, val)?;
1224 }
1225 }
1226 }
1227 }
1228 }
1229 Ok(())
1230 }
1231
1232 #[instrument(skip(self, properties), level = "trace")]
1233 pub async fn insert_vertex(&mut self, vid: Vid, properties: Properties) -> Result<()> {
1234 self.insert_vertex_with_labels(vid, properties, &[]).await?;
1235 Ok(())
1236 }
1237
    /// Inserts a vertex with the given labels into the active L0 buffer.
    ///
    /// Pipeline: write-pressure check → transaction memory check →
    /// auto-embedding → constraint validation → upsert preparation → L0
    /// insert plus unique-constraint key registration.
    ///
    /// Returns the final property map (after embedding/upsert processing) so
    /// callers can observe server-side property mutations.
    #[instrument(skip(self, properties, labels), level = "trace")]
    pub async fn insert_vertex_with_labels(
        &mut self,
        vid: Vid,
        mut properties: Properties,
        labels: &[String],
    ) -> Result<Properties> {
        let start = std::time::Instant::now();
        // Back-pressure / memory guards before any mutation happens.
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        // May populate vector properties from configured embedding models.
        self.process_embeddings_for_labels(labels, &mut properties)
            .await?;
        self.validate_vertex_constraints(vid, &properties, labels)
            .await?;
        // NOTE(review): only the first label is passed to upsert preparation —
        // presumably the primary label; confirm intended multi-label semantics.
        self.prepare_vertex_upsert(vid, &mut properties, labels.first().map(|s| s.as_str()))
            .await?;

        // Clone before `properties` is moved into the L0 buffer below.
        let properties_copy = properties.clone();
        let labels_copy = labels.to_vec();

        {
            let l0 = self.active_l0();
            let mut l0_guard = l0.write();
            l0_guard.insert_vertex_with_labels(vid, properties, labels);

            // Still holding the L0 write lock: register unique-constraint keys
            // so they become visible atomically with the vertex insert.
            let schema = self.schema_manager.schema();
            for label in &labels_copy {
                // Skip labels the schema does not know about.
                if schema.get_label_case_insensitive(label).is_none() {
                    continue;
                }

                for constraint in &schema.constraints {
                    if !constraint.enabled {
                        continue;
                    }
                    // Only constraints targeting exactly this label apply.
                    if let ConstraintTarget::Label(l) = &constraint.target {
                        if l != label {
                            continue;
                        }
                    } else {
                        continue;
                    }

                    if let ConstraintType::Unique {
                        properties: unique_props,
                    } = &constraint.constraint_type
                    {
                        // A unique key is only recorded when every constrained
                        // property is present on this vertex.
                        let mut key_values = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties_copy.get(prop) {
                                key_values.push((prop.clone(), val.clone()));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = serialize_constraint_key(label, &key_values);
                            l0_guard.insert_constraint_key(key, vid);
                        }
                    }
                }
            }
        }

        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Auto-flush only applies outside explicit transactions; transactional
        // mutations are flushed on commit instead.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(properties_copy)
    }
1320
    /// Inserts a batch of vertices that all share the same label set.
    ///
    /// The batch is made atomic by wrapping it in an implicit transaction
    /// when the caller has not already opened one. Auto-embedding and
    /// unique-constraint validation run once for the whole batch; CRDT-typed
    /// properties are merged with currently visible values before the write.
    ///
    /// Returns the final property maps (post-embedding, post-CRDT-merge) in
    /// the same order as `vids`.
    pub async fn insert_vertices_batch(
        &mut self,
        vids: Vec<Vid>,
        mut properties_batch: Vec<Properties>,
        labels: Vec<String>,
    ) -> Result<Vec<Properties>> {
        let start = std::time::Instant::now();

        if vids.len() != properties_batch.len() {
            return Err(anyhow!(
                "VID/properties size mismatch: {} vids, {} properties",
                vids.len(),
                properties_batch.len()
            ));
        }

        if vids.is_empty() {
            return Ok(Vec::new());
        }

        // Reuse the caller's open transaction when there is one; otherwise
        // open an implicit transaction so the batch commits or rolls back as
        // a unit.
        let is_nested = self.transaction_l0.is_some();
        if !is_nested {
            self.begin_transaction()?;
        }

        // All fallible work runs inside this async block so the match below
        // can commit or roll back the implicit transaction in one place.
        let result = async {
            self.check_write_pressure().await?;
            self.check_transaction_memory()?;

            self.process_embeddings_for_batch(&labels, &mut properties_batch)
                .await?;

            // Constraint validation is driven by the first (primary) label.
            let label = labels
                .first()
                .ok_or_else(|| anyhow!("No labels provided"))?;
            self.validate_vertex_batch_constraints(&vids, &properties_batch, label)
                .await?;

            // Cheap pre-check: does this label declare any CRDT-typed
            // properties at all?
            let has_crdt_fields = {
                let schema = self.schema_manager.schema();
                schema
                    .properties
                    .get(label.as_str())
                    .is_some_and(|props_meta| {
                        props_meta.values().any(|meta| {
                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                        })
                    })
            };

            if has_crdt_fields {
                // Names of every CRDT-typed property for this label.
                let schema = self.schema_manager.schema();
                let crdt_keys: Vec<String> = schema
                    .properties
                    .get(label.as_str())
                    .map(|props_meta| {
                        props_meta
                            .iter()
                            .filter(|(_, meta)| {
                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                            })
                            .map(|(key, _)| key.clone())
                            .collect()
                    })
                    .unwrap_or_default();

                // Merge incoming CRDT values with the currently visible state
                // so concurrent updates converge instead of clobbering.
                if let Some(pm) = &self.property_manager {
                    let ctx = self.get_query_context().await;
                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
                        for key in &crdt_keys {
                            if props.contains_key(key) {
                                let existing =
                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
                                if !existing.is_null()
                                    && let Some(val) = props.get_mut(key)
                                {
                                    *val = pm.merge_crdt_values(&existing, val)?;
                                }
                            }
                        }
                    }
                }
            }

            // Guaranteed present: begin_transaction above (or the caller's
            // open transaction) installed it.
            let tx_l0 = self
                .transaction_l0
                .as_ref()
                .ok_or_else(|| anyhow!("Transaction L0 missing"))?;

            let properties_result = properties_batch.clone();
            {
                let mut l0_guard = tx_l0.write();
                for (vid, props) in vids.iter().zip(properties_batch) {
                    l0_guard.insert_vertex_with_labels(*vid, props, &labels);
                }
            }

            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
            self.update_metrics();

            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
        }
        .await;

        match result {
            Ok(props) => {
                if !is_nested {
                    self.commit_transaction().await?;
                }

                if start.elapsed().as_millis() > 100 {
                    log::warn!(
                        "Slow insert_vertices_batch ({} vertices): {}ms",
                        vids.len(),
                        start.elapsed().as_millis()
                    );
                }

                Ok(props)
            }
            Err(e) => {
                // NOTE(review): if rollback itself fails, its error replaces
                // the original batch error `e` via `?` — confirm intended.
                if !is_nested {
                    self.rollback_transaction()?;
                }
                Err(e)
            }
        }
    }
1490
    /// Marks a vertex as deleted in the active L0 buffer.
    ///
    /// Before writing the tombstone, ensures the buffer knows the vertex's
    /// labels — at flush time tombstones are routed to per-label datasets,
    /// so a label-less tombstone would be orphaned. Resolution order: labels
    /// already cached in the active L0, then caller-provided `labels`, then
    /// buffers pending flush, then persisted storage.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(&mut self, vid: Vid, labels: Option<Vec<String>>) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory()?;
        let l0 = self.active_l0();

        // Short read-lock scope: does the active buffer already know labels?
        let has_labels = {
            let l0_guard = l0.read();
            l0_guard.vertex_labels.contains_key(&vid)
        };

        if !has_labels {
            let resolved_labels = if let Some(provided) = labels {
                Some(provided)
            } else {
                // Check buffers queued for flush before falling back to a
                // (slower) storage scan.
                let mut found = None;
                for pending_l0 in self.l0_manager.get_pending_flush() {
                    let pending_guard = pending_l0.read();
                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
                        found = Some(l.to_vec());
                        break;
                    }
                }
                if found.is_none() {
                    found = self.find_vertex_labels_in_storage(vid).await?;
                }
                found
            };

            // Cache the labels so the flush can route this tombstone.
            if let Some(found_labels) = resolved_labels {
                let mut l0_guard = l0.write();
                l0_guard.vertex_labels.insert(vid, found_labels);
            }
        }

        l0.write().delete_vertex(vid)?;
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Auto-flush only applies outside explicit transactions.
        if self.transaction_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
1553
1554 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1557 use arrow_array::Array;
1558 use arrow_array::cast::AsArray;
1559 use lancedb::query::{ExecutableQuery, QueryBase, Select};
1560
1561 let lancedb_store = self.storage.lancedb_store();
1562 let table_name = MainVertexDataset::table_name();
1563
1564 if !lancedb_store.table_exists(table_name).await? {
1566 return Ok(None);
1567 }
1568
1569 let table = lancedb_store.open_table(table_name).await?;
1570
1571 let filter = format!("_vid = {}", vid.as_u64());
1573 let query = table.query().only_if(filter).select(Select::Columns(vec![
1574 "_vid".to_string(),
1575 "labels".to_string(),
1576 "_version".to_string(),
1577 "_deleted".to_string(),
1578 ]));
1579
1580 let stream = query.execute().await?;
1581 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
1582
1583 let mut max_version: Option<u64> = None;
1585 let mut labels: Option<Vec<String>> = None;
1586 let mut is_deleted = false;
1587
1588 for batch in batches {
1589 if batch.num_rows() == 0 {
1590 continue;
1591 }
1592
1593 let version_array = batch
1594 .column_by_name("_version")
1595 .unwrap()
1596 .as_primitive::<arrow_array::types::UInt64Type>();
1597
1598 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
1599
1600 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
1601
1602 for row_idx in 0..batch.num_rows() {
1603 let version = version_array.value(row_idx);
1604
1605 if max_version.is_none_or(|mv| version > mv) {
1606 is_deleted = deleted_array.value(row_idx);
1607
1608 let labels_list = labels_array.value(row_idx);
1609 let string_array = labels_list.as_string::<i32>();
1610 let vertex_labels: Vec<String> = (0..string_array.len())
1611 .filter(|&i| !string_array.is_null(i))
1612 .map(|i| string_array.value(i).to_string())
1613 .collect();
1614
1615 max_version = Some(version);
1616 labels = Some(vertex_labels);
1617 }
1618 }
1619 }
1620
1621 if is_deleted { Ok(None) } else { Ok(labels) }
1623 }
1624
1625 #[instrument(skip(self, properties), level = "trace")]
1626 pub async fn insert_edge(
1627 &mut self,
1628 src_vid: Vid,
1629 dst_vid: Vid,
1630 edge_type: u32,
1631 eid: Eid,
1632 mut properties: Properties,
1633 edge_type_name: Option<String>,
1634 ) -> Result<()> {
1635 let start = std::time::Instant::now();
1636 self.check_write_pressure().await?;
1637 self.check_transaction_memory()?;
1638 self.prepare_edge_upsert(eid, &mut properties).await?;
1639
1640 let l0 = self.active_l0();
1641 l0.write()
1642 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1643
1644 if self.transaction_l0.is_none() {
1647 let version = l0.read().current_version;
1648 self.adjacency_manager
1649 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1650 }
1651
1652 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1653 self.update_metrics();
1654
1655 if self.transaction_l0.is_none() {
1656 self.check_flush().await?;
1657 }
1658 if start.elapsed().as_millis() > 100 {
1659 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1660 }
1661 Ok(())
1662 }
1663
1664 #[instrument(skip(self), level = "trace")]
1665 pub async fn delete_edge(
1666 &mut self,
1667 eid: Eid,
1668 src_vid: Vid,
1669 dst_vid: Vid,
1670 edge_type: u32,
1671 ) -> Result<()> {
1672 let start = std::time::Instant::now();
1673 self.check_write_pressure().await?;
1674 self.check_transaction_memory()?;
1675 let l0 = self.active_l0();
1676
1677 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1678
1679 if self.transaction_l0.is_none() {
1681 let version = l0.read().current_version;
1682 self.adjacency_manager
1683 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1684 }
1685 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1686 self.update_metrics();
1687
1688 if self.transaction_l0.is_none() {
1689 self.check_flush().await?;
1690 }
1691 if start.elapsed().as_millis() > 100 {
1692 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1693 }
1694 Ok(())
1695 }
1696
1697 pub async fn check_flush(&mut self) -> Result<()> {
1701 let count = self.l0_manager.get_current().read().mutation_count;
1702
1703 if count == 0 {
1705 return Ok(());
1706 }
1707
1708 if count >= self.config.auto_flush_threshold {
1710 self.flush_to_l1(None).await?;
1711 return Ok(());
1712 }
1713
1714 if let Some(interval) = self.config.auto_flush_interval
1716 && self.last_flush_time.elapsed() >= interval
1717 && count >= self.config.auto_flush_min_mutations
1718 {
1719 self.flush_to_l1(None).await?;
1720 }
1721
1722 Ok(())
1723 }
1724
1725 async fn process_embeddings_for_labels(
1728 &self,
1729 labels: &[String],
1730 properties: &mut Properties,
1731 ) -> Result<()> {
1732 let label_name = labels.first().map(|s| s.as_str());
1733 self.process_embeddings_impl(label_name, properties).await
1734 }
1735
1736 async fn process_embeddings_for_batch(
1746 &self,
1747 labels: &[String],
1748 properties_batch: &mut [Properties],
1749 ) -> Result<()> {
1750 let label_name = labels.first().map(|s| s.as_str());
1751 let schema = self.schema_manager.schema();
1752
1753 if let Some(label) = label_name {
1754 let mut configs = Vec::new();
1756 for idx in &schema.indexes {
1757 if let IndexDefinition::Vector(v_config) = idx
1758 && v_config.label == label
1759 && let Some(emb_config) = &v_config.embedding_config
1760 {
1761 configs.push((v_config.property.clone(), emb_config.clone()));
1762 }
1763 }
1764
1765 if configs.is_empty() {
1766 return Ok(());
1767 }
1768
1769 for (target_prop, emb_config) in configs {
1770 let mut input_texts: Vec<String> = Vec::new();
1772 let mut needs_embedding: Vec<usize> = Vec::new();
1773
1774 for (idx, properties) in properties_batch.iter().enumerate() {
1775 if properties.contains_key(&target_prop) {
1777 continue;
1778 }
1779
1780 let mut inputs = Vec::new();
1782 for src_prop in &emb_config.source_properties {
1783 if let Some(val) = properties.get(src_prop)
1784 && let Some(s) = val.as_str()
1785 {
1786 inputs.push(s.to_string());
1787 }
1788 }
1789
1790 if !inputs.is_empty() {
1791 let input_text = inputs.join(" ");
1792 input_texts.push(input_text);
1793 needs_embedding.push(idx);
1794 }
1795 }
1796
1797 if input_texts.is_empty() {
1798 continue;
1799 }
1800
1801 let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1802 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1803 })?;
1804 let embedder = runtime.embedding(&emb_config.alias).await?;
1805
1806 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
1808 let embeddings = embedder.embed(input_refs).await?;
1809
1810 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
1812 if let Some(vec) = embeddings.get(embedding_idx) {
1813 let vals: Vec<Value> =
1814 vec.iter().map(|f| Value::Float(*f as f64)).collect();
1815 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
1816 }
1817 }
1818 }
1819 }
1820
1821 Ok(())
1822 }
1823
1824 async fn process_embeddings_impl(
1825 &self,
1826 label_name: Option<&str>,
1827 properties: &mut Properties,
1828 ) -> Result<()> {
1829 let schema = self.schema_manager.schema();
1830
1831 if let Some(label) = label_name {
1832 let mut configs = Vec::new();
1834 for idx in &schema.indexes {
1835 if let IndexDefinition::Vector(v_config) = idx
1836 && v_config.label == label
1837 && let Some(emb_config) = &v_config.embedding_config
1838 {
1839 configs.push((v_config.property.clone(), emb_config.clone()));
1840 }
1841 }
1842
1843 if configs.is_empty() {
1844 log::info!("No embedding config found for label {}", label);
1845 }
1846
1847 for (target_prop, emb_config) in configs {
1848 if properties.contains_key(&target_prop) {
1850 continue;
1851 }
1852
1853 let mut inputs = Vec::new();
1855 for src_prop in &emb_config.source_properties {
1856 if let Some(val) = properties.get(src_prop)
1857 && let Some(s) = val.as_str()
1858 {
1859 inputs.push(s.to_string());
1860 }
1861 }
1862
1863 if inputs.is_empty() {
1864 continue;
1865 }
1866
1867 let input_text = inputs.join(" "); let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1870 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1871 })?;
1872 let embedder = runtime.embedding(&emb_config.alias).await?;
1873
1874 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1876 if let Some(vec) = embeddings.first() {
1877 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1879 properties.insert(target_prop.clone(), Value::List(vals));
1880 }
1881 }
1882 }
1883 Ok(())
1884 }
1885
    /// Flushes the current L0 buffer into L1 (LanceDB datasets) and
    /// publishes a new snapshot manifest. Returns the new snapshot id.
    ///
    /// High-level sequence:
    /// 1. Flush the WAL and rotate L0 so writes continue on a fresh buffer;
    ///    the WAL handle and version counter are handed over to it.
    /// 2. Convert the old buffer's edges/tombstones into per-edge-type
    ///    `L1Entry` runs and its vertices/tombstones into per-label rows
    ///    (label-less tombstones fall back to a storage lookup).
    /// 3. Write fwd/bwd edge delta runs, per-label vertex batches (with
    ///    inverted-index and UID-index maintenance), and the unified main
    ///    edge/vertex tables.
    /// 4. Save + publish the snapshot, complete the flush, truncate the WAL
    ///    up to the lowest LSN still needed by pending buffers, and maybe
    ///    kick off background adjacency compaction.
    #[instrument(
        skip(self),
        fields(snapshot_id, mutations_count, size_bytes),
        level = "info"
    )]
    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
        let start = std::time::Instant::now();
        let schema = self.schema_manager.schema();

        // Capture pre-flush stats for tracing/metrics before rotation.
        let (initial_size, initial_count) = {
            let l0_arc = self.l0_manager.get_current();
            let l0 = l0_arc.read();
            (l0.estimated_size, l0.mutation_count)
        };
        tracing::Span::current().record("size_bytes", initial_size);
        tracing::Span::current().record("mutations_count", initial_count);

        debug!("Starting L0 flush to L1");

        // Take a handle on the WAL now; it is used both to flush before
        // rotation and to truncate after the snapshot is durable.
        let wal_for_truncate = {
            let current_l0 = self.l0_manager.get_current();
            let l0_guard = current_l0.read();
            l0_guard.wal.clone()
        };

        // Durably flush the WAL and remember its LSN as this flush's
        // high-water mark.
        let wal_lsn = if let Some(ref w) = wal_for_truncate {
            w.flush().await?
        } else {
            0
        };

        // Rotate: the old buffer is frozen for persistence while writes go
        // to a fresh one.
        let old_l0_arc = self.l0_manager.begin_flush(0, None);
        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);

        let current_version;
        {
            let mut old_l0_guard = old_l0_arc.write();
            current_version = old_l0_guard.current_version;

            old_l0_guard.wal_lsn_at_flush = wal_lsn;

            // Hand the WAL and version counter over to the new buffer so
            // writes continue seamlessly.
            let wal = old_l0_guard.wal.take();

            let new_l0_arc = self.l0_manager.get_current();
            let mut new_l0_guard = new_l0_arc.write();
            new_l0_guard.wal = wal;
            new_l0_guard.current_version = current_version;
        }

        // Accumulators for the conversion of L0 state into L1 rows.
        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
        // Vertex tombstones whose labels are unknown in L0 (resolved later
        // via a storage lookup).
        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();

        {
            let old_l0 = old_l0_arc.read();

            // Live edges become Insert entries, grouped by edge type.
            for edge in old_l0.graph.edges() {
                let properties = old_l0
                    .edge_properties
                    .get(&edge.eid)
                    .cloned()
                    .unwrap_or_default();
                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);

                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();

                entries_by_type
                    .entry(edge.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: edge.src_vid,
                        dst_vid: edge.dst_vid,
                        eid: edge.eid,
                        op: Op::Insert,
                        version,
                        properties,
                        created_at,
                        updated_at,
                    });
            }

            // Edge tombstones become Delete entries with empty properties.
            for tombstone in old_l0.tombstones.values() {
                let version = old_l0
                    .edge_versions
                    .get(&tombstone.eid)
                    .copied()
                    .unwrap_or(0);
                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();

                entries_by_type
                    .entry(tombstone.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: tombstone.src_vid,
                        dst_vid: tombstone.dst_vid,
                        eid: tombstone.eid,
                        op: Op::Delete,
                        version,
                        properties: HashMap::new(),
                        created_at,
                        updated_at,
                    });
            }

            // Fan a vertex out to every label's dataset it belongs to (labels
            // not known to the schema are skipped).
            let push_vertex_to_labels =
                |vid: Vid,
                 all_labels: &[String],
                 props: Properties,
                 deleted: bool,
                 version: u64,
                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
                    for label in all_labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            out.entry(label_id).or_default().push((
                                vid,
                                all_labels.to_vec(),
                                props.clone(),
                                deleted,
                                version,
                            ));
                        }
                    }
                };

            // Live vertices.
            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
                    vertex_created_at.insert(*vid, ts);
                }
                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
                    vertex_updated_at.insert(*vid, ts);
                }
                if let Some(labels) = old_l0.vertex_labels.get(vid) {
                    push_vertex_to_labels(
                        *vid,
                        labels,
                        props.clone(),
                        false,
                        version,
                        &mut vertices_by_label,
                    );
                }
            }
            // Vertex tombstones; those without labels are deferred.
            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
                    push_vertex_to_labels(
                        vid,
                        labels,
                        HashMap::new(),
                        true,
                        version,
                        &mut vertices_by_label,
                    );
                } else {
                    orphaned_tombstones.push((vid, version));
                }
            }
        }

        // Resolve label-less tombstones with a storage lookup so deletions
        // still reach the right per-label datasets. Lookup failures are
        // tolerated (best-effort).
        if !orphaned_tombstones.is_empty() {
            tracing::warn!(
                count = orphaned_tombstones.len(),
                "Tombstones missing labels in L0, querying storage as fallback"
            );
            for (vid, version) in orphaned_tombstones {
                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
                    && !labels.is_empty()
                {
                    for label in &labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            vertices_by_label.entry(label_id).or_default().push((
                                vid,
                                labels.clone(),
                                HashMap::new(),
                                true,
                                version,
                            ));
                        }
                    }
                }
            }
        }

        // Start from the latest manifest (or a fresh one) and derive the new
        // snapshot from it.
        let mut manifest = self
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await?
            .unwrap_or_else(|| {
                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
            });

        // Chain the new snapshot onto its parent.
        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.name = name;
        manifest.created_at = Utc::now();
        manifest.version_high_water_mark = current_version;
        manifest.wal_high_water_mark = wal_lsn;
        let snapshot_id = manifest.snapshot_id.clone();

        tracing::Span::current().record("snapshot_id", &snapshot_id);

        let lancedb_store = self.storage.lancedb_store();

        // Persist edge deltas: one forward-sorted and one backward-sorted
        // run per edge type, each with an EID index.
        for (&edge_type_id, entries) in entries_by_type.iter() {
            let edge_type_name = self
                .storage
                .schema_manager()
                .edge_type_name_by_id_unified(edge_type_id)
                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;

            // Forward run: sorted by source VID for outgoing traversals.
            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;

            let table = fwd_ds.write_run_lancedb(lancedb_store, fwd_batch).await?;
            fwd_ds.ensure_eid_index_lancedb(&table).await?;

            // Backward run: sorted by destination VID for incoming traversals.
            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;

            let bwd_table = bwd_ds.write_run_lancedb(lancedb_store, bwd_batch).await?;
            bwd_ds.ensure_eid_index_lancedb(&bwd_table).await?;

            // Bump the manifest's per-edge-type snapshot entry.
            let current_snap =
                manifest
                    .edges
                    .entry(edge_type_name.to_string())
                    .or_insert(EdgeSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += entries.len() as u64;
            current_snap.lance_version = 0;

        }

        // Persist vertices per label, maintaining secondary structures.
        for (label_id, vertices) in vertices_by_label {
            let label_name = schema
                .label_name_by_id(label_id)
                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

            let ds = self.storage.vertex_dataset(label_name)?;

            // property -> (vid -> added terms, removed vids) for inverted
            // index maintenance.
            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
            let mut inverted_updates: InvertedUpdateMap = HashMap::new();

            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                {
                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
                    let mut removed: HashSet<Vid> = HashSet::new();

                    for (vid, _labels, props, deleted, _version) in &vertices {
                        if *deleted {
                            removed.insert(*vid);
                        } else if let Some(prop_value) = props.get(&cfg.property) {
                            // Only list-of-string values feed the index.
                            if let Some(arr) = prop_value.as_array() {
                                let terms: Vec<String> = arr
                                    .iter()
                                    .filter_map(|v| v.as_str().map(ToString::to_string))
                                    .collect();
                                if !terms.is_empty() {
                                    added.insert(*vid, terms);
                                }
                            }
                        }
                    }

                    if !added.is_empty() || !removed.is_empty() {
                        inverted_updates.insert(cfg.property.clone(), (added, removed));
                    }
                }
            }

            // Split the tuple rows into the parallel columns the dataset
            // builder expects.
            let mut v_data = Vec::new();
            let mut d_data = Vec::new();
            let mut ver_data = Vec::new();
            for (vid, labels, props, deleted, version) in vertices {
                v_data.push((vid, labels, props));
                d_data.push(deleted);
                ver_data.push(version);
            }

            let batch = ds.build_record_batch_with_timestamps(
                &v_data,
                &d_data,
                &ver_data,
                &schema,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;

            let table = ds
                .write_batch_lancedb(lancedb_store, batch, &schema)
                .await?;
            ds.ensure_default_indexes_lancedb(&table).await?;

            // Keep the in-memory vid -> labels index in sync.
            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
                if deleted {
                    self.storage.remove_from_vid_labels_index(*vid);
                } else {
                    self.storage.update_vid_labels_index(*vid, labels.clone());
                }
            }

            // Bump the manifest's per-label snapshot entry.
            let current_snap =
                manifest
                    .vertices
                    .entry(label_name.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += v_data.len() as u64;
            current_snap.lance_version = 0;

            self.storage.invalidate_table_cache(label_name);

            // Apply the inverted-index deltas computed above.
            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
                {
                    self.storage
                        .index_manager()
                        .update_inverted_index_incremental(cfg, added, removed)
                        .await?;
                }
            }

            // Maintain the content-derived UID -> VID mapping, refusing the
            // flush on a collision.
            let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
            for (vid, _labels, props) in &v_data {
                let ext_id = props.get("ext_id").and_then(|v| v.as_str());
                let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
                    label_name, ext_id, props,
                );
                uid_mappings.push((uid, *vid));
            }

            if !uid_mappings.is_empty()
                && let Ok(uid_index) = self.storage.uid_index(label_name)
            {
                for (uid, vid) in &uid_mappings {
                    if let Ok(Some(existing_vid)) = uid_index.get_vid(uid).await
                        && existing_vid != *vid
                    {
                        anyhow::bail!(
                            "UID collision detected: UID {:?} maps to both VID {} and VID {}. \
                             This indicates either a hash collision (astronomically unlikely with SHA3-256) \
                             or data corruption. Cannot proceed with flush.",
                            uid,
                            existing_vid.as_u64(),
                            vid.as_u64()
                        );
                    }
                }

                uid_index.write_mapping(&uid_mappings).await?;
            }
        }

        // Re-read the frozen buffer to build rows for the unified main edge
        // table (all edge types in one dataset).
        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
            let _old_l0 = old_l0_arc.read();
            let mut main_edges: Vec<(
                uni_common::core::id::Eid,
                Vid,
                Vid,
                String,
                Properties,
                bool,
                u64,
            )> = Vec::new();
            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();

            for (&edge_type_id, entries) in entries_by_type.iter() {
                for entry in entries {
                    let edge_type_name = self
                        .storage
                        .schema_manager()
                        .edge_type_name_by_id_unified(edge_type_id)
                        .unwrap_or_else(|| "unknown".to_string());

                    let deleted = matches!(entry.op, Op::Delete);
                    main_edges.push((
                        entry.eid,
                        entry.src_vid,
                        entry.dst_vid,
                        edge_type_name,
                        entry.properties.clone(),
                        deleted,
                        entry.version,
                    ));

                    if let Some(ts) = entry.created_at {
                        edge_created_at_map.insert(entry.eid, ts);
                    }
                    if let Some(ts) = entry.updated_at {
                        edge_updated_at_map.insert(entry.eid, ts);
                    }
                }
            }

            (main_edges, edge_created_at_map, edge_updated_at_map)
        };

        if !main_edges.is_empty() {
            let main_edge_batch = MainEdgeDataset::build_record_batch(
                &main_edges,
                Some(&edge_created_at_map),
                Some(&edge_updated_at_map),
            )?;
            let main_edge_table =
                MainEdgeDataset::write_batch_lancedb(lancedb_store, main_edge_batch).await?;
            MainEdgeDataset::ensure_default_indexes_lancedb(&main_edge_table).await?;
        }

        // Build rows for the unified main vertex table (all labels in one
        // dataset; tombstones included with empty properties).
        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
            let old_l0 = old_l0_arc.read();
            let mut vertices = Vec::new();

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                vertices.push((*vid, labels, props.clone(), false, version));
            }

            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
                vertices.push((vid, labels, HashMap::new(), true, version));
            }

            vertices
        };

        if !main_vertices.is_empty() {
            let main_vertex_batch = MainVertexDataset::build_record_batch(
                &main_vertices,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;
            let main_vertex_table =
                MainVertexDataset::write_batch_lancedb(lancedb_store, main_vertex_batch).await?;
            MainVertexDataset::ensure_default_indexes_lancedb(&main_vertex_table).await?;
        }

        // Persist and publish the snapshot manifest.
        self.storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        self.storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // Release the frozen buffer from the pending-flush set.
        self.l0_manager.complete_flush(&old_l0_arc);

        // Truncate the WAL only up to the lowest LSN still needed by any
        // buffer that has not finished flushing.
        if let Some(w) = wal_for_truncate {
            let safe_lsn = self
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // Property cache may hold pre-flush values; drop it.
        if let Some(ref pm) = self.property_manager {
            pm.clear_cache().await;
        }

        self.last_flush_time = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // Opportunistic background adjacency compaction, at most one at a
        // time (skipped if the previous task is still running).
        let am = self.adjacency_manager.clone();
        if am.should_compact(4) {
            let previous_still_running = {
                let guard = self.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };

            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *self.compaction_handle.write() = Some(handle);
            }
        }

        Ok(snapshot_id)
    }
2488
    /// Installs the optional property manager. Within this file it is used
    /// for CRDT merges during batch inserts and for cache invalidation after
    /// a flush to L1.
    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
        self.property_manager = Some(pm);
    }
2493}
2494
2495#[cfg(test)]
2496mod tests {
2497 use super::*;
2498 use tempfile::tempdir;
2499
    /// Verifies that committing a transaction writes its mutations to the
    /// WAL in dependency order (all vertices before any edge) and only then
    /// merges them into the main L0 buffer.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fresh on-disk fixture: schema with one label, storage, and a WAL.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        writer.begin_transaction()?;

        // Two vertices and an edge between them, all inside the transaction.
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()])
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(vid_a, vid_b, 1, eid, std::collections::HashMap::new(), None)
            .await?;

        // Record how many WAL entries exist before the commit so only the
        // commit's entries are inspected below.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        writer.commit_transaction().await?;

        // The commit must have appended the transaction's mutations.
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Walk the new entries in order, asserting vertices precede edges.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // After commit, the transaction's state must be merged into main L0.
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
2632
    /// Verifies that an explicit transaction rollback leaves the main L0
    /// buffer untouched: data inserted *before* the transaction ("Baseline")
    /// survives, and the vertex inserted *inside* the transaction ("TxData")
    /// never becomes visible in the main L0.
    ///
    /// NOTE(review): despite the name, this test does not inject a WAL write
    /// failure anywhere — it calls `rollback_transaction()` directly. Confirm
    /// whether a fault-injection variant was intended, or rename the test.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Isolated temp-dir-backed store; schema gets the three labels the
        // test uses, then is persisted so StorageManager sees them.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        // Writer wired with a WAL so the transaction machinery is exercised
        // on the same code path production uses.
        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Pre-transaction vertex: must survive the rollback.
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
            )
            .await?;

        writer.begin_transaction()?;

        // In-transaction vertex: must be discarded by the rollback.
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
            )
            .await?;

        // Snapshot the main L0 vertex count before rolling back.
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        writer.rollback_transaction()?;

        // The main L0 must be byte-for-byte unaffected by the aborted
        // transaction: same count, baseline present, tx vertex absent.
        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }
2725
2726 #[tokio::test]
2729 async fn test_batch_insert_shared_labels() -> Result<()> {
2730 use crate::storage::manager::StorageManager;
2731 use object_store::local::LocalFileSystem;
2732 use object_store::path::Path as ObjectStorePath;
2733 use uni_common::core::schema::SchemaManager;
2734
2735 let dir = tempdir()?;
2736 let path = dir.path().to_str().unwrap();
2737 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2738 let schema_path = ObjectStorePath::from("schema.json");
2739
2740 let schema_manager =
2741 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2742 let _label_id = schema_manager.add_label("Person")?;
2743 schema_manager.save().await?;
2744
2745 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2746
2747 let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2748
2749 let labels = &["Person".to_string()];
2751
2752 let mut vids = Vec::new();
2754 for i in 0..100 {
2755 let vid = writer.next_vid().await?;
2756 let mut props = std::collections::HashMap::new();
2757 props.insert("id".to_string(), Value::Int(i));
2758 writer.insert_vertex_with_labels(vid, props, labels).await?;
2759 vids.push(vid);
2760 }
2761
2762 let l0 = writer.l0_manager.get_current();
2764 for vid in vids {
2765 let l0_guard = l0.read();
2766 let vertex_labels = l0_guard.vertex_labels.get(&vid);
2767 assert!(vertex_labels.is_some(), "Vertex should have labels");
2768 assert_eq!(
2769 vertex_labels.unwrap(),
2770 &vec!["Person".to_string()],
2771 "Labels should match"
2772 );
2773 }
2774
2775 Ok(())
2776 }
2777
2778 #[tokio::test]
2781 async fn test_estimated_size_tracks_mutations() -> Result<()> {
2782 use crate::storage::manager::StorageManager;
2783 use object_store::local::LocalFileSystem;
2784 use object_store::path::Path as ObjectStorePath;
2785 use uni_common::core::schema::SchemaManager;
2786
2787 let dir = tempdir()?;
2788 let path = dir.path().to_str().unwrap();
2789 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
2790 let schema_path = ObjectStorePath::from("schema.json");
2791
2792 let schema_manager =
2793 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
2794 let _label_id = schema_manager.add_label("Test")?;
2795 schema_manager.save().await?;
2796
2797 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
2798
2799 let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
2800
2801 let l0 = writer.l0_manager.get_current();
2802
2803 let initial_estimated = l0.read().estimated_size;
2805 let initial_actual = l0.read().size_bytes();
2806 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
2807 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
2808
2809 let mut vids = Vec::new();
2811 for i in 0..10 {
2812 let vid = writer.next_vid().await?;
2813 let mut props = std::collections::HashMap::new();
2814 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
2815 props.insert("index".to_string(), Value::Int(i));
2816 writer.insert_vertex_with_labels(vid, props, &[]).await?;
2817 vids.push(vid);
2818 }
2819
2820 let after_vertices_estimated = l0.read().estimated_size;
2822 let after_vertices_actual = l0.read().size_bytes();
2823 assert!(
2824 after_vertices_estimated > 0,
2825 "estimated_size should grow after insertions"
2826 );
2827
2828 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
2830 assert!(
2831 (0.5..=2.0).contains(&ratio),
2832 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2833 after_vertices_estimated,
2834 after_vertices_actual,
2835 ratio
2836 );
2837
2838 let edge_type = 1u32;
2840 for i in 0..9 {
2841 let eid = writer.next_eid(edge_type).await?;
2842 writer
2843 .insert_edge(
2844 vids[i],
2845 vids[i + 1],
2846 edge_type,
2847 eid,
2848 std::collections::HashMap::new(),
2849 Some("NEXT".to_string()),
2850 )
2851 .await?;
2852 }
2853
2854 let after_edges_estimated = l0.read().estimated_size;
2856 let after_edges_actual = l0.read().size_bytes();
2857 assert!(
2858 after_edges_estimated > after_vertices_estimated,
2859 "estimated_size should grow after edge insertions"
2860 );
2861
2862 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
2864 assert!(
2865 (0.5..=2.0).contains(&ratio),
2866 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
2867 after_edges_estimated,
2868 after_edges_actual,
2869 ratio
2870 );
2871
2872 Ok(())
2873 }
2874}