1use crate::runtime::context::QueryContext;
5use crate::runtime::id_allocator::IdAllocator;
6use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
7use crate::runtime::l0_manager::L0Manager;
8use crate::runtime::property_manager::PropertyManager;
9use crate::runtime::wal::WriteAheadLog;
10use crate::storage::adjacency_manager::AdjacencyManager;
11use crate::storage::delta::{L1Entry, Op};
12use crate::storage::main_edge::MainEdgeDataset;
13use crate::storage::main_vertex::MainVertexDataset;
14use crate::storage::manager::StorageManager;
15use anyhow::{Result, anyhow};
16use chrono::Utc;
17use metrics;
18use parking_lot::RwLock;
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tracing::{debug, info, instrument};
22use uni_common::Properties;
23use uni_common::Value;
24use uni_common::config::UniConfig;
25use uni_common::core::id::{Eid, Vid};
26use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
27use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
28use uni_xervo::runtime::ModelRuntime;
29use uuid::Uuid;
30
/// Tunables for the write path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Maximum number of buffered mutations before a flush should be triggered.
    pub max_mutations: usize,
}
35
36impl Default for WriterConfig {
37 fn default() -> Self {
38 Self {
39 max_mutations: 10_000,
40 }
41 }
42}
43
/// Coordinates the write path: L0 buffering, WAL appends, id allocation,
/// constraint validation, adjacency maintenance and flush/compaction hooks.
pub struct Writer {
    pub l0_manager: Arc<L0Manager>,
    pub storage: Arc<StorageManager>,
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    pub allocator: Arc<IdAllocator>,
    pub config: UniConfig,
    // Optional model runtime (installed via `set_xervo_runtime`).
    pub xervo_runtime: Option<Arc<ModelRuntime>>,
    pub property_manager: Option<Arc<PropertyManager>>,
    // Mirrors edge inserts/tombstones committed into L0.
    adjacency_manager: Arc<AdjacencyManager>,
    last_flush_time: std::time::Instant,
    // Handle of a background compaction task, if one is running.
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    // Installed after construction via `set_index_rebuild_manager`.
    index_rebuild_manager: Option<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
}
62
63impl Writer {
64 pub async fn new(
65 storage: Arc<StorageManager>,
66 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
67 start_version: u64,
68 ) -> Result<Self> {
69 Self::new_with_config(
70 storage,
71 schema_manager,
72 start_version,
73 UniConfig::default(),
74 None,
75 None,
76 )
77 .await
78 }
79
80 pub async fn new_with_config(
81 storage: Arc<StorageManager>,
82 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
83 start_version: u64,
84 config: UniConfig,
85 wal: Option<Arc<WriteAheadLog>>,
86 allocator: Option<Arc<IdAllocator>>,
87 ) -> Result<Self> {
88 let allocator = if let Some(a) = allocator {
89 a
90 } else {
91 let store = storage.store();
92 let path = object_store::path::Path::from("id_allocator.json");
93 Arc::new(IdAllocator::new(store, path, 1000).await?)
94 };
95
96 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
97
98 let property_manager = Some(Arc::new(PropertyManager::new(
99 storage.clone(),
100 schema_manager.clone(),
101 1000,
102 )));
103
104 let adjacency_manager = storage.adjacency_manager();
105
106 Ok(Self {
107 l0_manager,
108 storage,
109 schema_manager,
110 allocator,
111 config,
112 xervo_runtime: None,
113 property_manager,
114 adjacency_manager,
115 last_flush_time: std::time::Instant::now(),
116 compaction_handle: Arc::new(RwLock::new(None)),
117 index_rebuild_manager: None,
118 })
119 }
120
    /// Installs the index-rebuild manager (wired in after construction).
    pub fn set_index_rebuild_manager(
        &mut self,
        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
    ) {
        self.index_rebuild_manager = Some(manager);
    }
128
129 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
131 let l0 = self.l0_manager.get_current();
132 let wal = l0.read().wal.clone();
133
134 if let Some(wal) = wal {
135 wal.initialize().await?;
136 let mutations = wal.replay_since(wal_high_water_mark).await?;
137 let count = mutations.len();
138
139 if count > 0 {
140 log::info!(
141 "Replaying {} mutations from WAL (LSN > {})",
142 count,
143 wal_high_water_mark
144 );
145 let mut l0_guard = l0.write();
146 l0_guard.replay_mutations(mutations)?;
147 }
148
149 Ok(count)
150 } else {
151 Ok(0)
152 }
153 }
154
    /// Allocates the next vertex id.
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
159
    /// Allocates `count` vertex ids in a single batch.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
165
    /// Allocates the next edge id. `_type_id` is accepted for interface
    /// compatibility but currently unused.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
170
    /// Installs the optional Xervo model runtime.
    pub fn set_xervo_runtime(&mut self, runtime: Arc<ModelRuntime>) {
        self.xervo_runtime = Some(runtime);
    }
174
    /// Returns a clone of the installed Xervo runtime handle, if any.
    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
        self.xervo_runtime.clone()
    }
178
179 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
185 let current_version = self.l0_manager.get_current().read().current_version;
186 Arc::new(RwLock::new(L0Buffer::new(current_version, None)))
188 }
189
190 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
195 tx_l0
196 .cloned()
197 .unwrap_or_else(|| self.l0_manager.get_current())
198 }
199
200 fn update_metrics(&self) {
201 let l0 = self.l0_manager.get_current();
202 let size = l0.read().estimated_size;
203 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
204 }
205
    /// Commits a transaction-local L0 buffer into the main buffer.
    ///
    /// Phases:
    /// 1. Append every mutation in the transaction buffer to the WAL
    ///    (vertex upserts, vertex deletes, edges with recorded endpoints,
    ///    then tombstones for edges created outside this transaction).
    /// 2. Flush the WAL; its LSN identifies the commit.
    /// 3. Merge the transaction buffer into the main L0 buffer and mirror
    ///    edge inserts/tombstones into the adjacency manager.
    ///
    /// Returns the WAL LSN (0 when no WAL is configured).
    ///
    /// NOTE(review): the WAL path defaults a missing edge version to 0
    /// while the adjacency path defaults to `main_l0.current_version` —
    /// confirm this asymmetry is intended.
    pub async fn commit_transaction_l0(&mut self, tx_l0_arc: Arc<RwLock<L0Buffer>>) -> Result<u64> {
        // Phase 1: durably record the mutations before touching main L0.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            if let Some(wal) = main_l0.wal.as_ref() {
                // Vertex upserts; skip vertices also deleted in this tx.
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions.
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Edges whose endpoints this tx recorded: deleted when a
                // tombstone exists, otherwise inserted.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges with no endpoint entry in this tx
                // (edge created earlier); endpoints come from the tombstone.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // Phase 2: flush the WAL and capture the commit LSN.
        let wal_lsn = self.flush_wal().await?;

        // Phase 3: merge into main L0 and update adjacency.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            // Mirror tombstones for pre-existing edges as well.
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        self.update_metrics();

        // Flushing here is best-effort; the commit itself already succeeded.
        if let Err(e) = self.check_flush().await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }

        Ok(wal_lsn)
    }
342
343 pub async fn flush_wal(&self) -> Result<u64> {
347 let l0 = self.l0_manager.get_current();
348 let wal = l0.read().wal.clone();
349
350 match wal {
351 Some(wal) => Ok(wal.flush().await?),
352 None => Ok(0),
353 }
354 }
355
356 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
360 if count == 0 {
361 return;
362 }
363 let l0 = self.resolve_l0(tx_l0);
364 l0.write().mutation_stats.properties_removed += count;
365 }
366
    /// Validates one label's constraints (nullability, UNIQUE, EXISTS,
    /// CHECK) against `properties` before an insert/update of `vid`.
    ///
    /// # Errors
    /// Returns an error on the first violated constraint, or when a
    /// constraint type is not supported by this writer.
    async fn validate_vertex_constraints_for_label(
        &self,
        vid: Vid,
        properties: &Properties,
        label: &str,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        let schema = self.schema_manager.schema();

        {
            // Schema-level nullability: every non-nullable property must be
            // present and non-null.
            if let Some(props_meta) = schema.properties.get(label) {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        log::warn!(
                            "Constraint violation: Property '{}' cannot be null for label '{}'",
                            prop_name,
                            label
                        );
                        return Err(anyhow!(
                            "Constraint violation: Property '{}' cannot be null",
                            prop_name
                        ));
                    }
                }
            }

            // Explicit constraints declared for this exact label.
            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                match &constraint.target {
                    ConstraintTarget::Label(l) if l == label => {}
                    _ => continue,
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        if !unique_props.is_empty() {
                            // Build the composite key; if any component is
                            // absent, the uniqueness check is skipped.
                            let mut key_values = Vec::new();
                            let mut missing = false;
                            for prop in unique_props {
                                if let Some(val) = properties.get(prop) {
                                    key_values.push((prop.clone(), val.clone()));
                                } else {
                                    missing = true;
                                }
                            }

                            if !missing {
                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
                                    .await?;
                            }
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            log::warn!(
                                "Constraint violation: Property '{}' must exist for label '{}'",
                                property,
                                label
                            );
                            return Err(anyhow!(
                                "Constraint violation: Property '{}' must exist",
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
                                constraint.name,
                                expression
                            ));
                        }
                    }
                    _ => {
                        return Err(anyhow!("Unsupported constraint type"));
                    }
                }
            }
        }
        Ok(())
    }
461
462 async fn validate_vertex_constraints(
466 &self,
467 vid: Vid,
468 properties: &Properties,
469 labels: &[String],
470 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
471 ) -> Result<()> {
472 let schema = self.schema_manager.schema();
473
474 for label in labels {
476 if schema.get_label_case_insensitive(label).is_none() {
478 continue;
479 }
480 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
481 .await?;
482 }
483
484 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
486 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
487 }
488
489 Ok(())
490 }
491
492 fn collect_constraint_keys_from_properties<'a>(
496 properties_iter: impl Iterator<Item = &'a Properties>,
497 label: &str,
498 constraints: &[uni_common::core::schema::Constraint],
499 existing_keys: &mut HashMap<String, HashSet<String>>,
500 existing_extids: &mut HashSet<String>,
501 ) {
502 for props in properties_iter {
503 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
504 existing_extids.insert(ext_id.to_string());
505 }
506
507 for constraint in constraints {
508 if !constraint.enabled {
509 continue;
510 }
511 if let ConstraintTarget::Label(l) = &constraint.target {
512 if l != label {
513 continue;
514 }
515 } else {
516 continue;
517 }
518
519 if let ConstraintType::Unique {
520 properties: unique_props,
521 } = &constraint.constraint_type
522 {
523 let mut key_parts = Vec::new();
524 let mut all_present = true;
525 for prop in unique_props {
526 if let Some(val) = props.get(prop) {
527 key_parts.push(format!("{}:{}", prop, val));
528 } else {
529 all_present = false;
530 break;
531 }
532 }
533 if all_present {
534 let key = key_parts.join("|");
535 existing_keys
536 .entry(constraint.name.clone())
537 .or_default()
538 .insert(key);
539 }
540 }
541 }
542 }
543 }
544
    /// Batch-validates vertices that all share one `label`.
    ///
    /// Checks, in order: nullability metadata per row; `ext_id` and
    /// composite unique-key collisions against existing L0 state (main and
    /// transaction buffers) and within the batch itself; EXISTS/CHECK
    /// constraints per row; and finally (lance backend only) composite-key
    /// collisions against persisted storage via a pushed-down filter.
    ///
    /// # Errors
    /// Returns an error on the first violation, including the batch index
    /// (or both indices for intra-batch duplicates).
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // Nullability: non-nullable properties must be present per row.
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // Snapshot unique keys / ext_ids already present in L0 buffers.
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        if let Some(tx_l0) = tx_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Keys already seen within this batch, mapped to their first index,
        // so intra-batch duplicates are reported with both positions.
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            // ext_id must be unique against existing state and the batch.
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            for constraint in &schema.constraints {
                if !constraint.enabled {
                    continue;
                }
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        // Build the composite key; the check is skipped when
                        // any component property is missing on this row.
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = key_parts.join("|");

                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property } => {
                        if properties.get(property).is_none_or(|v| v.is_null()) {
                            return Err(anyhow!(
                                "Constraint violation at index {}: Property '{}' must exist",
                                idx,
                                property
                            ));
                        }
                    }
                    ConstraintType::Check { expression } => {
                        if !self.evaluate_check_constraint(expression, properties)? {
                            return Err(anyhow!(
                                "Constraint violation at index {}: CHECK constraint '{}' violated",
                                idx,
                                constraint.name
                            ));
                        }
                    }
                    _ => {}
                }
            }
        }

        // Storage-level uniqueness: one filter per unique constraint, OR-ing
        // the batch's key tuples and excluding the batch's own vids.
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            // Only scalar values can be pushed into the
                            // filter; anything else skips this row.
                            let val_str = match val {
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                #[cfg(feature = "lance-backend")]
                if !or_filters.is_empty() {
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
794
795 async fn check_extid_globally_unique(
804 &self,
805 ext_id: &str,
806 current_vid: Vid,
807 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
808 ) -> Result<()> {
809 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
811 let mut buffers = vec![self.l0_manager.get_current()];
812 if let Some(tx_l0) = tx_l0 {
813 buffers.push(tx_l0.clone());
814 }
815 buffers.extend(self.l0_manager.get_pending_flush());
816 buffers
817 };
818
819 for l0 in &l0_buffers_to_check {
820 if let Some(vid) =
821 Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
822 {
823 return Err(anyhow!(
824 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
825 ext_id,
826 vid
827 ));
828 }
829 }
830
831 let backend = self.storage.backend();
834 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
835 && found_vid != current_vid
836 {
837 return Err(anyhow!(
838 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
839 ext_id,
840 found_vid
841 ));
842 }
843
844 Ok(())
845 }
846
847 fn find_extid_in_properties(
849 vertex_properties: &HashMap<Vid, Properties>,
850 ext_id: &str,
851 current_vid: Vid,
852 ) -> Option<Vid> {
853 vertex_properties.iter().find_map(|(&vid, props)| {
854 if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
855 Some(vid)
856 } else {
857 None
858 }
859 })
860 }
861
862 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
864 let l0 = self.l0_manager.get_current();
865 let l0_guard = l0.read();
866 if l0_guard.vertex_tombstones.contains(&vid) {
868 return None;
869 }
870 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
871 }
872
    /// Resolves the labels of `vid`, probing in order: main L0 buffer, the
    /// optional transaction buffer, pending-flush buffers, then storage.
    ///
    /// NOTE(review): `get_vertex_labels_from_l0` returns `None` both for
    /// "tombstoned in main L0" and "not present there", so a vertex deleted
    /// only in the main buffer can still resolve from later sources —
    /// confirm that visibility is intended.
    pub async fn get_vertex_labels(
        &self,
        vid: Vid,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Option<Vec<String>> {
        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
            return Some(labels);
        }

        // A tombstone in the tx buffer hides the vertex entirely.
        if let Some(tx_l0) = tx_l0 {
            let guard = tx_l0.read();
            if guard.vertex_tombstones.contains(&vid) {
                return None;
            }
            if let Some(labels) = guard.get_vertex_labels(vid) {
                return Some(labels.to_vec());
            }
        }

        // Buffers queued for flush may still hold the freshest state.
        for pending_l0 in self.l0_manager.get_pending_flush() {
            let guard = pending_l0.read();
            if guard.vertex_tombstones.contains(&vid) {
                return None;
            }
            if let Some(labels) = guard.get_vertex_labels(vid) {
                return Some(labels.to_vec());
            }
        }

        // Storage lookup errors are treated as "no labels".
        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
    }
911
912 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
914 let l0 = self.l0_manager.get_current();
915 let l0_guard = l0.read();
916 l0_guard.get_edge_type(eid).map(|s| s.to_string())
917 }
918
919 pub fn get_edge_type_id_from_l0(
922 &self,
923 eid: Eid,
924 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
925 ) -> Option<u32> {
926 if let Some(tx_l0) = tx_l0 {
928 let guard = tx_l0.read();
929 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
930 return Some(etype);
931 }
932 }
933 let l0 = self.l0_manager.get_current();
935 let l0_guard = l0.read();
936 l0_guard
937 .get_edge_endpoint_full(eid)
938 .map(|(_, _, etype)| etype)
939 }
940
    /// Records the human-readable type name for an edge in the resolved
    /// (transaction or main) L0 buffer.
    pub fn set_edge_type(
        &self,
        eid: Eid,
        type_name: String,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) {
        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
    }
951
952 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
955 let parts: Vec<&str> = expression.split_whitespace().collect();
956 if parts.len() != 3 {
957 log::warn!(
960 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
961 expression
962 );
963 return Ok(true);
964 }
965
966 let prop_part = parts[0].trim_start_matches('(');
967 let prop_name = if let Some(idx) = prop_part.find('.') {
969 &prop_part[idx + 1..]
970 } else {
971 prop_part
972 };
973
974 let op = parts[1];
975 let val_str = parts[2].trim_end_matches(')');
976
977 let prop_val = match properties.get(prop_name) {
978 Some(v) => v,
979 None => return Ok(true), };
981
982 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
984 || (val_str.starts_with('"') && val_str.ends_with('"'))
985 {
986 Value::String(val_str[1..val_str.len() - 1].to_string())
987 } else if let Ok(n) = val_str.parse::<i64>() {
988 Value::Int(n)
989 } else if let Ok(n) = val_str.parse::<f64>() {
990 Value::Float(n)
991 } else if let Ok(b) = val_str.parse::<bool>() {
992 Value::Bool(b)
993 } else {
994 if val_str.starts_with("Number(") && val_str.ends_with(')') {
996 let n_str = &val_str[7..val_str.len() - 1];
997 if let Ok(n) = n_str.parse::<i64>() {
998 Value::Int(n)
999 } else if let Ok(n) = n_str.parse::<f64>() {
1000 Value::Float(n)
1001 } else {
1002 Value::String(val_str.to_string())
1003 }
1004 } else {
1005 Value::String(val_str.to_string())
1006 }
1007 };
1008
1009 match op {
1010 "=" | "==" => Ok(prop_val == &target_val),
1011 "!=" | "<>" => Ok(prop_val != &target_val),
1012 ">" => self
1013 .compare_values(prop_val, &target_val)
1014 .map(|o| o.is_gt()),
1015 "<" => self
1016 .compare_values(prop_val, &target_val)
1017 .map(|o| o.is_lt()),
1018 ">=" => self
1019 .compare_values(prop_val, &target_val)
1020 .map(|o| o.is_ge()),
1021 "<=" => self
1022 .compare_values(prop_val, &target_val)
1023 .map(|o| o.is_le()),
1024 _ => {
1025 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1026 Ok(true)
1027 }
1028 }
1029 }
1030
1031 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1032 use std::cmp::Ordering;
1033
1034 fn cmp_f64(x: f64, y: f64) -> Ordering {
1035 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1036 }
1037
1038 match (a, b) {
1039 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1040 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1041 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1042 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1043 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1044 _ => Err(anyhow!(
1045 "Cannot compare incompatible types: {:?} vs {:?}",
1046 a,
1047 b
1048 )),
1049 }
1050 }
1051
    /// Enforces one composite UNIQUE constraint for `label` against the
    /// main L0 buffer, the optional transaction buffer, and (lance backend)
    /// persisted storage, excluding `current_vid` itself.
    async fn check_unique_constraint_multi(
        &self,
        label: &str,
        key_values: &[(String, Value)],
        current_vid: Vid,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        // Canonical serialized key shared with L0's constraint-key index.
        let key = serialize_constraint_key(label, key_values);

        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            if l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}'",
                    label
                ));
            }
        }

        if let Some(tx_l0) = tx_l0 {
            let tx_l0_guard = tx_l0.read();
            if tx_l0_guard.has_constraint_key(&key, current_vid) {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
                    label
                ));
            }
        }

        // Push the key down to storage as a filter; single quotes in string
        // values are SQL-escaped by doubling.
        let filters: Vec<String> = key_values
            .iter()
            .map(|(prop, val)| {
                let val_str = match val {
                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    Value::Int(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Bool(b) => b.to_string(),
                    _ => "NULL".to_string(),
                };
                format!("{} = {}", prop, val_str)
            })
            .collect();

        let mut filter = filters.join(" AND ");
        filter.push_str(&format!(
            " AND _deleted = false AND _vid != {}",
            current_vid.as_u64()
        ));

        // Dataset-open failures are treated as "nothing in storage".
        #[cfg(feature = "lance-backend")]
        if let Ok(ds) = self.storage.vertex_dataset(label)
            && let Ok(lance_ds) = ds.open_raw().await
        {
            let count = lance_ds.count_rows(Some(filter.clone())).await?;
            if count > 0 {
                return Err(anyhow!(
                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
                    label,
                    filter
                ));
            }
        }

        Ok(())
    }
1122
    /// Applies back-pressure based on the number of L1 runs awaiting
    /// compaction: at the hard limit the write stalls (re-polling every
    /// 100 ms) until compaction catches up; above the soft limit the write
    /// is delayed by `base_delay * 2^excess`, with `excess` capped at 31 so
    /// the u32 power cannot overflow.
    async fn check_write_pressure(&self) -> Result<()> {
        let status = self
            .storage
            .compaction_status()
            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
        let l1_runs = status.l1_runs;
        let throttle = &self.config.throttle;

        if l1_runs >= throttle.hard_limit {
            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
            // Busy-wait (with sleep) until compaction drops below the limit.
            while self
                .storage
                .compaction_status()
                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
                .l1_runs
                >= throttle.hard_limit
            {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
        } else if l1_runs >= throttle.soft_limit {
            let excess = l1_runs - throttle.soft_limit;
            let excess = std::cmp::min(excess, 31);
            let multiplier = 2_u32.pow(excess as u32);
            let delay = throttle.base_delay * multiplier;
            tokio::time::sleep(delay).await;
        }
        Ok(())
    }
1153
1154 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
1157 if let Some(tx_l0) = tx_l0 {
1158 let size = tx_l0.read().estimated_size;
1159 if size > self.config.max_transaction_memory {
1160 return Err(anyhow!(
1161 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1162 Roll back or commit the current transaction.",
1163 size,
1164 self.config.max_transaction_memory
1165 ));
1166 }
1167 }
1168 Ok(())
1169 }
1170
    /// Builds a `QueryContext` spanning the main L0 buffer, the optional
    /// transaction buffer and all pending-flush buffers.
    ///
    /// As written this always returns `Some`, and the fn is `async` without
    /// any await — presumably both match a broader call-site contract;
    /// confirm before simplifying.
    async fn get_query_context(
        &self,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Option<QueryContext> {
        Some(QueryContext::new_with_pending(
            self.l0_manager.get_current(),
            tx_l0.cloned(),
            self.l0_manager.get_pending_flush(),
        ))
    }
1181
    /// Pre-merges CRDT-typed properties with their stored values so an
    /// upsert does not clobber concurrent CRDT state.
    ///
    /// The label is taken from the caller when provided, otherwise
    /// discovered from the vertex's existing labels. Silently no-ops when
    /// there is no property manager, no resolvable label, no metadata for
    /// the label, or no CRDT-typed keys among `properties`.
    async fn prepare_vertex_upsert(
        &self,
        vid: Vid,
        properties: &mut Properties,
        label: Option<&str>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        let Some(pm) = &self.property_manager else {
            return Ok(());
        };

        let schema = self.schema_manager.schema();

        // Declared out here so the borrow taken in the else-branch lives
        // long enough to back `label_name`.
        let discovered_labels;
        let label_name = if let Some(l) = label {
            Some(l)
        } else {
            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
            discovered_labels
                .as_ref()
                .and_then(|l| l.first().map(|s| s.as_str()))
        };

        let Some(label_str) = label_name else {
            return Ok(());
        };
        let Some(props_meta) = schema.properties.get(label_str) else {
            return Ok(());
        };

        // Incoming keys whose schema type is a CRDT.
        let crdt_keys: Vec<String> = properties
            .keys()
            .filter(|key| {
                props_meta.get(*key).is_some_and(|meta| {
                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                })
            })
            .cloned()
            .collect();

        if crdt_keys.is_empty() {
            return Ok(());
        }

        // Merge each incoming CRDT value with the stored one, when present.
        let ctx = self.get_query_context(tx_l0).await;
        for key in crdt_keys {
            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
            if !existing.is_null()
                && let Some(val) = properties.get_mut(&key)
            {
                *val = pm.merge_crdt_values(&existing, val)?;
            }
        }

        Ok(())
    }
1248
1249 async fn prepare_edge_upsert(
1250 &self,
1251 eid: Eid,
1252 properties: &mut Properties,
1253 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1254 ) -> Result<()> {
1255 if let Some(pm) = &self.property_manager {
1256 let schema = self.schema_manager.schema();
1257 let type_name = self.get_edge_type_from_l0(eid);
1259
1260 if let Some(ref t_name) = type_name
1261 && let Some(props_meta) = schema.properties.get(t_name)
1262 {
1263 let mut crdt_keys = Vec::new();
1264 for (key, _) in properties.iter() {
1265 if let Some(meta) = props_meta.get(key)
1266 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1267 {
1268 crdt_keys.push(key.clone());
1269 }
1270 }
1271
1272 if !crdt_keys.is_empty() {
1273 let ctx = self.get_query_context(tx_l0).await;
1274 for key in crdt_keys {
1275 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1276
1277 if !existing.is_null()
1278 && let Some(val) = properties.get_mut(&key)
1279 {
1280 *val = pm.merge_crdt_values(&existing, val)?;
1281 }
1282 }
1283 }
1284 }
1285 }
1286 Ok(())
1287 }
1288
1289 #[instrument(skip(self, properties), level = "trace")]
1290 pub async fn insert_vertex(
1291 &mut self,
1292 vid: Vid,
1293 properties: Properties,
1294 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1295 ) -> Result<()> {
1296 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
1297 .await?;
1298 Ok(())
1299 }
1300
    /// Inserts (or upserts) a vertex with labels through the full write
    /// pipeline: back-pressure, transaction-memory check, embedding
    /// processing, constraint validation, CRDT pre-merge, L0 insertion plus
    /// constraint-key indexing, metrics, and (outside a transaction) a
    /// flush check.
    ///
    /// Returns the final properties as written (after embedding and CRDT
    /// processing).
    #[instrument(skip(self, properties, labels), level = "trace")]
    pub async fn insert_vertex_with_labels(
        &mut self,
        vid: Vid,
        mut properties: Properties,
        labels: &[String],
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<Properties> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory(tx_l0)?;
        self.process_embeddings_for_labels(labels, &mut properties)
            .await?;
        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
            .await?;
        // CRDT pre-merge uses the first label (if any) for metadata lookup.
        self.prepare_vertex_upsert(
            vid,
            &mut properties,
            labels.first().map(|s| s.as_str()),
            tx_l0,
        )
        .await?;

        // Keep a copy: `properties` moves into the L0 buffer below but is
        // still needed for constraint-key building and the return value.
        let properties_copy = properties.clone();
        let labels_copy = labels.to_vec();

        {
            let l0 = self.resolve_l0(tx_l0);
            let mut l0_guard = l0.write();
            l0_guard.insert_vertex_with_labels(vid, properties, labels);

            // Register this vertex's composite unique keys so later writes
            // against the same buffer see them immediately.
            let schema = self.schema_manager.schema();
            for label in &labels_copy {
                if schema.get_label_case_insensitive(label).is_none() {
                    continue;
                }

                for constraint in &schema.constraints {
                    if !constraint.enabled {
                        continue;
                    }
                    if let ConstraintTarget::Label(l) = &constraint.target {
                        if l != label {
                            continue;
                        }
                    } else {
                        continue;
                    }

                    if let ConstraintType::Unique {
                        properties: unique_props,
                    } = &constraint.constraint_type
                    {
                        // Key registered only when every component is present.
                        let mut key_values = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties_copy.get(prop) {
                                key_values.push((prop.clone(), val.clone()));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        if all_present {
                            let key = serialize_constraint_key(label, &key_values);
                            l0_guard.insert_constraint_key(key, vid);
                        }
                    }
                }
            }
        }

        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Transactional writes defer flushing to commit time.
        if tx_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(properties_copy)
    }
1389
    /// Batch variant of vertex insertion: all vertices share the same label
    /// set and are embedded/validated/CRDT-merged together, then written to
    /// the L0 buffer under a single write-lock acquisition.
    ///
    /// Returns the final per-vertex property maps (possibly augmented with
    /// auto-generated embeddings and merged CRDT values).
    ///
    /// # Errors
    /// Fails when `vids` and `properties_batch` differ in length, or when
    /// `labels` is empty (a primary label is required for validation).
    pub async fn insert_vertices_batch(
        &mut self,
        vids: Vec<Vid>,
        mut properties_batch: Vec<Properties>,
        labels: Vec<String>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<Vec<Properties>> {
        let start = std::time::Instant::now();

        if vids.len() != properties_batch.len() {
            return Err(anyhow!(
                "VID/properties size mismatch: {} vids, {} properties",
                vids.len(),
                properties_batch.len()
            ));
        }

        if vids.is_empty() {
            return Ok(Vec::new());
        }

        // Inner async block gives all fallible steps a shared `?` path while
        // the slow-path warning below still runs after success or failure.
        let result = async {
            self.check_write_pressure().await?;
            self.check_transaction_memory(tx_l0)?;

            self.process_embeddings_for_batch(&labels, &mut properties_batch)
                .await?;

            // Constraint validation is keyed off the primary (first) label.
            let label = labels
                .first()
                .ok_or_else(|| anyhow!("No labels provided"))?;
            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
                .await?;

            // Cheap pre-check: does the schema declare any CRDT-typed
            // property for this label at all?
            let has_crdt_fields = {
                let schema = self.schema_manager.schema();
                schema
                    .properties
                    .get(label.as_str())
                    .is_some_and(|props_meta| {
                        props_meta.values().any(|meta| {
                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                        })
                    })
            };

            if has_crdt_fields {
                // Collect the CRDT-typed property names once for the batch.
                let schema = self.schema_manager.schema();
                let crdt_keys: Vec<String> = schema
                    .properties
                    .get(label.as_str())
                    .map(|props_meta| {
                        props_meta
                            .iter()
                            .filter(|(_, meta)| {
                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
                            })
                            .map(|(key, _)| key.clone())
                            .collect()
                    })
                    .unwrap_or_default();

                // Merge each incoming CRDT value with the stored one so
                // concurrent updates converge instead of overwriting.
                if let Some(pm) = &self.property_manager {
                    let ctx = self.get_query_context(tx_l0).await;
                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
                        for key in &crdt_keys {
                            if props.contains_key(key) {
                                let existing =
                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
                                if !existing.is_null()
                                    && let Some(val) = props.get_mut(key)
                                {
                                    *val = pm.merge_crdt_values(&existing, val)?;
                                }
                            }
                        }
                    }
                }
            }

            let target_l0 = self.resolve_l0(tx_l0);

            let properties_result = properties_batch.clone();
            {
                // Single write-lock scope for the whole batch.
                let mut l0_guard = target_l0.write();
                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
                }
            }

            // NOTE(review): unlike `insert_vertex_with_labels`, this path
            // registers no unique-constraint keys in L0 and never calls
            // `check_flush` — confirm both are intentional for batches.
            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
            self.update_metrics();

            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
        }
        .await;

        let props = result?;

        if start.elapsed().as_millis() > 100 {
            log::warn!(
                "Slow insert_vertices_batch ({} vertices): {}ms",
                vids.len(),
                start.elapsed().as_millis()
            );
        }

        Ok(props)
    }
1537
    /// Records a vertex deletion in the L0 buffer.
    ///
    /// Deletion needs the vertex's labels so the tombstone can later be
    /// routed to the correct per-label datasets during flush. Labels are
    /// resolved in priority order: already present in the target L0 →
    /// caller-provided `labels` → pending-flush L0 buffers → durable storage.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(
        &mut self,
        vid: Vid,
        labels: Option<Vec<String>>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory(tx_l0)?;
        let l0 = self.resolve_l0(tx_l0);

        // Minimal read-lock scope: label resolution below may do I/O.
        let has_labels = {
            let l0_guard = l0.read();
            l0_guard.vertex_labels.contains_key(&vid)
        };

        if !has_labels {
            let resolved_labels = if let Some(provided) = labels {
                Some(provided)
            } else {
                // Fall back to buffers that were rotated but not yet flushed,
                // then to durable storage.
                let mut found = None;
                for pending_l0 in self.l0_manager.get_pending_flush() {
                    let pending_guard = pending_l0.read();
                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
                        found = Some(l.to_vec());
                        break;
                    }
                }
                if found.is_none() {
                    found = self.find_vertex_labels_in_storage(vid).await?;
                }
                found
            };

            // Cache the resolved labels so the flush path can route the
            // tombstone to the right per-label datasets.
            if let Some(found_labels) = resolved_labels {
                let mut l0_guard = l0.write();
                l0_guard.vertex_labels.insert(vid, found_labels);
            }
        }

        l0.write().delete_vertex(vid)?;
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Transactional deletes defer flushing until commit.
        if tx_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
1605
1606 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1609 use crate::backend::types::ScanRequest;
1610 use arrow_array::Array;
1611 use arrow_array::cast::AsArray;
1612
1613 let backend = self.storage.backend();
1614 let table_name = MainVertexDataset::table_name();
1615
1616 if !backend.table_exists(table_name).await? {
1618 return Ok(None);
1619 }
1620
1621 let filter = format!("_vid = {}", vid.as_u64());
1623 let batches = backend
1624 .scan(
1625 ScanRequest::all(table_name)
1626 .with_filter(filter)
1627 .with_columns(vec![
1628 "_vid".to_string(),
1629 "labels".to_string(),
1630 "_version".to_string(),
1631 "_deleted".to_string(),
1632 ]),
1633 )
1634 .await
1635 .unwrap_or_default();
1636
1637 let mut max_version: Option<u64> = None;
1639 let mut labels: Option<Vec<String>> = None;
1640 let mut is_deleted = false;
1641
1642 for batch in batches {
1643 if batch.num_rows() == 0 {
1644 continue;
1645 }
1646
1647 let version_array = batch
1648 .column_by_name("_version")
1649 .unwrap()
1650 .as_primitive::<arrow_array::types::UInt64Type>();
1651
1652 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
1653
1654 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
1655
1656 for row_idx in 0..batch.num_rows() {
1657 let version = version_array.value(row_idx);
1658
1659 if max_version.is_none_or(|mv| version > mv) {
1660 is_deleted = deleted_array.value(row_idx);
1661
1662 let labels_list = labels_array.value(row_idx);
1663 let string_array = labels_list.as_string::<i32>();
1664 let vertex_labels: Vec<String> = (0..string_array.len())
1665 .filter(|&i| !string_array.is_null(i))
1666 .map(|i| string_array.value(i).to_string())
1667 .collect();
1668
1669 max_version = Some(version);
1670 labels = Some(vertex_labels);
1671 }
1672 }
1673 }
1674
1675 if is_deleted { Ok(None) } else { Ok(labels) }
1677 }
1678
1679 #[expect(clippy::too_many_arguments)]
1680 #[instrument(skip(self, properties), level = "trace")]
1681 pub async fn insert_edge(
1682 &mut self,
1683 src_vid: Vid,
1684 dst_vid: Vid,
1685 edge_type: u32,
1686 eid: Eid,
1687 mut properties: Properties,
1688 edge_type_name: Option<String>,
1689 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1690 ) -> Result<()> {
1691 let start = std::time::Instant::now();
1692 self.check_write_pressure().await?;
1693 self.check_transaction_memory(tx_l0)?;
1694 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
1695 .await?;
1696
1697 let l0 = self.resolve_l0(tx_l0);
1698 l0.write()
1699 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
1700
1701 if tx_l0.is_none() {
1704 let version = l0.read().current_version;
1705 self.adjacency_manager
1706 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
1707 }
1708
1709 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1710 self.update_metrics();
1711
1712 if tx_l0.is_none() {
1713 self.check_flush().await?;
1714 }
1715 if start.elapsed().as_millis() > 100 {
1716 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
1717 }
1718 Ok(())
1719 }
1720
1721 #[instrument(skip(self), level = "trace")]
1722 pub async fn delete_edge(
1723 &mut self,
1724 eid: Eid,
1725 src_vid: Vid,
1726 dst_vid: Vid,
1727 edge_type: u32,
1728 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1729 ) -> Result<()> {
1730 let start = std::time::Instant::now();
1731 self.check_write_pressure().await?;
1732 self.check_transaction_memory(tx_l0)?;
1733 let l0 = self.resolve_l0(tx_l0);
1734
1735 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
1736
1737 if tx_l0.is_none() {
1739 let version = l0.read().current_version;
1740 self.adjacency_manager
1741 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
1742 }
1743 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
1744 self.update_metrics();
1745
1746 if tx_l0.is_none() {
1747 self.check_flush().await?;
1748 }
1749 if start.elapsed().as_millis() > 100 {
1750 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
1751 }
1752 Ok(())
1753 }
1754
1755 pub async fn check_flush(&mut self) -> Result<()> {
1759 let count = self.l0_manager.get_current().read().mutation_count;
1760
1761 if count == 0 {
1763 return Ok(());
1764 }
1765
1766 if count >= self.config.auto_flush_threshold {
1768 self.flush_to_l1(None).await?;
1769 return Ok(());
1770 }
1771
1772 if let Some(interval) = self.config.auto_flush_interval
1774 && self.last_flush_time.elapsed() >= interval
1775 && count >= self.config.auto_flush_min_mutations
1776 {
1777 self.flush_to_l1(None).await?;
1778 }
1779
1780 Ok(())
1781 }
1782
1783 async fn process_embeddings_for_labels(
1786 &self,
1787 labels: &[String],
1788 properties: &mut Properties,
1789 ) -> Result<()> {
1790 let label_name = labels.first().map(|s| s.as_str());
1791 self.process_embeddings_impl(label_name, properties).await
1792 }
1793
    /// Batch counterpart of the single-vertex embedding pass: for every
    /// vector-index embedding config on the primary label, computes
    /// embeddings for each vertex missing the target property, issuing one
    /// batched embedder call per config.
    async fn process_embeddings_for_batch(
        &self,
        labels: &[String],
        properties_batch: &mut [Properties],
    ) -> Result<()> {
        let label_name = labels.first().map(|s| s.as_str());
        let schema = self.schema_manager.schema();

        if let Some(label) = label_name {
            // Vector indexes on this label that declare an embedding config.
            let mut configs = Vec::new();
            for idx in &schema.indexes {
                if let IndexDefinition::Vector(v_config) = idx
                    && v_config.label == label
                    && let Some(emb_config) = &v_config.embedding_config
                {
                    configs.push((v_config.property.clone(), emb_config.clone()));
                }
            }

            if configs.is_empty() {
                return Ok(());
            }

            for (target_prop, emb_config) in configs {
                // `needs_embedding[i]` is the batch index that produced
                // `input_texts[i]`, so results can be routed back below.
                let mut input_texts: Vec<String> = Vec::new();
                let mut needs_embedding: Vec<usize> = Vec::new();

                for (idx, properties) in properties_batch.iter().enumerate() {
                    // Respect explicitly supplied target-property values.
                    if properties.contains_key(&target_prop) {
                        continue;
                    }

                    // Concatenate the string-valued source properties.
                    let mut inputs = Vec::new();
                    for src_prop in &emb_config.source_properties {
                        if let Some(val) = properties.get(src_prop)
                            && let Some(s) = val.as_str()
                        {
                            inputs.push(s.to_string());
                        }
                    }

                    if !inputs.is_empty() {
                        let input_text = inputs.join(" ");
                        input_texts.push(input_text);
                        needs_embedding.push(idx);
                    }
                }

                if input_texts.is_empty() {
                    continue;
                }

                let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
                })?;
                let embedder = runtime.embedding(&emb_config.alias).await?;

                // One batched embed call covers all vertices for this config.
                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
                let embeddings = embedder.embed(input_refs).await?;

                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
                    if let Some(vec) = embeddings.get(embedding_idx) {
                        let vals: Vec<Value> =
                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
                    }
                }
            }
        }

        Ok(())
    }
1881
1882 async fn process_embeddings_impl(
1883 &self,
1884 label_name: Option<&str>,
1885 properties: &mut Properties,
1886 ) -> Result<()> {
1887 let schema = self.schema_manager.schema();
1888
1889 if let Some(label) = label_name {
1890 let mut configs = Vec::new();
1892 for idx in &schema.indexes {
1893 if let IndexDefinition::Vector(v_config) = idx
1894 && v_config.label == label
1895 && let Some(emb_config) = &v_config.embedding_config
1896 {
1897 configs.push((v_config.property.clone(), emb_config.clone()));
1898 }
1899 }
1900
1901 if configs.is_empty() {
1902 log::info!("No embedding config found for label {}", label);
1903 }
1904
1905 for (target_prop, emb_config) in configs {
1906 if properties.contains_key(&target_prop) {
1908 continue;
1909 }
1910
1911 let mut inputs = Vec::new();
1913 for src_prop in &emb_config.source_properties {
1914 if let Some(val) = properties.get(src_prop)
1915 && let Some(s) = val.as_str()
1916 {
1917 inputs.push(s.to_string());
1918 }
1919 }
1920
1921 if inputs.is_empty() {
1922 continue;
1923 }
1924
1925 let input_text = inputs.join(" "); let runtime = self.xervo_runtime.as_ref().ok_or_else(|| {
1928 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
1929 })?;
1930 let embedder = runtime.embedding(&emb_config.alias).await?;
1931
1932 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
1934 if let Some(vec) = embeddings.first() {
1935 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
1937 properties.insert(target_prop.clone(), Value::List(vals));
1938 }
1939 }
1940 }
1941 Ok(())
1942 }
1943
    /// Flushes the current L0 write buffer into L1 (durable storage) and
    /// publishes a new snapshot; returns the new snapshot id.
    ///
    /// High-level sequence:
    /// 1. flush the WAL and rotate the L0 buffer so writes continue on a
    ///    fresh buffer while the old one is drained,
    /// 2. group buffered edges by edge type and vertices by label,
    /// 3. write per-type delta runs (fwd/bwd), per-label vertex batches, and
    ///    the consolidated main edge/vertex tables,
    /// 4. save + publish the snapshot manifest, complete the flush, truncate
    ///    the WAL up to the safest LSN, and clear the property cache,
    /// 5. opportunistically kick off adjacency compaction and index
    ///    rebuilds.
    #[instrument(
        skip(self),
        fields(snapshot_id, mutations_count, size_bytes),
        level = "info"
    )]
    pub async fn flush_to_l1(&mut self, name: Option<String>) -> Result<String> {
        let start = std::time::Instant::now();
        let schema = self.schema_manager.schema();

        // Capture size/count up front for span fields and end-of-flush
        // metrics (the buffer is rotated away below).
        let (initial_size, initial_count) = {
            let l0_arc = self.l0_manager.get_current();
            let l0 = l0_arc.read();
            (l0.estimated_size, l0.mutation_count)
        };
        tracing::Span::current().record("size_bytes", initial_size);
        tracing::Span::current().record("mutations_count", initial_count);

        debug!("Starting L0 flush to L1");

        // Keep a handle to the WAL so it can be truncated after the flush
        // completes successfully.
        let wal_for_truncate = {
            let current_l0 = self.l0_manager.get_current();
            let l0_guard = current_l0.read();
            l0_guard.wal.clone()
        };

        // Durably flush the WAL *before* rotating, so every buffered
        // mutation is recoverable; 0 means "no WAL configured".
        let wal_lsn = if let Some(ref w) = wal_for_truncate {
            w.flush().await?
        } else {
            0
        };

        // Rotate: the old buffer becomes read-only flush input, writers move
        // to a fresh buffer.
        // NOTE(review): the `(0, None)` arguments look like placeholders —
        // confirm their meaning against `L0Manager::begin_flush`.
        let old_l0_arc = self.l0_manager.begin_flush(0, None);
        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);

        let current_version;
        {
            // Hand the WAL and version counter over from the old buffer to
            // the new current buffer so new writes keep logging/versioning
            // continuously.
            let mut old_l0_guard = old_l0_arc.write();
            current_version = old_l0_guard.current_version;

            old_l0_guard.wal_lsn_at_flush = wal_lsn;

            let wal = old_l0_guard.wal.take();

            let new_l0_arc = self.l0_manager.get_current();
            let mut new_l0_guard = new_l0_arc.write();
            new_l0_guard.wal = wal;
            new_l0_guard.current_version = current_version;
        }

        // Staging structures: edges grouped by type id, vertices grouped by
        // label id, plus timestamp side tables and tombstones whose labels
        // could not be resolved from L0 alone.
        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();

        {
            let old_l0 = old_l0_arc.read();

            // Live edges become Insert entries.
            for edge in old_l0.graph.edges() {
                let properties = old_l0
                    .edge_properties
                    .get(&edge.eid)
                    .cloned()
                    .unwrap_or_default();
                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);

                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();

                entries_by_type
                    .entry(edge.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: edge.src_vid,
                        dst_vid: edge.dst_vid,
                        eid: edge.eid,
                        op: Op::Insert,
                        version,
                        properties,
                        created_at,
                        updated_at,
                    });
            }

            // Edge tombstones become Delete entries with empty properties.
            for tombstone in old_l0.tombstones.values() {
                let version = old_l0
                    .edge_versions
                    .get(&tombstone.eid)
                    .copied()
                    .unwrap_or(0);
                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();

                entries_by_type
                    .entry(tombstone.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: tombstone.src_vid,
                        dst_vid: tombstone.dst_vid,
                        eid: tombstone.eid,
                        op: Op::Delete,
                        version,
                        properties: HashMap::new(),
                        created_at,
                        updated_at,
                    });
            }

            // A vertex with multiple labels is duplicated into each label's
            // bucket; labels unknown to the schema are silently skipped.
            let push_vertex_to_labels =
                |vid: Vid,
                 all_labels: &[String],
                 props: Properties,
                 deleted: bool,
                 version: u64,
                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
                    for label in all_labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            out.entry(label_id).or_default().push((
                                vid,
                                all_labels.to_vec(),
                                props.clone(),
                                deleted,
                                version,
                            ));
                        }
                    }
                };

            // Live vertices.
            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
                    vertex_created_at.insert(*vid, ts);
                }
                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
                    vertex_updated_at.insert(*vid, ts);
                }
                if let Some(labels) = old_l0.vertex_labels.get(vid) {
                    push_vertex_to_labels(
                        *vid,
                        labels,
                        props.clone(),
                        false,
                        version,
                        &mut vertices_by_label,
                    );
                }
            }
            // Vertex tombstones; those whose labels are unknown in L0 are
            // deferred to the storage fallback below.
            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
                    push_vertex_to_labels(
                        vid,
                        labels,
                        HashMap::new(),
                        true,
                        version,
                        &mut vertices_by_label,
                    );
                } else {
                    orphaned_tombstones.push((vid, version));
                }
            }
        }

        // Fallback: resolve labels for orphaned tombstones from durable
        // storage so the deletes still reach the per-label datasets.
        if !orphaned_tombstones.is_empty() {
            tracing::warn!(
                count = orphaned_tombstones.len(),
                "Tombstones missing labels in L0, querying storage as fallback"
            );
            for (vid, version) in orphaned_tombstones {
                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
                    && !labels.is_empty()
                {
                    for label in &labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            vertices_by_label.entry(label_id).or_default().push((
                                vid,
                                labels.clone(),
                                HashMap::new(),
                                true,
                                version,
                            ));
                        }
                    }
                }
            }
        }

        // Start the new manifest from the latest snapshot (or a fresh one),
        // chaining it to its parent.
        let mut manifest = self
            .storage
            .snapshot_manager()
            .load_latest_snapshot()
            .await?
            .unwrap_or_else(|| {
                SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
            });

        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.name = name;
        manifest.created_at = Utc::now();
        manifest.version_high_water_mark = current_version;
        manifest.wal_high_water_mark = wal_lsn;
        let snapshot_id = manifest.snapshot_id.clone();

        tracing::Span::current().record("snapshot_id", &snapshot_id);

        // Per edge type: write forward (sorted by src) and backward (sorted
        // by dst) delta runs, then bump the manifest's per-type counters.
        for (&edge_type_id, entries) in entries_by_type.iter() {
            let edge_type_name = self
                .storage
                .schema_manager()
                .edge_type_name_by_id_unified(edge_type_id)
                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;

            let mut fwd_entries = entries.clone();
            fwd_entries.sort_by_key(|e| e.src_vid);
            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
            let fwd_batch = fwd_ds.build_record_batch(&fwd_entries, &schema)?;

            let backend = self.storage.backend();
            fwd_ds.write_run(backend, fwd_batch).await?;
            fwd_ds.ensure_eid_index(backend).await?;

            let mut bwd_entries = entries.clone();
            bwd_entries.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
            let bwd_batch = bwd_ds.build_record_batch(&bwd_entries, &schema)?;

            let backend = self.storage.backend();
            bwd_ds.write_run(backend, bwd_batch).await?;
            bwd_ds.ensure_eid_index(backend).await?;

            let current_snap =
                manifest
                    .edges
                    .entry(edge_type_name.to_string())
                    .or_insert(EdgeSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += entries.len() as u64;
            current_snap.lance_version = 0;

        }

        // Per label: compute inverted-index deltas, write the vertex batch,
        // refresh the vid→labels index, and bump manifest counters.
        for (label_id, vertices) in vertices_by_label {
            let label_name = schema
                .label_name_by_id(label_id)
                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

            let ds = self.storage.vertex_dataset(label_name)?;

            // property name → (terms added per vid, vids removed).
            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
            let mut inverted_updates: InvertedUpdateMap = HashMap::new();

            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                {
                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
                    let mut removed: HashSet<Vid> = HashSet::new();

                    for (vid, _labels, props, deleted, _version) in &vertices {
                        if *deleted {
                            removed.insert(*vid);
                        } else if let Some(prop_value) = props.get(&cfg.property) {
                            // Only string elements of array-valued properties
                            // contribute terms.
                            if let Some(arr) = prop_value.as_array() {
                                let terms: Vec<String> = arr
                                    .iter()
                                    .filter_map(|v| v.as_str().map(ToString::to_string))
                                    .collect();
                                if !terms.is_empty() {
                                    added.insert(*vid, terms);
                                }
                            }
                        }
                    }

                    if !added.is_empty() || !removed.is_empty() {
                        inverted_updates.insert(cfg.property.clone(), (added, removed));
                    }
                }
            }

            // Split the tuples into the parallel arrays the dataset expects.
            let mut v_data = Vec::new();
            let mut d_data = Vec::new();
            let mut ver_data = Vec::new();
            for (vid, labels, props, deleted, version) in vertices {
                v_data.push((vid, labels, props));
                d_data.push(deleted);
                ver_data.push(version);
            }

            let batch = ds.build_record_batch_with_timestamps(
                &v_data,
                &d_data,
                &ver_data,
                &schema,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;

            let backend = self.storage.backend();
            ds.write_batch(backend, batch, &schema).await?;
            ds.ensure_default_indexes(backend).await?;

            // Keep the in-memory vid→labels index consistent with storage.
            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
                if deleted {
                    self.storage.remove_from_vid_labels_index(*vid);
                } else {
                    self.storage.update_vid_labels_index(*vid, labels.clone());
                }
            }

            let current_snap =
                manifest
                    .vertices
                    .entry(label_name.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += v_data.len() as u64;
            current_snap.lance_version = 0;

            self.storage.invalidate_table_cache(label_name);

            // Incremental inverted-index maintenance (lance backend only).
            #[cfg(feature = "lance-backend")]
            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
                {
                    self.storage
                        .index_manager()
                        .update_inverted_index_incremental(cfg, added, removed)
                        .await?;
                }
            }

            // Refresh the uid→vid mapping (lance backend only).
            #[cfg(feature = "lance-backend")]
            {
                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
                for (vid, _labels, props) in &v_data {
                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
                        label_name, ext_id, props,
                    );
                    uid_mappings.push((uid, *vid));
                }

                if !uid_mappings.is_empty()
                    && let Ok(uid_index) = self.storage.uid_index(label_name)
                {
                    uid_index.write_mapping(&uid_mappings).await?;
                }
            }
        }

        // Flatten all per-type entries into rows for the consolidated main
        // edge table (unknown type ids are recorded as "unknown" here rather
        // than failing, unlike the delta-run pass above).
        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
            let _old_l0 = old_l0_arc.read();
            let mut main_edges: Vec<(
                uni_common::core::id::Eid,
                Vid,
                Vid,
                String,
                Properties,
                bool,
                u64,
            )> = Vec::new();
            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();

            for (&edge_type_id, entries) in entries_by_type.iter() {
                for entry in entries {
                    let edge_type_name = self
                        .storage
                        .schema_manager()
                        .edge_type_name_by_id_unified(edge_type_id)
                        .unwrap_or_else(|| "unknown".to_string());

                    let deleted = matches!(entry.op, Op::Delete);
                    main_edges.push((
                        entry.eid,
                        entry.src_vid,
                        entry.dst_vid,
                        edge_type_name,
                        entry.properties.clone(),
                        deleted,
                        entry.version,
                    ));

                    if let Some(ts) = entry.created_at {
                        edge_created_at_map.insert(entry.eid, ts);
                    }
                    if let Some(ts) = entry.updated_at {
                        edge_updated_at_map.insert(entry.eid, ts);
                    }
                }
            }

            (main_edges, edge_created_at_map, edge_updated_at_map)
        };
        if !main_edges.is_empty() {
            let main_edge_batch = MainEdgeDataset::build_record_batch(
                &main_edges,
                Some(&edge_created_at_map),
                Some(&edge_updated_at_map),
            )?;
            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
        }

        // Consolidated main vertex table: live vertices plus tombstones,
        // with whatever labels L0 knows (possibly empty).
        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
            let old_l0 = old_l0_arc.read();
            let mut vertices = Vec::new();

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                vertices.push((*vid, labels, props.clone(), false, version));
            }

            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(&vid).cloned().unwrap_or_default();
                vertices.push((vid, labels, HashMap::new(), true, version));
            }

            vertices
        };
        if !main_vertices.is_empty() {
            let main_vertex_batch = MainVertexDataset::build_record_batch(
                &main_vertices,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;
            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
        }

        // Persist, then publish, the new snapshot.
        self.storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        self.storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // Mark the old buffer fully flushed so the manager can release it.
        self.l0_manager.complete_flush(&old_l0_arc);

        // Truncate the WAL only up to the minimum LSN still needed by any
        // buffer that is pending flush, so unflushed data stays recoverable.
        if let Some(w) = wal_for_truncate {
            let safe_lsn = self
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // Property cache may now be stale relative to the new snapshot.
        if let Some(ref pm) = self.property_manager {
            pm.clear_cache().await;
        }

        self.last_flush_time = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // Opportunistic background adjacency compaction; at most one
        // compaction task is allowed in flight at a time.
        let am = self.adjacency_manager.clone();
        if am.should_compact(4) {
            let previous_still_running = {
                let guard = self.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };

            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *self.compaction_handle.write() = Some(handle);
            }
        }

        // Optionally schedule background index rebuilds off the new manifest.
        if let Some(ref rebuild_mgr) = self.index_rebuild_manager
            && self.config.index_rebuild.auto_rebuild_enabled
        {
            self.schedule_index_rebuilds_if_needed(&manifest, rebuild_mgr.clone());
        }

        Ok(snapshot_id)
    }
2535
2536 fn schedule_index_rebuilds_if_needed(
2540 &self,
2541 manifest: &SnapshotManifest,
2542 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
2543 ) {
2544 let checker = crate::storage::index_rebuild::RebuildTriggerChecker::new(
2545 self.config.index_rebuild.clone(),
2546 );
2547 let schema = self.schema_manager.schema();
2548 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
2549
2550 if labels.is_empty() {
2551 return;
2552 }
2553
2554 for label in &labels {
2556 for idx in &schema.indexes {
2557 if idx.label() == label {
2558 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
2559 m.status = uni_common::core::schema::IndexStatus::Stale;
2560 });
2561 }
2562 }
2563 }
2564
2565 tokio::spawn(async move {
2566 if let Err(e) = rebuild_mgr.schedule(labels).await {
2567 tracing::warn!("Failed to schedule index rebuild: {e}");
2568 }
2569 });
2570 }
2571
    /// Late-binds the property manager (used for CRDT merges and property
    /// reads); replaces any previously configured manager.
    pub fn set_property_manager(&mut self, pm: Arc<PropertyManager>) {
        self.property_manager = Some(pm);
    }
2576}
2577
// Integration-style tests for `Writer`. Each test builds a real
// `StorageManager`/`SchemaManager` over a temp-dir-backed local object store,
// so they exercise the actual L0 / WAL plumbing rather than mocks.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Verifies WAL ordering guarantees when a transaction L0 is committed:
    // all staged mutations must appear in the main WAL, with vertices logged
    // before the edges that reference them, and the committed data must be
    // merged into the main L0 buffer.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fresh temp-dir object store + schema with a single "Test" label.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Stage two vertices and one edge in a transaction-scoped L0 buffer;
        // nothing should reach the main WAL until commit.
        let tx_l0 = writer.create_transaction_l0();

        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Snapshot the main WAL length before commit so we can isolate the
        // mutations appended by the commit itself.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        writer.commit_transaction_l0(tx_l0).await?;

        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Only the mutations appended by this commit.
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Walk the committed mutations in WAL order and check the
        // vertices-before-edges invariant as we go.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Any vertex record appearing after an edge record would
                    // break crash-recovery ordering.
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Commit must also merge the transaction data into the main L0.
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }

    // Verifies rollback semantics: dropping an uncommitted transaction L0
    // must leave the main L0 untouched — pre-existing (baseline) data stays,
    // transaction-staged data never appears.
    // NOTE(review): this relies on the transaction L0's Drop impl discarding
    // staged data (rollback-on-drop) — confirm against `create_transaction_l0`.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let mut writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Baseline vertex written directly to the main L0 (no transaction);
        // it must survive the rollback below.
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
                None,
            )
            .await?;

        // Stage one vertex in a transaction L0 that will never be committed.
        let tx_l0 = writer.create_transaction_l0();

        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        // Dropping the transaction L0 without committing = rollback.
        drop(tx_l0);

        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }

    // Verifies that inserting many vertices with the same `&[String]` labels
    // slice records the correct labels for every vertex in the main L0.
    #[tokio::test]
    async fn test_batch_insert_shared_labels() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        // One shared labels slice reused across every insert.
        let labels = &["Person".to_string()];

        let mut vids = Vec::new();
        for i in 0..100 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("id".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, labels, None)
                .await?;
            vids.push(vid);
        }

        // Every vertex should carry exactly the shared label.
        let l0 = writer.l0_manager.get_current();
        for vid in vids {
            let l0_guard = l0.read();
            let vertex_labels = l0_guard.vertex_labels.get(&vid);
            assert!(vertex_labels.is_some(), "Vertex should have labels");
            assert_eq!(
                vertex_labels.unwrap(),
                &vec!["Person".to_string()],
                "Labels should match"
            );
        }

        Ok(())
    }

    // Verifies that the L0 buffer's incrementally maintained `estimated_size`
    // stays within 2x of the exactly computed `size_bytes()` as vertices and
    // edges are inserted — i.e. the cheap estimate tracks the true size.
    #[tokio::test]
    async fn test_estimated_size_tracks_mutations() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        let l0 = writer.l0_manager.get_current();

        // An empty L0 must report zero for both the estimate and the exact size.
        let initial_estimated = l0.read().estimated_size;
        let initial_actual = l0.read().size_bytes();
        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");

        // Insert 10 vertices with string + int properties.
        let mut vids = Vec::new();
        for i in 0..10 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
            props.insert("index".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, &[], None)
                .await?;
            vids.push(vid);
        }

        let after_vertices_estimated = l0.read().estimated_size;
        let after_vertices_actual = l0.read().size_bytes();
        assert!(
            after_vertices_estimated > 0,
            "estimated_size should grow after insertions"
        );

        // Estimate must be within a factor of 2 of the exact size.
        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_vertices_estimated,
            after_vertices_actual,
            ratio
        );

        // Chain the vertices with 9 "NEXT" edges and re-check the estimate.
        let edge_type = 1u32;
        for i in 0..9 {
            let eid = writer.next_eid(edge_type).await?;
            writer
                .insert_edge(
                    vids[i],
                    vids[i + 1],
                    edge_type,
                    eid,
                    std::collections::HashMap::new(),
                    Some("NEXT".to_string()),
                    None,
                )
                .await?;
        }

        let after_edges_estimated = l0.read().estimated_size;
        let after_edges_actual = l0.read().size_bytes();
        assert!(
            after_edges_estimated > after_vertices_estimated,
            "estimated_size should grow after edge insertions"
        );

        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_edges_estimated,
            after_edges_actual,
            ratio
        );

        Ok(())
    }
}