1use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunable settings for the writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit (default `10_000`).
    /// NOTE(review): presumably the threshold that triggers a flush — confirm against the consumer.
    pub max_mutations: usize,
    /// Switch for partial Lance writes (default `false`).
    /// NOTE(review): exact semantics are not visible in this chunk — confirm.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Stock configuration: 10 000 mutations, partial Lance writes disabled.
    fn default() -> Self {
        let max_mutations = 10_000;
        let partial_lance_writes = false;
        WriterConfig {
            max_mutations,
            partial_lance_writes,
        }
    }
}
66
67pub use crate::storage::manager::FlushInProgressGuard;
77
/// Values captured when an L0 buffer is rotated out for flushing.
///
/// `Writer::commit_transaction_l0` reads exactly these fields from the result
/// of `flush_l0_rotate` and forwards them into a `RotatedFlush`.
struct RotateOutput {
    /// The buffer that was rotated out and still has to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN associated with the rotated buffer.
    wal_lsn: u64,
    /// Version associated with the rotated buffer.
    current_version: u64,
    /// Guard carried along with the rotated buffer.
    /// NOTE(review): presumably marks a flush as in progress until dropped — confirm in `StorageManager`.
    flush_in_progress_guard: FlushInProgressGuard,
}
88
89fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93 let mut out = Properties::new();
94 for k in keys {
95 if let Some(v) = props.get(k) {
96 out.insert(k.clone(), v.clone());
97 }
98 }
99 out
100}
101
/// Writer-local result of a flush.
///
/// In `Writer::commit_transaction_l0` the result of `flush_stream_l1` is read
/// through these two fields and repackaged into the coordinator's own
/// `FlushOutcome` type.
struct FlushOutcome {
    /// Snapshot manifest produced by the flush.
    manifest: SnapshotManifest,
    /// Identifier of the snapshot the manifest belongs to.
    snapshot_id: String,
}
110
/// Write path of the engine: owns the L0 buffers, ID allocation, constraint
/// validation, transaction commit and flush triggering.
pub struct Writer {
    /// Source of the current L0 buffer and of the buffers pending flush.
    pub l0_manager: Arc<L0Manager>,
    /// Persistent storage (object store, datasets, backend).
    pub storage: Arc<StorageManager>,
    /// Schema source used for constraint validation.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocator for vertex and edge IDs.
    pub allocator: Arc<IdAllocator>,
    /// Engine configuration this writer was built with.
    pub config: UniConfig,
    /// Set-once model runtime; see `set_xervo_runtime` / `xervo_runtime`.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager; always `Some` when built through `new_with_config`.
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency structure kept in step with committed edge inserts and tombstones.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp initialised at construction and shared with the flush context.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Slot for a background task handle, shared with the flush context.
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Set-once index rebuild manager; see `set_index_rebuild_manager`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest; cloned as the parent manifest of an async flush.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Fork this writer belongs to; `None` when built through `new_with_config`.
    pub fork_id: Option<ForkId>,
    /// Counter shared with the flush context.
    /// NOTE(review): presumably counts flushes performed on a fork — confirm.
    fork_flush_count: Arc<AtomicU64>,
    /// Flag shared with the flush context.
    /// NOTE(review): presumably latches once the fork-fragment warning has fired — confirm.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Async mutex held for the duration of `commit_transaction_l0` and shared
    /// with the flush context.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Async flush coordinator; `Some` only when `config.async_flush_enabled`
    /// was set at construction.
    #[allow(dead_code)]
    pub(crate) flush_coordinator:
        Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Sequence number bumped each time a non-empty write set is recorded at commit.
    commit_sequence: Arc<AtomicU64>,
    /// Registry of committed write sets used for commit-time conflict checks.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-key async mutexes handed out by `row_lock_handle`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
206
/// Capacity passed to `CommitRegistry::new` when the writer is constructed.
const OCC_REGISTRY_CAPACITY: usize = 4096;
211
212impl Writer {
    /// Creates a writer with `UniConfig::default()`, no WAL and a freshly
    /// constructed ID allocator.
    ///
    /// Convenience wrapper around [`Writer::new_with_config`].
    ///
    /// # Errors
    /// Propagates any error from `new_with_config`.
    pub async fn new(
        storage: Arc<StorageManager>,
        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
        start_version: u64,
    ) -> Result<Self> {
        Self::new_with_config(
            storage,
            schema_manager,
            start_version,
            UniConfig::default(),
            None,
            None,
        )
        .await
    }
228
229 pub async fn new_with_config(
230 storage: Arc<StorageManager>,
231 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232 start_version: u64,
233 config: UniConfig,
234 wal: Option<Arc<WriteAheadLog>>,
235 allocator: Option<Arc<IdAllocator>>,
236 ) -> Result<Self> {
237 let allocator = if let Some(a) = allocator {
238 a
239 } else {
240 let store = storage.store();
241 let path = object_store::path::Path::from("id_allocator.json");
242 Arc::new(IdAllocator::new(store, path, 1000).await?)
243 };
244
245 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247 let property_manager = Some(Arc::new(PropertyManager::new(
248 storage.clone(),
249 schema_manager.clone(),
250 1000,
251 )));
252
253 let adjacency_manager = storage.adjacency_manager();
254
255 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260 let cached_manifest = Arc::new(PlMutex::new(None));
261 let fork_flush_count = Arc::new(AtomicU64::new(0));
262 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264 let compaction_handle = Arc::new(RwLock::new(None));
265 let index_rebuild_manager: Arc<
266 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267 > = Arc::new(OnceLock::new());
268
269 let flush_coordinator = if config.async_flush_enabled {
270 let shared = SharedFlushCtx {
271 storage: storage.clone(),
272 l0_manager: l0_manager.clone(),
273 adjacency_manager: adjacency_manager.clone(),
274 property_manager: property_manager.clone(),
275 schema_manager: schema_manager.clone(),
276 cached_manifest: cached_manifest.clone(),
277 last_flush_time: last_flush_time.clone(),
278 fork_id: None,
279 fork_flush_count: fork_flush_count.clone(),
280 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282 flush_lock: flush_lock.clone(),
283 index_rebuild_manager: index_rebuild_manager.clone(),
284 compaction_handle: compaction_handle.clone(),
285 compaction_config: config.compaction.clone(),
286 index_rebuild_config: config.index_rebuild.clone(),
287 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288 };
289 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290 Some(Arc::new(FlushCoordinator::new(
291 config.max_pending_flushes,
292 shared,
293 finalize_fn,
294 )))
295 } else {
296 None
297 };
298
299 let commit_sequence = Arc::new(AtomicU64::new(0));
300 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301 OCC_REGISTRY_CAPACITY,
302 )));
303 let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305 Ok(Self {
306 l0_manager,
307 storage,
308 schema_manager,
309 allocator,
310 config,
311 xervo_runtime: OnceLock::new(),
312 property_manager,
313 adjacency_manager,
314 last_flush_time,
315 compaction_handle,
316 index_rebuild_manager,
317 cached_manifest,
318 fork_id: None,
319 fork_flush_count,
320 fork_fragment_warn_fired,
321 flush_lock,
322 flush_coordinator,
323 commit_sequence,
324 committed_writes,
325 for_update_locks,
326 })
327 }
328
329 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333 self.for_update_locks
334 .entry(key.to_vec())
335 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336 .clone()
337 }
338
339 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351 for key in keys {
352 self.for_update_locks
353 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354 }
355 }
356
    /// Number of entries currently held in the row-lock map.
    pub fn for_update_lock_count(&self) -> usize {
        self.for_update_locks.len()
    }
362
    /// Current value of the commit sequence counter (relaxed atomic load).
    pub fn current_commit_sequence(&self) -> u64 {
        self.commit_sequence.load(Ordering::Relaxed)
    }
369
370 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375 SharedFlushCtx {
376 storage: self.storage.clone(),
377 l0_manager: self.l0_manager.clone(),
378 adjacency_manager: self.adjacency_manager.clone(),
379 property_manager: self.property_manager.clone(),
380 schema_manager: self.schema_manager.clone(),
381 cached_manifest: self.cached_manifest.clone(),
382 last_flush_time: self.last_flush_time.clone(),
383 fork_id: self.fork_id,
384 fork_flush_count: self.fork_flush_count.clone(),
385 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387 flush_lock: self.flush_lock.clone(),
388 index_rebuild_manager: self.index_rebuild_manager.clone(),
389 compaction_handle: self.compaction_handle.clone(),
390 compaction_config: self.config.compaction.clone(),
391 index_rebuild_config: self.config.index_rebuild.clone(),
392 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393 }
394 }
395
    /// Borrow of the async flush coordinator, or `None` when the writer was
    /// built without one.
    pub fn flush_coordinator(
        &self,
    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
        self.flush_coordinator.as_ref()
    }
404
405 pub fn set_index_rebuild_manager(
410 &self,
411 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412 ) -> Result<()> {
413 self.index_rebuild_manager
414 .set(manager)
415 .map_err(|_| anyhow!("index_rebuild_manager already set"))
416 }
417
418 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420 let l0 = self.l0_manager.get_current();
421 let wal = l0.read().wal.clone();
422
423 if let Some(wal) = wal {
424 wal.initialize().await?;
425 let mutations = wal.replay_since(wal_high_water_mark).await?;
426 let count = mutations.len();
427
428 if count > 0 {
429 log::info!(
430 "Replaying {} mutations from WAL (LSN > {})",
431 count,
432 wal_high_water_mark
433 );
434 let mut l0_guard = l0.write();
435 l0_guard.replay_mutations(mutations)?;
436 }
437
438 Ok(count)
439 } else {
440 Ok(0)
441 }
442 }
443
    /// Allocates one vertex ID from the ID allocator.
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
448
    /// Allocates `count` vertex IDs from the ID allocator in one call.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
454
    /// Allocates one edge ID from the ID allocator.
    ///
    /// `_type_id` is accepted but not used by the allocation.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
459
    /// Allocates `count` edge IDs from the ID allocator in one call.
    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
        self.allocator.allocate_eids(count).await
    }
465
466 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
469 self.xervo_runtime
470 .set(runtime)
471 .map_err(|_| anyhow!("xervo_runtime already set"))
472 }
473
474 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
475 self.xervo_runtime.get().cloned()
476 }
477
478 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
484 let current_version = self.l0_manager.get_current().read().current_version;
485 let buf = L0Buffer::new(current_version, None);
487 let buf = if self.config.ssi_enabled {
492 let mut buf = buf;
493 buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
494 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
497 crate::runtime::l0::OccReadSet::default(),
498 )));
499 buf
500 } else {
501 buf
502 };
503 Arc::new(RwLock::new(buf))
504 }
505
506 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
511 tx_l0
512 .cloned()
513 .unwrap_or_else(|| self.l0_manager.get_current())
514 }
515
516 fn update_metrics(&self) {
517 let l0 = self.l0_manager.get_current();
518 let size = l0.read().estimated_size;
519 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
520 }
521
    /// Commits a transaction-local L0 buffer into the current shared L0.
    ///
    /// Everything up to the optional async hand-off runs while holding
    /// `flush_lock`. The phases are:
    ///
    /// 1. **Validate** (only when `config.ssi_enabled`): check the
    ///    transaction's write set against the commit registry, re-check its
    ///    constraint keys against the current L0, and run the CRDT carve-out
    ///    check.
    /// 2. **Log**: append the transaction's vertex and edge mutations to the
    ///    current L0's WAL (if it has one) and flush the WAL.
    /// 3. **Freeze**: if the current L0 is pinned, freeze it for the snapshot.
    /// 4. **Merge**: merge the transaction buffer into the current L0 and
    ///    mirror edge inserts/tombstones into the adjacency manager.
    /// 5. **Record**: register a non-empty write set under a new commit
    ///    sequence number.
    /// 6. **Flush trigger**: when `should_flush()` says so, either rotate the
    ///    L0 and submit it to the flush coordinator, or flush inline.
    ///
    /// Returns `(wal_lsn, flush_pending)`: the LSN returned by the WAL flush
    /// (`0` without a WAL) and whether an async flush was submitted.
    ///
    /// # Errors
    /// * `UniError::SerializationConflict` on an OCC or CRDT conflict.
    /// * `UniError::ConstraintConflict` when a constraint key is already
    ///   present in the current L0.
    /// * Any WAL append/flush or merge error.
    ///
    /// Failures of the post-commit flush (rotate or inline) are only logged;
    /// the commit itself still succeeds.
    pub async fn commit_transaction_l0(
        self: &Arc<Self>,
        tx_l0_arc: Arc<RwLock<L0Buffer>>,
    ) -> Result<(u64, bool)> {
        // Serialises commits against each other and against flushes.
        let _flush_lock_guard = self.flush_lock.lock().await;

        fail::fail_point!("commit::after-flush-lock");

        // Phase 1: validation. Produces the write set to record after the merge.
        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
            let tx_l0 = tx_l0_arc.read();
            let read_seq = tx_l0.occ_read_seq;
            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
            // The registry check is skipped for transactions that wrote nothing.
            if !write_set.is_empty() {
                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
                if let Some(conflict) =
                    self.committed_writes
                        .lock()
                        .check(read_seq, &write_set, read_guard.as_deref())
                {
                    use crate::runtime::occ::Conflict;
                    // One counter per conflict kind; all kinds abort the commit.
                    match &conflict {
                        Conflict::WriteWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "write_write",
                        )
                        .increment(1),
                        Conflict::ReadWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "read_write",
                        )
                        .increment(1),
                        Conflict::HistoryTruncated { .. } => {
                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
                        }
                    }
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }

            {
                let main_l0 = self.l0_manager.get_current();
                let main_l0 = main_l0.read();

                // Re-check the transaction's constraint keys against what has
                // been committed to the current L0 in the meantime.
                for (key, vid) in &tx_l0.constraint_index {
                    if main_l0.has_constraint_key(key, *vid) {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "unique key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                if let Some(conflict) =
                    crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &main_l0)
                {
                    metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }
            Some(write_set)
        } else {
            None
        };

        fail::fail_point!("commit::after-validate");

        // Phase 2: append the transaction's mutations to the WAL. The guards
        // are scoped so none of them is alive across the flush await below.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            if let Some(wal) = main_l0.wal.as_ref() {
                // Vertices with properties that were not deleted in this transaction.
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions.
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Label overwrites, except for vertices deleted in the same transaction.
                for vid in &tx_l0.vertex_label_overwrites {
                    if tx_l0.vertex_tombstones.contains(vid) {
                        continue;
                    }
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(&crate::runtime::wal::Mutation::SetVertexLabels {
                        vid: *vid,
                        labels,
                    })?;
                }

                fail::fail_point!("commit::mid-wal");

                // Edges whose endpoints are known to the transaction: logged as
                // a delete when tombstoned, otherwise as an insert.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges without an endpoint entry in this
                // transaction; endpoints come from the tombstone itself.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // The WAL is flushed before the merge makes the writes visible.
        let wal_lsn = self.flush_wal().await?;

        fail::fail_point!("commit::after-wal-flush");

        // Phase 3: freeze a pinned L0 before mutating it.
        if self.l0_manager.is_current_pinned() {
            self.l0_manager.freeze_current_for_snapshot();
            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
        }

        // Phase 4: merge into the current L0 and update adjacency.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            main_l0.merge(&tx_l0)?;

            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                // Unlike the WAL path (which defaults to 0), a missing edge
                // version falls back to the current L0's version here.
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        fail::fail_point!("commit::after-merge");

        // Phase 5: make the write set visible to later validations. The new
        // sequence number is the post-increment value.
        if let Some(write_set) = occ_write_set
            && !write_set.is_empty()
        {
            let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
            self.committed_writes.lock().record(seq, write_set);
        }

        self.update_metrics();

        // Phase 6: flush trigger.
        let mut flush_pending = false;
        if self.should_flush() {
            // Async path: only when enabled, a coordinator exists, and it is
            // below its pending-flush limit. Otherwise flush inline.
            if self.config.async_flush_enabled
                && let Some(coord) = self.flush_coordinator.as_ref()
                && coord.pending_flush_count() < self.config.max_pending_flushes
            {
                match coord.try_acquire_permit() {
                    Some(permit) => {
                        let seq = coord.next_rotate_seq();
                        // NOTE(review): `note_pending` runs before the rotate; if the
                        // rotate fails below, nothing visible here undoes it — confirm
                        // the coordinator tolerates that.
                        coord.note_pending();
                        match self.flush_l0_rotate().await {
                            Ok(rotate_out) => {
                                // The rotate is done; release the commit lock before
                                // handing the buffer to the coordinator.
                                drop(_flush_lock_guard);
                                let parent_manifest = self.cached_manifest.lock().clone();
                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
                                    seq,
                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
                                    wal_lsn: rotate_out.wal_lsn,
                                    current_version: rotate_out.current_version,
                                    name: None,
                                    parent_manifest,
                                    permit,
                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
                                };
                                let writer = self.clone();
                                let _ticket = coord.submit_for_stream(
                                    rotated,
                                    move |old_l0, wal, ver, n| async move {
                                        let outcome =
                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
                                            new_manifest: outcome.manifest,
                                            snapshot_id: outcome.snapshot_id,
                                        })
                                    },
                                );
                                flush_pending = true;
                                return Ok((wal_lsn, flush_pending));
                            }
                            Err(e) => {
                                // Best effort: the commit has already succeeded.
                                tracing::warn!("Async rotate failed (non-critical): {}", e);
                            }
                        }
                    }
                    None => {
                        // No permit available: skip this trigger and only count it.
                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
                    }
                }
            } else if let Err(e) = self.flush_inline_under_lock(None).await {
                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
            }
        }

        Ok((wal_lsn, flush_pending))
    }
897
898 pub async fn flush_wal(&self) -> Result<u64> {
902 let l0 = self.l0_manager.get_current();
903 let wal = l0.read().wal.clone();
904
905 match wal {
906 Some(wal) => Ok(wal.flush().await?),
907 None => Ok(0),
908 }
909 }
910
911 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
915 if count == 0 {
916 return;
917 }
918 let l0 = self.resolve_l0(tx_l0);
919 l0.write().mutation_stats.properties_removed += count;
920 }
921
    /// Validates `properties` against the schema of `label`, treating them as
    /// the vertex's full property set (non-partial mode of
    /// `validate_vertex_constraints_for_label_impl`).
    async fn validate_vertex_constraints_for_label(
        &self,
        vid: Vid,
        properties: &Properties,
        label: &str,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
            .await
    }
934
    /// Validates `properties` against the schema of `label` in partial mode
    /// (`partial = true` in `validate_vertex_constraints_for_label_impl`), in
    /// which a non-nullable property that is absent from `properties` is not
    /// reported as a violation.
    async fn validate_vertex_constraints_for_label_partial(
        &self,
        vid: Vid,
        properties: &Properties,
        label: &str,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
            .await
    }
951
952 async fn validate_vertex_constraints_for_label_impl(
953 &self,
954 vid: Vid,
955 properties: &Properties,
956 label: &str,
957 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
958 partial: bool,
959 ) -> Result<()> {
960 let schema = self.schema_manager.schema();
961
962 {
963 if let Some(props_meta) = schema.properties.get(label) {
968 for (prop_name, meta) in props_meta {
969 if !meta.nullable {
970 let present = properties.get(prop_name);
971 if partial && present.is_none() {
972 continue;
973 }
974 if present.is_none_or(|v| v.is_null()) {
975 log::warn!(
976 "Constraint violation: Property '{}' cannot be null for label '{}'",
977 prop_name,
978 label
979 );
980 return Err(anyhow!(
981 "Constraint violation: Property '{}' cannot be null",
982 prop_name
983 ));
984 }
985 }
986 }
987 }
988
989 for constraint in &schema.constraints {
991 if !constraint.enabled {
992 continue;
993 }
994 match &constraint.target {
995 ConstraintTarget::Label(l) if l == label => {}
996 _ => continue,
997 }
998
999 match &constraint.constraint_type {
1000 ConstraintType::Unique {
1001 properties: unique_props,
1002 } => {
1003 if !unique_props.is_empty() {
1005 let mut key_values = Vec::new();
1006 let mut missing = false;
1007 for prop in unique_props {
1008 if let Some(val) = properties.get(prop) {
1009 key_values.push((prop.clone(), val.clone()));
1010 } else {
1011 missing = true; }
1016 }
1017
1018 if !missing {
1019 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1020 .await?;
1021 }
1022 }
1023 }
1024 ConstraintType::Exists { property } => {
1025 if properties.get(property).is_none_or(|v| v.is_null()) {
1026 log::warn!(
1027 "Constraint violation: Property '{}' must exist for label '{}'",
1028 property,
1029 label
1030 );
1031 return Err(anyhow!(
1032 "Constraint violation: Property '{}' must exist",
1033 property
1034 ));
1035 }
1036 }
1037 ConstraintType::Check { expression } => {
1038 if !self.evaluate_check_constraint(expression, properties)? {
1039 return Err(anyhow!(
1040 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1041 constraint.name,
1042 expression
1043 ));
1044 }
1045 }
1046 _ => {
1047 return Err(anyhow!("Unsupported constraint type"));
1048 }
1049 }
1050 }
1051 }
1052 Ok(())
1053 }
1054
1055 async fn validate_vertex_constraints(
1059 &self,
1060 vid: Vid,
1061 properties: &Properties,
1062 labels: &[String],
1063 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1064 ) -> Result<()> {
1065 let schema = self.schema_manager.schema();
1066
1067 for label in labels {
1069 if schema.get_label_case_insensitive(label).is_none() {
1071 continue;
1072 }
1073 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1074 .await?;
1075 }
1076
1077 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1079 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1080 }
1081
1082 Ok(())
1083 }
1084
1085 async fn validate_vertex_constraints_partial(
1090 &self,
1091 vid: Vid,
1092 touched: &Properties,
1093 labels: &[String],
1094 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1095 ) -> Result<()> {
1096 let schema = self.schema_manager.schema();
1097 for label in labels {
1098 if schema.get_label_case_insensitive(label).is_none() {
1099 continue;
1100 }
1101 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1102 .await?;
1103 }
1104 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1105 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1106 }
1107 Ok(())
1108 }
1109
1110 fn collect_constraint_keys_from_properties<'a>(
1114 properties_iter: impl Iterator<Item = &'a Properties>,
1115 label: &str,
1116 constraints: &[uni_common::core::schema::Constraint],
1117 existing_keys: &mut HashMap<String, HashSet<String>>,
1118 existing_extids: &mut HashSet<String>,
1119 ) {
1120 for props in properties_iter {
1121 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1122 existing_extids.insert(ext_id.to_string());
1123 }
1124
1125 for constraint in constraints {
1126 if !constraint.enabled {
1127 continue;
1128 }
1129 if let ConstraintTarget::Label(l) = &constraint.target {
1130 if l != label {
1131 continue;
1132 }
1133 } else {
1134 continue;
1135 }
1136
1137 if let ConstraintType::Unique {
1138 properties: unique_props,
1139 } = &constraint.constraint_type
1140 {
1141 let mut key_parts = Vec::new();
1142 let mut all_present = true;
1143 for prop in unique_props {
1144 if let Some(val) = props.get(prop) {
1145 key_parts.push(format!("{}:{}", prop, val));
1146 } else {
1147 all_present = false;
1148 break;
1149 }
1150 }
1151 if all_present {
1152 let key = key_parts.join("|");
1153 existing_keys
1154 .entry(constraint.name.clone())
1155 .or_default()
1156 .insert(key);
1157 }
1158 }
1159 }
1160 }
1161 }
1162
    /// Validates a batch of vertices that all carry `label`.
    ///
    /// `vids[i]` and `properties_batch[i]` describe the same vertex. Checks:
    /// 1. NOT NULL for every non-nullable property declared for `label`.
    /// 2. `ext_id` and `Unique` keys against the current L0 and, if given,
    ///    the transaction L0.
    /// 3. `ext_id` and `Unique` keys against earlier entries of the same batch.
    /// 4. `Exists` and `Check` constraints per entry.
    /// 5. With the `lance-backend` feature: `Unique` keys against persisted
    ///    rows, using one `count_rows` query per constraint.
    ///
    /// # Errors
    /// Returns an error on a length mismatch between `vids` and
    /// `properties_batch`, on the first violation found, or when CHECK
    /// evaluation or the storage row count fails.
    async fn validate_vertex_batch_constraints(
        &self,
        vids: &[Vid],
        properties_batch: &[Properties],
        label: &str,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        if vids.len() != properties_batch.len() {
            return Err(anyhow!("VID/properties length mismatch"));
        }

        let schema = self.schema_manager.schema();

        // Step 1: NOT NULL on every entry.
        if let Some(props_meta) = schema.properties.get(label) {
            for (idx, properties) in properties_batch.iter().enumerate() {
                for (prop_name, meta) in props_meta {
                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' cannot be null",
                            idx,
                            prop_name
                        ));
                    }
                }
            }
        }

        // Step 2: keys already occupied in memory, collected once for the batch.
        // `existing_keys` maps constraint name -> set of composite keys.
        // NOTE(review): every vertex in the buffers is scanned, without
        // filtering by the vertex's own labels or by tombstones — confirm
        // that this is intended.
        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
        let mut existing_extids: HashSet<String> = HashSet::new();

        {
            let l0 = self.l0_manager.get_current();
            let l0_guard = l0.read();
            Self::collect_constraint_keys_from_properties(
                l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        if let Some(tx_l0) = tx_l0 {
            let tx_l0_guard = tx_l0.read();
            Self::collect_constraint_keys_from_properties(
                tx_l0_guard.vertex_properties.values(),
                label,
                &schema.constraints,
                &mut existing_keys,
                &mut existing_extids,
            );
        }

        // Step 3/4: per-entry checks. The batch maps remember the index of the
        // first entry that used a key so duplicates can name both indices.
        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
        let mut batch_extids: HashMap<String, usize> = HashMap::new();

        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
                if existing_extids.contains(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation at index {}: ext_id '{}' already exists",
                        idx,
                        ext_id
                    ));
                }
                if let Some(first_idx) = batch_extids.get(ext_id) {
                    return Err(anyhow!(
                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
                        ext_id,
                        first_idx,
                        idx
                    ));
                }
                batch_extids.insert(ext_id.to_string(), idx);
            }

            for constraint in &schema.constraints {
                // Only enabled constraints that target this label apply.
                if !constraint.enabled {
                    continue;
                }
                if let ConstraintTarget::Label(l) = &constraint.target {
                    if l != label {
                        continue;
                    }
                } else {
                    continue;
                }

                match &constraint.constraint_type {
                    ConstraintType::Unique {
                        properties: unique_props,
                    } => {
                        // Build the composite key in the same `prop:value|…`
                        // format that `collect_constraint_keys_from_properties`
                        // uses, so the two sets are comparable.
                        let mut key_parts = Vec::new();
                        let mut all_present = true;
                        for prop in unique_props {
                            if let Some(val) = properties.get(prop) {
                                key_parts.push(format!("{}:{}", prop, val));
                            } else {
                                all_present = false;
                                break;
                            }
                        }

                        // An incomplete key is not checked for uniqueness.
                        if all_present {
                            let key = key_parts.join("|");

                            if let Some(keys) = existing_keys.get(&constraint.name)
                                && keys.contains(&key)
                            {
                                return Err(anyhow!(
                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
                                    idx,
                                    label,
                                    constraint.name
                                ));
                            }

                            let batch_constraint_keys =
                                batch_keys.entry(constraint.name.clone()).or_default();
                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
                                return Err(anyhow!(
                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
                                    key,
                                    first_idx,
                                    idx
                                ));
                            }
                            batch_constraint_keys.insert(key, idx);
                        }
                    }
                    ConstraintType::Exists { property }
                        if properties.get(property).is_none_or(|v| v.is_null()) =>
                    {
                        return Err(anyhow!(
                            "Constraint violation at index {}: Property '{}' must exist",
                            idx,
                            property
                        ));
                    }
                    ConstraintType::Check { expression }
                        if !self.evaluate_check_constraint(expression, properties)? =>
                    {
                        return Err(anyhow!(
                            "Constraint violation at index {}: CHECK constraint '{}' violated",
                            idx,
                            constraint.name
                        ));
                    }
                    // Satisfied Exists/Check constraints and any other kinds.
                    _ => {}
                }
            }
        }

        // Step 5: uniqueness against persisted rows, one query per constraint.
        for constraint in &schema.constraints {
            if !constraint.enabled {
                continue;
            }
            if let ConstraintTarget::Label(l) = &constraint.target {
                if l != label {
                    continue;
                }
            } else {
                continue;
            }

            if let ConstraintType::Unique {
                properties: unique_props,
            } = &constraint.constraint_type
            {
                // One `(a = x AND b = y)` clause per batch entry with a complete key.
                let mut or_filters = Vec::new();
                for properties in properties_batch.iter() {
                    let mut and_parts = Vec::new();
                    let mut all_present = true;
                    for prop in unique_props {
                        if let Some(val) = properties.get(prop) {
                            // Render the value as a filter literal; single quotes in
                            // strings are doubled. An entry with a value of any other
                            // type is left out of the storage check.
                            let val_str = match val {
                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                                Value::Int(n) => n.to_string(),
                                Value::Float(f) => f.to_string(),
                                Value::Bool(b) => b.to_string(),
                                _ => {
                                    all_present = false;
                                    break;
                                }
                            };
                            // NOTE(review): the property name is interpolated unquoted;
                            // presumably schema property names are safe identifiers — confirm.
                            and_parts.push(format!("{} = {}", prop, val_str));
                        } else {
                            all_present = false;
                            break;
                        }
                    }
                    if all_present {
                        or_filters.push(format!("({})", and_parts.join(" AND ")));
                    }
                }

                #[cfg(feature = "lance-backend")]
                if !or_filters.is_empty() {
                    // Rows belonging to the batch's own vids are excluded, so
                    // re-writing an existing vertex does not conflict with itself.
                    let vid_list: Vec<String> =
                        vids.iter().map(|v| v.as_u64().to_string()).collect();
                    let filter = format!(
                        "({}) AND _deleted = false AND _vid NOT IN ({})",
                        or_filters.join(" OR "),
                        vid_list.join(", ")
                    );

                    // If the dataset cannot be obtained or opened the storage
                    // check is skipped; a failing row count is propagated.
                    if let Ok(ds) = self.storage.vertex_dataset(label)
                        && let Ok(lance_ds) = ds.open_raw().await
                    {
                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
                        if count > 0 {
                            return Err(anyhow!(
                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
                                label,
                                constraint.name
                            ));
                        }
                    }
                }
            }
        }

        Ok(())
    }
1412
1413 async fn check_extid_globally_unique(
1422 &self,
1423 ext_id: &str,
1424 current_vid: Vid,
1425 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1426 ) -> Result<()> {
1427 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1429 let mut buffers = vec![self.l0_manager.get_current()];
1430 if let Some(tx_l0) = tx_l0 {
1431 buffers.push(tx_l0.clone());
1432 }
1433 buffers.extend(self.l0_manager.get_pending_flush());
1434 buffers
1435 };
1436
1437 for l0 in &l0_buffers_to_check {
1438 if let Some(vid) =
1439 Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
1440 {
1441 return Err(anyhow!(
1442 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1443 ext_id,
1444 vid
1445 ));
1446 }
1447 }
1448
1449 let backend = self.storage.backend();
1452 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1453 && found_vid != current_vid
1454 {
1455 return Err(anyhow!(
1456 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1457 ext_id,
1458 found_vid
1459 ));
1460 }
1461
1462 Ok(())
1463 }
1464
1465 fn find_extid_in_properties(
1467 vertex_properties: &HashMap<Vid, Properties>,
1468 ext_id: &str,
1469 current_vid: Vid,
1470 ) -> Option<Vid> {
1471 vertex_properties.iter().find_map(|(&vid, props)| {
1472 if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
1473 Some(vid)
1474 } else {
1475 None
1476 }
1477 })
1478 }
1479
1480 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1482 let l0 = self.l0_manager.get_current();
1483 let l0_guard = l0.read();
1484 if l0_guard.vertex_tombstones.contains(&vid) {
1486 return None;
1487 }
1488 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1489 }
1490
1491 pub async fn get_vertex_labels(
1495 &self,
1496 vid: Vid,
1497 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1498 ) -> Option<Vec<String>> {
1499 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1501 return Some(labels);
1502 }
1503
1504 if let Some(tx_l0) = tx_l0 {
1506 let guard = tx_l0.read();
1507 if guard.vertex_tombstones.contains(&vid) {
1508 return None;
1509 }
1510 if let Some(labels) = guard.get_vertex_labels(vid) {
1511 return Some(labels.to_vec());
1512 }
1513 }
1514
1515 for pending_l0 in self.l0_manager.get_pending_flush() {
1517 let guard = pending_l0.read();
1518 if guard.vertex_tombstones.contains(&vid) {
1519 return None;
1520 }
1521 if let Some(labels) = guard.get_vertex_labels(vid) {
1522 return Some(labels.to_vec());
1523 }
1524 }
1525
1526 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1528 }
1529
1530 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1532 let l0 = self.l0_manager.get_current();
1533 let l0_guard = l0.read();
1534 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1535 }
1536
1537 pub fn get_edge_type_id_from_l0(
1540 &self,
1541 eid: Eid,
1542 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1543 ) -> Option<u32> {
1544 if let Some(tx_l0) = tx_l0 {
1546 let guard = tx_l0.read();
1547 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1548 return Some(etype);
1549 }
1550 }
1551 let l0 = self.l0_manager.get_current();
1553 let l0_guard = l0.read();
1554 l0_guard
1555 .get_edge_endpoint_full(eid)
1556 .map(|(_, _, etype)| etype)
1557 }
1558
1559 pub fn set_edge_type(
1562 &self,
1563 eid: Eid,
1564 type_name: String,
1565 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1566 ) {
1567 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1568 }
1569
1570 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1573 let parts: Vec<&str> = expression.split_whitespace().collect();
1574 if parts.len() != 3 {
1575 log::warn!(
1578 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1579 expression
1580 );
1581 return Ok(true);
1582 }
1583
1584 let prop_part = parts[0].trim_start_matches('(');
1585 let prop_name = if let Some(idx) = prop_part.find('.') {
1587 &prop_part[idx + 1..]
1588 } else {
1589 prop_part
1590 };
1591
1592 let op = parts[1];
1593 let val_str = parts[2].trim_end_matches(')');
1594
1595 let prop_val = match properties.get(prop_name) {
1596 Some(v) => v,
1597 None => return Ok(true), };
1599
1600 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1602 || (val_str.starts_with('"') && val_str.ends_with('"'))
1603 {
1604 Value::String(val_str[1..val_str.len() - 1].to_string())
1605 } else if let Ok(n) = val_str.parse::<i64>() {
1606 Value::Int(n)
1607 } else if let Ok(n) = val_str.parse::<f64>() {
1608 Value::Float(n)
1609 } else if let Ok(b) = val_str.parse::<bool>() {
1610 Value::Bool(b)
1611 } else {
1612 if val_str.starts_with("Number(") && val_str.ends_with(')') {
1614 let n_str = &val_str[7..val_str.len() - 1];
1615 if let Ok(n) = n_str.parse::<i64>() {
1616 Value::Int(n)
1617 } else if let Ok(n) = n_str.parse::<f64>() {
1618 Value::Float(n)
1619 } else {
1620 Value::String(val_str.to_string())
1621 }
1622 } else {
1623 Value::String(val_str.to_string())
1624 }
1625 };
1626
1627 match op {
1628 "=" | "==" => Ok(prop_val == &target_val),
1629 "!=" | "<>" => Ok(prop_val != &target_val),
1630 ">" => self
1631 .compare_values(prop_val, &target_val)
1632 .map(|o| o.is_gt()),
1633 "<" => self
1634 .compare_values(prop_val, &target_val)
1635 .map(|o| o.is_lt()),
1636 ">=" => self
1637 .compare_values(prop_val, &target_val)
1638 .map(|o| o.is_ge()),
1639 "<=" => self
1640 .compare_values(prop_val, &target_val)
1641 .map(|o| o.is_le()),
1642 _ => {
1643 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1644 Ok(true)
1645 }
1646 }
1647 }
1648
1649 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1650 use std::cmp::Ordering;
1651
1652 fn cmp_f64(x: f64, y: f64) -> Ordering {
1653 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1654 }
1655
1656 match (a, b) {
1657 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1658 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1659 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1660 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1661 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1662 _ => Err(anyhow!(
1663 "Cannot compare incompatible types: {:?} vs {:?}",
1664 a,
1665 b
1666 )),
1667 }
1668 }
1669
1670 async fn check_unique_constraint_multi(
1671 &self,
1672 label: &str,
1673 key_values: &[(String, Value)],
1674 current_vid: Vid,
1675 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1676 ) -> Result<()> {
1677 let key = serialize_constraint_key(label, key_values);
1679
1680 {
1682 let l0 = self.l0_manager.get_current();
1683 let l0_guard = l0.read();
1684 if l0_guard.has_constraint_key(&key, current_vid) {
1685 return Err(anyhow!(
1686 "Constraint violation: Duplicate composite key for label '{}'",
1687 label
1688 ));
1689 }
1690 }
1691
1692 if let Some(tx_l0) = tx_l0 {
1694 let tx_l0_guard = tx_l0.read();
1695 if tx_l0_guard.has_constraint_key(&key, current_vid) {
1696 return Err(anyhow!(
1697 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1698 label
1699 ));
1700 }
1701 }
1702
1703 let filters: Vec<String> = key_values
1705 .iter()
1706 .map(|(prop, val)| {
1707 let val_str = match val {
1708 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1709 Value::Int(n) => n.to_string(),
1710 Value::Float(f) => f.to_string(),
1711 Value::Bool(b) => b.to_string(),
1712 _ => "NULL".to_string(),
1713 };
1714 format!("{} = {}", prop, val_str)
1715 })
1716 .collect();
1717
1718 let mut filter = filters.join(" AND ");
1719 filter.push_str(&format!(
1720 " AND _deleted = false AND _vid != {}",
1721 current_vid.as_u64()
1722 ));
1723
1724 #[cfg(feature = "lance-backend")]
1725 if let Ok(ds) = self.storage.vertex_dataset(label)
1726 && let Ok(lance_ds) = ds.open_raw().await
1727 {
1728 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1729 if count > 0 {
1730 return Err(anyhow!(
1731 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
1732 label,
1733 filter
1734 ));
1735 }
1736 }
1737
1738 Ok(())
1739 }
1740
1741 async fn check_write_pressure(&self) -> Result<()> {
1742 let status = self
1743 .storage
1744 .compaction_status()
1745 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1746 let l1_runs = status.l1_runs;
1747 let throttle = &self.config.throttle;
1748
1749 if l1_runs >= throttle.hard_limit {
1750 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1751 while self
1753 .storage
1754 .compaction_status()
1755 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1756 .l1_runs
1757 >= throttle.hard_limit
1758 {
1759 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1760 }
1761 } else if l1_runs >= throttle.soft_limit {
1762 let excess = l1_runs - throttle.soft_limit;
1763 let excess = std::cmp::min(excess, 31);
1765 let multiplier = 2_u32.pow(excess as u32);
1766 let delay = throttle.base_delay * multiplier;
1767 tokio::time::sleep(delay).await;
1768 }
1769 Ok(())
1770 }
1771
1772 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
1775 if let Some(tx_l0) = tx_l0 {
1776 let size = tx_l0.read().estimated_size;
1777 if size > self.config.max_transaction_memory {
1778 return Err(anyhow!(
1779 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1780 Roll back or commit the current transaction.",
1781 size,
1782 self.config.max_transaction_memory
1783 ));
1784 }
1785 }
1786 Ok(())
1787 }
1788
1789 async fn get_query_context(
1790 &self,
1791 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1792 ) -> Option<QueryContext> {
1793 Some(QueryContext::new_with_pending(
1794 self.l0_manager.get_current(),
1795 tx_l0.cloned(),
1796 self.l0_manager.get_pending_flush(),
1797 ))
1798 }
1799
1800 fn enforce_crdt_variants(
1812 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
1813 properties: &Properties,
1814 ) -> Result<()> {
1815 for (key, value) in properties {
1816 let Some(meta) = props_meta.get(key) else {
1817 continue;
1818 };
1819 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
1820 continue;
1821 };
1822 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
1823 && crdt.type_name() != expected.type_name()
1824 {
1825 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
1826 message: format!(
1827 "CRDT property '{key}' must be written as a {} value",
1828 expected.type_name()
1829 ),
1830 }));
1831 }
1832 }
1833 Ok(())
1834 }
1835
1836 async fn prepare_vertex_upsert(
1845 &self,
1846 vid: Vid,
1847 properties: &mut Properties,
1848 label: Option<&str>,
1849 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1850 ) -> Result<()> {
1851 let Some(pm) = &self.property_manager else {
1852 return Ok(());
1853 };
1854
1855 let schema = self.schema_manager.schema();
1856
1857 let discovered_labels;
1859 let label_name = if let Some(l) = label {
1860 Some(l)
1861 } else {
1862 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
1863 discovered_labels
1864 .as_ref()
1865 .and_then(|l| l.first().map(|s| s.as_str()))
1866 };
1867
1868 let Some(label_str) = label_name else {
1869 return Ok(());
1870 };
1871 let Some(props_meta) = schema.properties.get(label_str) else {
1872 return Ok(());
1873 };
1874
1875 let crdt_keys: Vec<String> = properties
1877 .keys()
1878 .filter(|key| {
1879 props_meta.get(*key).is_some_and(|meta| {
1880 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1881 })
1882 })
1883 .cloned()
1884 .collect();
1885
1886 if crdt_keys.is_empty() {
1887 return Ok(());
1888 }
1889
1890 Self::enforce_crdt_variants(props_meta, properties)?;
1903
1904 let ctx = self.get_query_context(tx_l0).await;
1905 for key in crdt_keys {
1906 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
1907 if !existing.is_null()
1908 && let Some(val) = properties.get_mut(&key)
1909 {
1910 *val = pm.merge_crdt_values(&existing, val)?;
1911 }
1912 }
1913
1914 Ok(())
1915 }
1916
1917 async fn prepare_edge_upsert(
1918 &self,
1919 eid: Eid,
1920 properties: &mut Properties,
1921 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1922 ) -> Result<()> {
1923 if let Some(pm) = &self.property_manager {
1924 let schema = self.schema_manager.schema();
1925 let type_name = self.get_edge_type_from_l0(eid);
1927
1928 if let Some(ref t_name) = type_name
1929 && let Some(props_meta) = schema.properties.get(t_name)
1930 {
1931 let mut crdt_keys = Vec::new();
1932 for (key, _) in properties.iter() {
1933 if let Some(meta) = props_meta.get(key)
1934 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1935 {
1936 crdt_keys.push(key.clone());
1937 }
1938 }
1939
1940 if !crdt_keys.is_empty() {
1941 let ctx = self.get_query_context(tx_l0).await;
1942 for key in crdt_keys {
1943 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1944
1945 if !existing.is_null()
1946 && let Some(val) = properties.get_mut(&key)
1947 {
1948 *val = pm.merge_crdt_values(&existing, val)?;
1949 }
1950 }
1951 }
1952 }
1953 }
1954 Ok(())
1955 }
1956
1957 #[instrument(skip(self, properties), level = "trace")]
1958 pub async fn insert_vertex(
1959 &self,
1960 vid: Vid,
1961 properties: Properties,
1962 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1963 ) -> Result<()> {
1964 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
1965 .await?;
1966 Ok(())
1967 }
1968
1969 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1982 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
1986 let _flush_lock_guard = self.flush_lock.lock().await;
1987 if self.l0_manager.is_current_pinned() {
1989 self.l0_manager.freeze_current_for_snapshot();
1990 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1991 }
1992 }
1993 }
1994
1995 #[instrument(skip(self, properties, labels), level = "trace")]
1996 pub async fn insert_vertex_with_labels(
1997 &self,
1998 vid: Vid,
1999 mut properties: Properties,
2000 labels: &[String],
2001 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2002 ) -> Result<Properties> {
2003 let start = std::time::Instant::now();
2004 self.check_write_pressure().await?;
2005 self.check_transaction_memory(tx_l0)?;
2006
2007 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2012
2013 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2014 self.process_embeddings_for_labels(labels, &mut properties)
2015 .await?;
2016 }
2017 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2018 .await?;
2019 self.prepare_vertex_upsert(
2020 vid,
2021 &mut properties,
2022 labels.first().map(|s| s.as_str()),
2023 tx_l0,
2024 )
2025 .await?;
2026
2027 let properties_copy = properties.clone();
2029 let labels_copy = labels.to_vec();
2030
2031 {
2032 let l0 = self.resolve_l0(tx_l0);
2033 let mut l0_guard = l0.write();
2034 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2035
2036 let schema = self.schema_manager.schema();
2038 for label in &labels_copy {
2039 if schema.get_label_case_insensitive(label).is_none() {
2040 if self.config.strict_schema {
2041 return Err(anyhow::anyhow!(
2042 "Label '{}' is not defined in the schema \
2043 (strict_schema is enabled).",
2044 label
2045 ));
2046 }
2047 continue; }
2049
2050 for constraint in &schema.constraints {
2052 if !constraint.enabled {
2053 continue;
2054 }
2055 if let ConstraintTarget::Label(l) = &constraint.target {
2056 if l != label {
2057 continue;
2058 }
2059 } else {
2060 continue;
2061 }
2062
2063 if let ConstraintType::Unique {
2064 properties: unique_props,
2065 } = &constraint.constraint_type
2066 {
2067 let mut key_values = Vec::new();
2068 let mut all_present = true;
2069 for prop in unique_props {
2070 if let Some(val) = properties_copy.get(prop) {
2071 key_values.push((prop.clone(), val.clone()));
2072 } else {
2073 all_present = false;
2074 break;
2075 }
2076 }
2077
2078 if all_present {
2079 let key = serialize_constraint_key(label, &key_values);
2080 l0_guard.insert_constraint_key(key, vid);
2081 }
2082 }
2083 }
2084 }
2085 }
2086
2087 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2088 self.update_metrics();
2089
2090 if tx_l0.is_none() {
2091 self.check_flush().await?;
2092 }
2093 if start.elapsed().as_millis() > 100 {
2094 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2095 }
2096 Ok(properties_copy)
2097 }
2098
2099 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2107 if touched.contains_key("ext_id") {
2108 return true;
2109 }
2110 let schema = self.schema_manager.schema();
2111 for label in labels {
2112 if schema.get_label_case_insensitive(label).is_none() {
2113 continue;
2114 }
2115 for constraint in &schema.constraints {
2116 if !constraint.enabled {
2117 continue;
2118 }
2119 if let ConstraintTarget::Label(l) = &constraint.target {
2120 if !l.eq_ignore_ascii_case(label) {
2121 continue;
2122 }
2123 } else {
2124 continue;
2125 }
2126 if let ConstraintType::Unique {
2127 properties: unique_props,
2128 } = &constraint.constraint_type
2129 {
2130 if unique_props.len() < 2 {
2131 continue; }
2133 if unique_props.iter().any(|p| touched.contains_key(p)) {
2134 return true;
2135 }
2136 }
2137 }
2138 }
2139 false
2140 }
2141
2142 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2155 pub async fn insert_vertex_partial_full(
2156 &self,
2157 vid: Vid,
2158 mut props: Properties,
2159 touched_keys: HashSet<String>,
2160 labels: &[String],
2161 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2162 ) -> Result<()> {
2163 if !self.config.partial_lance_writes
2164 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2165 {
2166 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2167 .await?;
2168 return Ok(());
2169 }
2170
2171 self.check_write_pressure().await?;
2172 self.check_transaction_memory(tx_l0)?;
2173 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2174 self.process_embeddings_for_labels(labels, &mut props)
2175 .await?;
2176 }
2177 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2180 .await?;
2181 {
2182 let l0 = self.resolve_l0(tx_l0);
2183 let mut l0_guard = l0.write();
2184 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2185 }
2186 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2187 metrics::counter!("uni_partial_writes_total").increment(1);
2188 self.update_metrics();
2189 if tx_l0.is_none() {
2190 self.check_flush().await?;
2191 }
2192 Ok(())
2193 }
2194
2195 #[instrument(skip(self, touched, labels), level = "trace")]
2211 pub async fn insert_vertex_partial(
2212 &self,
2213 vid: Vid,
2214 touched: Properties,
2215 labels: &[String],
2216 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2217 ) -> Result<()> {
2218 let needs_full_read =
2219 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2220 if needs_full_read {
2221 let existing = if let Some(pm) = &self.property_manager {
2226 pm.get_all_vertex_props_with_ctx(vid, None)
2227 .await
2228 .unwrap_or_default()
2229 .unwrap_or_default()
2230 } else {
2231 Properties::new()
2232 };
2233 let mut merged = existing;
2234 for (k, v) in touched {
2235 merged.insert(k, v);
2236 }
2237 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2238 .await?;
2239 return Ok(());
2240 }
2241
2242 let mut touched = touched;
2250 self.check_write_pressure().await?;
2251 self.check_transaction_memory(tx_l0)?;
2252 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2253 self.process_embeddings_for_labels(labels, &mut touched)
2254 .await?;
2255 }
2256 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2257 .await?;
2258
2259 {
2260 let l0 = self.resolve_l0(tx_l0);
2261 let mut l0_guard = l0.write();
2262 l0_guard.insert_vertex_partial(vid, touched, labels);
2263 }
2264
2265 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2266 metrics::counter!("uni_partial_writes_total").increment(1);
2267 self.update_metrics();
2268 if tx_l0.is_none() {
2269 self.check_flush().await?;
2270 }
2271 Ok(())
2272 }
2273
2274 pub async fn insert_vertices_batch(
2300 &self,
2301 vids: Vec<Vid>,
2302 mut properties_batch: Vec<Properties>,
2303 labels: Vec<String>,
2304 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2305 ) -> Result<Vec<Properties>> {
2306 let start = std::time::Instant::now();
2307
2308 if vids.len() != properties_batch.len() {
2310 return Err(anyhow!(
2311 "VID/properties size mismatch: {} vids, {} properties",
2312 vids.len(),
2313 properties_batch.len()
2314 ));
2315 }
2316
2317 if vids.is_empty() {
2318 return Ok(Vec::new());
2319 }
2320
2321 let result = async {
2324 self.check_write_pressure().await?;
2325 self.check_transaction_memory(tx_l0)?;
2326
2327 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2331
2332 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2334 .await?;
2335
2336 let label = labels
2338 .first()
2339 .ok_or_else(|| anyhow!("No labels provided"))?;
2340 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2341 .await?;
2342
2343 let has_crdt_fields = {
2348 let schema = self.schema_manager.schema();
2349 schema
2350 .properties
2351 .get(label.as_str())
2352 .is_some_and(|props_meta| {
2353 props_meta.values().any(|meta| {
2354 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2355 })
2356 })
2357 };
2358
2359 if has_crdt_fields {
2360 {
2366 let schema = self.schema_manager.schema();
2367 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2368 for props in &properties_batch {
2369 Self::enforce_crdt_variants(props_meta, props)?;
2370 }
2371 }
2372 }
2373
2374 let schema = self.schema_manager.schema();
2377 let crdt_keys: Vec<String> = schema
2378 .properties
2379 .get(label.as_str())
2380 .map(|props_meta| {
2381 props_meta
2382 .iter()
2383 .filter(|(_, meta)| {
2384 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2385 })
2386 .map(|(key, _)| key.clone())
2387 .collect()
2388 })
2389 .unwrap_or_default();
2390
2391 if let Some(pm) = &self.property_manager {
2392 let ctx = self.get_query_context(tx_l0).await;
2393 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2394 for key in &crdt_keys {
2395 if props.contains_key(key) {
2396 let existing =
2397 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2398 if !existing.is_null()
2399 && let Some(val) = props.get_mut(key)
2400 {
2401 *val = pm.merge_crdt_values(&existing, val)?;
2402 }
2403 }
2404 }
2405 }
2406 }
2407 }
2408
2409 let target_l0 = self.resolve_l0(tx_l0);
2411
2412 let properties_result = properties_batch.clone();
2413 {
2414 let mut l0_guard = target_l0.write();
2415 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2416 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2417 }
2418 }
2419
2420 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2422 self.update_metrics();
2423
2424 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2425 }
2426 .await;
2427
2428 let props = result?;
2429
2430 if start.elapsed().as_millis() > 100 {
2431 log::warn!(
2432 "Slow insert_vertices_batch ({} vertices): {}ms",
2433 vids.len(),
2434 start.elapsed().as_millis()
2435 );
2436 }
2437
2438 Ok(props)
2439 }
2440
2441 #[instrument(skip(self, labels), level = "trace")]
2452 pub async fn delete_vertex(
2453 &self,
2454 vid: Vid,
2455 labels: Option<Vec<String>>,
2456 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2457 ) -> Result<()> {
2458 let start = std::time::Instant::now();
2459 self.check_write_pressure().await?;
2460 self.check_transaction_memory(tx_l0)?;
2461 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; let l0 = self.resolve_l0(tx_l0);
2463
2464 let has_labels = {
2467 let l0_guard = l0.read();
2468 l0_guard.vertex_labels.contains_key(&vid)
2469 };
2470
2471 if !has_labels {
2472 let resolved_labels = if let Some(provided) = labels {
2473 Some(provided)
2475 } else {
2476 let mut found = None;
2478 for pending_l0 in self.l0_manager.get_pending_flush() {
2479 let pending_guard = pending_l0.read();
2480 if let Some(l) = pending_guard.get_vertex_labels(vid) {
2481 found = Some(l.to_vec());
2482 break;
2483 }
2484 }
2485 if found.is_none() {
2486 found = self.find_vertex_labels_in_storage(vid).await?;
2487 }
2488 found
2489 };
2490
2491 if let Some(found_labels) = resolved_labels {
2492 let mut l0_guard = l0.write();
2493 l0_guard.vertex_labels.insert(vid, found_labels);
2494 }
2495 }
2496
2497 l0.write().delete_vertex(vid)?;
2498 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2499 self.update_metrics();
2500
2501 if tx_l0.is_none() {
2502 self.check_flush().await?;
2503 }
2504 if start.elapsed().as_millis() > 100 {
2505 log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2506 }
2507 Ok(())
2508 }
2509
2510 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2513 use crate::backend::types::ScanRequest;
2514 use arrow_array::Array;
2515 use arrow_array::cast::AsArray;
2516
2517 let backend = self.storage.backend();
2518 let table_name = MainVertexDataset::table_name();
2519
2520 if !backend.table_exists(table_name).await? {
2522 return Ok(None);
2523 }
2524
2525 let filter = format!("_vid = {}", vid.as_u64());
2527 let batches = backend
2528 .scan(
2529 ScanRequest::all(table_name)
2530 .with_filter(filter)
2531 .with_columns(vec![
2532 "_vid".to_string(),
2533 "labels".to_string(),
2534 "_version".to_string(),
2535 "_deleted".to_string(),
2536 ]),
2537 )
2538 .await
2539 .unwrap_or_default();
2540
2541 let mut max_version: Option<u64> = None;
2543 let mut labels: Option<Vec<String>> = None;
2544 let mut is_deleted = false;
2545
2546 for batch in batches {
2547 if batch.num_rows() == 0 {
2548 continue;
2549 }
2550
2551 let version_array = batch
2552 .column_by_name("_version")
2553 .unwrap()
2554 .as_primitive::<arrow_array::types::UInt64Type>();
2555
2556 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2557
2558 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2559
2560 for row_idx in 0..batch.num_rows() {
2561 let version = version_array.value(row_idx);
2562
2563 if max_version.is_none_or(|mv| version > mv) {
2564 is_deleted = deleted_array.value(row_idx);
2565
2566 let labels_list = labels_array.value(row_idx);
2567 let string_array = labels_list.as_string::<i32>();
2568 let vertex_labels: Vec<String> = (0..string_array.len())
2569 .filter(|&i| !string_array.is_null(i))
2570 .map(|i| string_array.value(i).to_string())
2571 .collect();
2572
2573 max_version = Some(version);
2574 labels = Some(vertex_labels);
2575 }
2576 }
2577 }
2578
2579 if is_deleted { Ok(None) } else { Ok(labels) }
2581 }
2582
2583 #[expect(clippy::too_many_arguments)]
2584 #[instrument(skip(self, props, touched_keys), level = "trace")]
2585 pub async fn insert_edge_partial_full(
2586 &self,
2587 src_vid: Vid,
2588 dst_vid: Vid,
2589 edge_type: u32,
2590 eid: Eid,
2591 props: Properties,
2592 edge_type_name: Option<String>,
2593 touched_keys: HashSet<String>,
2594 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2595 ) -> Result<()> {
2596 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; if !self.config.partial_lance_writes {
2598 return self
2599 .insert_edge(
2600 src_vid,
2601 dst_vid,
2602 edge_type,
2603 eid,
2604 props,
2605 edge_type_name,
2606 tx_l0,
2607 )
2608 .await;
2609 }
2610
2611 let start = std::time::Instant::now();
2612 self.check_write_pressure().await?;
2613 self.check_transaction_memory(tx_l0)?;
2614 let mut props = props;
2615 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2616
2617 let l0 = self.resolve_l0(tx_l0);
2618 l0.write().insert_edge_partial_full(
2619 src_vid,
2620 dst_vid,
2621 edge_type,
2622 eid,
2623 props,
2624 edge_type_name,
2625 touched_keys,
2626 )?;
2627
2628 if tx_l0.is_none() {
2629 let version = l0.read().current_version;
2630 self.adjacency_manager
2631 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2632 }
2633
2634 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2635 metrics::counter!("uni_partial_writes_total").increment(1);
2636 self.update_metrics();
2637 if tx_l0.is_none() {
2638 self.check_flush().await?;
2639 }
2640 if start.elapsed().as_millis() > 100 {
2641 log::warn!(
2642 "Slow insert_edge_partial_full: {}ms",
2643 start.elapsed().as_millis()
2644 );
2645 }
2646 Ok(())
2647 }
2648
2649 #[expect(clippy::too_many_arguments)]
2650 pub async fn insert_edge(
2651 &self,
2652 src_vid: Vid,
2653 dst_vid: Vid,
2654 edge_type: u32,
2655 eid: Eid,
2656 mut properties: Properties,
2657 edge_type_name: Option<String>,
2658 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2659 ) -> Result<()> {
2660 let start = std::time::Instant::now();
2661 self.check_write_pressure().await?;
2662 self.check_transaction_memory(tx_l0)?;
2663 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; self.prepare_edge_upsert(eid, &mut properties, tx_l0)
2665 .await?;
2666
2667 let l0 = self.resolve_l0(tx_l0);
2668 l0.write()
2669 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
2670
2671 if tx_l0.is_none() {
2674 let version = l0.read().current_version;
2675 self.adjacency_manager
2676 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2677 }
2678
2679 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2680 self.update_metrics();
2681
2682 if tx_l0.is_none() {
2683 self.check_flush().await?;
2684 }
2685 if start.elapsed().as_millis() > 100 {
2686 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
2687 }
2688 Ok(())
2689 }
2690
2691 #[instrument(skip(self), level = "trace")]
2692 pub async fn delete_edge(
2693 &self,
2694 eid: Eid,
2695 src_vid: Vid,
2696 dst_vid: Vid,
2697 edge_type: u32,
2698 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2699 ) -> Result<()> {
2700 let start = std::time::Instant::now();
2701 self.check_write_pressure().await?;
2702 self.check_transaction_memory(tx_l0)?;
2703 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; let l0 = self.resolve_l0(tx_l0);
2705
2706 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
2707
2708 if tx_l0.is_none() {
2710 let version = l0.read().current_version;
2711 self.adjacency_manager
2712 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
2713 }
2714 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2715 self.update_metrics();
2716
2717 if tx_l0.is_none() {
2718 self.check_flush().await?;
2719 }
2720 if start.elapsed().as_millis() > 100 {
2721 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
2722 }
2723 Ok(())
2724 }
2725
2726 fn should_flush(&self) -> bool {
2733 let count = self.l0_manager.get_current().read().mutation_count;
2734 if count == 0 {
2735 return false;
2736 }
2737 if count >= self.config.auto_flush_threshold {
2738 return true;
2739 }
2740 if let Some(interval) = self.config.auto_flush_interval
2741 && self.last_flush_time.lock().elapsed() >= interval
2742 && count >= self.config.auto_flush_min_mutations
2743 {
2744 return true;
2745 }
2746 false
2747 }
2748
2749 pub async fn check_flush(&self) -> Result<()> {
2753 if self.should_flush() {
2754 self.flush_to_l1(None).await?;
2755 }
2756 Ok(())
2757 }
2758
2759 async fn process_embeddings_for_labels(
2762 &self,
2763 labels: &[String],
2764 properties: &mut Properties,
2765 ) -> Result<()> {
2766 let label_name = labels.first().map(|s| s.as_str());
2767 self.process_embeddings_impl(label_name, properties).await
2768 }
2769
2770 fn try_defer_embedding(
2792 &self,
2793 labels: &[String],
2794 properties: &Properties,
2795 vid: Vid,
2796 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2797 ) -> bool {
2798 if !self.config.defer_embeddings {
2799 return false;
2800 }
2801 let Some(label) = labels.first() else {
2802 return false;
2803 };
2804
2805 let schema = self.schema_manager.schema();
2806 let mut has_unsatisfied_cfg = false;
2807 for idx in &schema.indexes {
2808 if let IndexDefinition::Vector(v_cfg) = idx
2809 && v_cfg.label == *label
2810 && v_cfg.embedding_config.is_some()
2811 && !properties.contains_key(&v_cfg.property)
2812 {
2813 has_unsatisfied_cfg = true;
2814 break;
2815 }
2816 }
2817 if !has_unsatisfied_cfg {
2818 return false;
2819 }
2820
2821 let l0 = self.resolve_l0(tx_l0);
2822 let mut guard = l0.write();
2823 guard.pending_embeddings.insert(vid, label.clone());
2824 true
2825 }
2826
2827 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
2841 let by_label: HashMap<String, Vec<Vid>> = {
2842 let guard = old_l0_arc.read();
2843 if guard.pending_embeddings.is_empty() {
2844 return Ok(());
2845 }
2846 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
2847 for (vid, label) in &guard.pending_embeddings {
2848 m.entry(label.clone()).or_default().push(*vid);
2849 }
2850 m
2851 };
2852
2853 for (label, vids) in by_label {
2854 let mut properties_batch: Vec<Properties> = {
2855 let guard = old_l0_arc.read();
2856 vids.iter()
2857 .map(|vid| {
2858 guard
2859 .vertex_properties
2860 .get(vid)
2861 .cloned()
2862 .unwrap_or_default()
2863 })
2864 .collect()
2865 };
2866
2867 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
2868 .await?;
2869
2870 let mut guard = old_l0_arc.write();
2871 for (vid, props) in vids.iter().zip(properties_batch) {
2872 let target = guard.vertex_properties.entry(*vid).or_default();
2873 for (k, v) in props {
2874 target.insert(k, v);
2875 }
2876 guard.pending_embeddings.remove(vid);
2877 }
2878 }
2879 Ok(())
2880 }
2881
2882 async fn process_embeddings_for_batch(
2892 &self,
2893 labels: &[String],
2894 properties_batch: &mut [Properties],
2895 ) -> Result<()> {
2896 let label_name = labels.first().map(|s| s.as_str());
2897 let schema = self.schema_manager.schema();
2898
2899 if let Some(label) = label_name {
2900 let mut configs = Vec::new();
2902 for idx in &schema.indexes {
2903 if let IndexDefinition::Vector(v_config) = idx
2904 && v_config.label == label
2905 && let Some(emb_config) = &v_config.embedding_config
2906 {
2907 configs.push((v_config.property.clone(), emb_config.clone()));
2908 }
2909 }
2910
2911 if configs.is_empty() {
2912 return Ok(());
2913 }
2914
2915 for (target_prop, emb_config) in configs {
2916 let mut input_texts: Vec<String> = Vec::new();
2918 let mut needs_embedding: Vec<usize> = Vec::new();
2919
2920 for (idx, properties) in properties_batch.iter().enumerate() {
2921 if properties.contains_key(&target_prop) {
2923 continue;
2924 }
2925
2926 let mut inputs = Vec::new();
2928 for src_prop in &emb_config.source_properties {
2929 if let Some(val) = properties.get(src_prop)
2930 && let Some(s) = val.as_str()
2931 {
2932 inputs.push(s.to_string());
2933 }
2934 }
2935
2936 if !inputs.is_empty() {
2937 let input_text = inputs.join(" ");
2938 let input_text = match &emb_config.document_prefix {
2939 Some(prefix) => format!("{prefix}{input_text}"),
2940 None => input_text,
2941 };
2942 input_texts.push(input_text);
2943 needs_embedding.push(idx);
2944 }
2945 }
2946
2947 if input_texts.is_empty() {
2948 continue;
2949 }
2950
2951 let runtime = self.xervo_runtime.get().ok_or_else(|| {
2952 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
2953 })?;
2954 let embedder = runtime.embedding(&emb_config.alias).await?;
2955
2956 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
2958 let embeddings = embedder.embed(input_refs).await?;
2959
2960 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
2962 if let Some(vec) = embeddings.get(embedding_idx) {
2963 let vals: Vec<Value> =
2964 vec.iter().map(|f| Value::Float(*f as f64)).collect();
2965 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
2966 }
2967 }
2968 }
2969 }
2970
2971 Ok(())
2972 }
2973
2974 async fn process_embeddings_impl(
2975 &self,
2976 label_name: Option<&str>,
2977 properties: &mut Properties,
2978 ) -> Result<()> {
2979 let schema = self.schema_manager.schema();
2980
2981 if let Some(label) = label_name {
2982 let mut configs = Vec::new();
2984 for idx in &schema.indexes {
2985 if let IndexDefinition::Vector(v_config) = idx
2986 && v_config.label == label
2987 && let Some(emb_config) = &v_config.embedding_config
2988 {
2989 configs.push((v_config.property.clone(), emb_config.clone()));
2990 }
2991 }
2992
2993 if configs.is_empty() {
2994 log::info!("No embedding config found for label {}", label);
2995 }
2996
2997 for (target_prop, emb_config) in configs {
2998 if properties.contains_key(&target_prop) {
3000 continue;
3001 }
3002
3003 let mut inputs = Vec::new();
3005 for src_prop in &emb_config.source_properties {
3006 if let Some(val) = properties.get(src_prop)
3007 && let Some(s) = val.as_str()
3008 {
3009 inputs.push(s.to_string());
3010 }
3011 }
3012
3013 if inputs.is_empty() {
3014 continue;
3015 }
3016
3017 let input_text = inputs.join(" ");
3018 let input_text = match &emb_config.document_prefix {
3019 Some(prefix) => format!("{prefix}{input_text}"),
3020 None => input_text,
3021 };
3022
3023 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3024 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3025 })?;
3026 let embedder = runtime.embedding(&emb_config.alias).await?;
3027
3028 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3030 if let Some(vec) = embeddings.first() {
3031 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3033 properties.insert(target_prop.clone(), Value::List(vals));
3034 }
3035 }
3036 }
3037 Ok(())
3038 }
3039
3040 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3055 if let Some(coord) = self.flush_coordinator.as_ref() {
3063 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3064 }
3065 let _flush_lock_guard = self.flush_lock.lock().await;
3066 self.flush_inline_under_lock(name).await
3067 }
3068
3069 pub async fn flush_to_l1_async(
3077 self: &Arc<Self>,
3078 name: Option<String>,
3079 ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3080 let coord = self
3081 .flush_coordinator
3082 .as_ref()
3083 .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3084 .clone();
3085 let permit = coord.acquire_permit().await?;
3088 let seq = coord.next_rotate_seq();
3089 coord.note_pending();
3090 let RotateOutput {
3092 old_l0_arc,
3093 wal_lsn,
3094 current_version,
3095 flush_in_progress_guard,
3096 } = {
3097 let _flush_lock_guard = self.flush_lock.lock().await;
3098 self.flush_l0_rotate().await?
3099 };
3100 let parent_manifest = self.cached_manifest.lock().clone();
3103 let rotated = RotatedFlush {
3104 seq,
3105 old_l0_arc: old_l0_arc.clone(),
3106 wal_lsn,
3107 current_version,
3108 name: name.clone(),
3109 parent_manifest,
3110 permit,
3111 flush_in_progress_guard,
3112 };
3113 let writer = self.clone();
3117 let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3118 let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3119 Ok(crate::runtime::flush_coordinator::FlushOutcome {
3120 new_manifest: outcome.manifest,
3121 snapshot_id: outcome.snapshot_id,
3122 })
3123 });
3124 Ok(ticket)
3125 }
3126
3127 async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3138 let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3143
3144 let wal_for_truncate = {
3148 let current_l0 = self.l0_manager.get_current();
3149 let l0_guard = current_l0.read();
3150 l0_guard.wal.clone()
3151 };
3152 let wal_lsn = if let Some(ref w) = wal_for_truncate {
3153 w.flush().await?
3154 } else {
3155 0
3156 };
3157
3158 let old_l0_arc = self.l0_manager.begin_flush(0, None);
3161 metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3162
3163 let current_version;
3166 {
3167 let mut old_l0_guard = old_l0_arc.write();
3168 current_version = old_l0_guard.current_version;
3169 old_l0_guard.wal_lsn_at_flush = wal_lsn;
3170 let wal = old_l0_guard.wal.take();
3171 let new_l0_arc = self.l0_manager.get_current();
3172 let mut new_l0_guard = new_l0_arc.write();
3173 new_l0_guard.wal = wal;
3174 new_l0_guard.current_version = current_version;
3175 }
3176
3177 Ok(RotateOutput {
3178 old_l0_arc,
3179 wal_lsn,
3180 current_version,
3181 flush_in_progress_guard,
3182 })
3183 }
3184
    /// Streams the contents of a rotated L0 buffer into L1 storage and builds
    /// the snapshot manifest that describes the result.
    ///
    /// Steps, in order:
    /// 1. Drain deferred embeddings into `old_l0_arc`.
    /// 2. Under a read lock, collect edges, edge tombstones, vertices and
    ///    vertex tombstones from the buffer, grouped by edge type / label id.
    /// 3. For vertex tombstones that have no labels in L0, look the labels up
    ///    in storage.
    /// 4. Derive the new manifest from the cached manifest, else the latest
    ///    stored snapshot, else a fresh manifest.
    /// 5. Write the main edge and main vertex datasets.
    /// 6. Write the per-edge-type `fwd` / `bwd` delta datasets.
    /// 7. Write the per-label vertex datasets, update the vid→labels index,
    ///    and bump the manifest counters.
    ///
    /// The manifest is returned to the caller; this function does not save it.
    ///
    /// # Parameters
    /// * `old_l0_arc` – the rotated L0 buffer to stream.
    /// * `wal_lsn` – stored as the manifest's `wal_high_water_mark`.
    /// * `current_version` – stored as the manifest's `version_high_water_mark`.
    /// * `name` – stored as the manifest's `name`.
    ///
    /// # Errors
    /// Propagates embedding, snapshot-loading and dataset write errors, and
    /// fails when an edge type id or label id cannot be resolved to a name.
    async fn flush_stream_l1(
        &self,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        current_version: u64,
        name: Option<String>,
    ) -> Result<FlushOutcome> {
        self.drain_pending_embeddings(&old_l0_arc).await?;

        let schema = self.schema_manager.schema();
        // Edge inserts and deletes, keyed by edge type id.
        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
        // (vid, all labels, properties, deleted flag, version)
        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
        // (vid, properties, version, touched property keys)
        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
        // Tombstones whose labels are not present in L0.
        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();

        {
            let old_l0 = old_l0_arc.read();

            // Live edges become `Op::Insert` entries.
            for edge in old_l0.graph.edges() {
                let properties = old_l0
                    .edge_properties
                    .get(&edge.eid)
                    .cloned()
                    .unwrap_or_default();
                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);

                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();

                entries_by_type
                    .entry(edge.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: edge.src_vid,
                        dst_vid: edge.dst_vid,
                        eid: edge.eid,
                        op: Op::Insert,
                        version,
                        properties,
                        created_at,
                        updated_at,
                    });
            }

            // Edge tombstones become `Op::Delete` entries with no properties.
            for tombstone in old_l0.tombstones.values() {
                let version = old_l0
                    .edge_versions
                    .get(&tombstone.eid)
                    .copied()
                    .unwrap_or(0);
                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();

                entries_by_type
                    .entry(tombstone.edge_type)
                    .or_default()
                    .push(L1Entry {
                        src_vid: tombstone.src_vid,
                        dst_vid: tombstone.dst_vid,
                        eid: tombstone.eid,
                        op: Op::Delete,
                        version,
                        properties: HashMap::new(),
                        created_at,
                        updated_at,
                    });
            }

            // Adds the vertex once per label that is known to the schema;
            // labels without a schema id are skipped.
            let push_vertex_to_labels =
                |vid: Vid,
                 all_labels: &[String],
                 props: Properties,
                 deleted: bool,
                 version: u64,
                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
                    for label in all_labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            out.entry(label_id).or_default().push((
                                vid,
                                all_labels.to_vec(),
                                props.clone(),
                                deleted,
                                version,
                            ));
                        }
                    }
                };

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
                    vertex_created_at.insert(*vid, ts);
                }
                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
                    vertex_updated_at.insert(*vid, ts);
                }
                // Vertices without labels in L0 are not added to any per-label batch.
                if let Some(labels) = old_l0.vertex_labels.get(vid) {
                    // Partial rows are used only when the feature is enabled
                    // and the buffer tracked touched keys for this vertex.
                    let is_partial = self.config.partial_lance_writes
                        && old_l0.vertex_partial_keys.contains_key(vid);
                    if is_partial {
                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
                            for label in labels {
                                if let Some(label_id) = schema.label_id_by_name(label) {
                                    partial_by_label.entry(label_id).or_default().push((
                                        *vid,
                                        props.clone(),
                                        version,
                                        touched.clone(),
                                    ));
                                }
                            }
                        }
                    } else {
                        push_vertex_to_labels(
                            *vid,
                            labels,
                            props.clone(),
                            false,
                            version,
                            &mut vertices_by_label,
                        );
                    }
                }
            }
            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
                    vertex_updated_at.insert(vid, ts);
                }
                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
                    for label in labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            tombstones_by_label
                                .entry(label_id)
                                .or_default()
                                .push((vid, version));
                        }
                    }
                } else {
                    // Labels are resolved from storage after the lock is released.
                    orphaned_tombstones.push((vid, version));
                }
            }
        }
        if !orphaned_tombstones.is_empty() {
            tracing::warn!(
                count = orphaned_tombstones.len(),
                "Tombstones missing labels in L0, querying storage as fallback"
            );
            for (vid, version) in orphaned_tombstones {
                // Lookup errors and empty results are ignored; such tombstones
                // are not added to any per-label batch.
                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
                    && !labels.is_empty()
                {
                    for label in &labels {
                        if let Some(label_id) = schema.label_id_by_name(label) {
                            tombstones_by_label
                                .entry(label_id)
                                .or_default()
                                .push((vid, version));
                        }
                    }
                }
            }
        }

        // Base manifest: cached copy, else latest stored snapshot, else new.
        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
            cached
        } else {
            self.storage
                .snapshot_manager()
                .load_latest_snapshot()
                .await?
                .unwrap_or_else(|| {
                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
                })
        };

        // The base manifest becomes the parent of the snapshot being built.
        let parent_id = manifest.snapshot_id.clone();
        manifest.parent_snapshot = Some(parent_id);
        manifest.snapshot_id = Uuid::new_v4().to_string();
        manifest.name = name;
        manifest.created_at = Utc::now();
        manifest.version_high_water_mark = current_version;
        manifest.wal_high_water_mark = wal_lsn;
        let snapshot_id = manifest.snapshot_id.clone();

        tracing::Span::current().record("snapshot_id", &snapshot_id);

        // Flatten all edge entries into rows for the main edge dataset.
        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
            // NOTE(review): this guard is not read from in this block.
            let _old_l0 = old_l0_arc.read();
            let mut main_edges: Vec<(
                uni_common::core::id::Eid,
                Vid,
                Vid,
                String,
                Properties,
                bool,
                u64,
            )> = Vec::new();
            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();

            for (&edge_type_id, entries) in entries_by_type.iter() {
                for entry in entries {
                    // An unresolvable type id is written as "unknown" here; the
                    // per-type delta loop further down fails on it instead.
                    let edge_type_name = self
                        .storage
                        .schema_manager()
                        .edge_type_name_by_id_unified(edge_type_id)
                        .unwrap_or_else(|| "unknown".to_string());

                    let deleted = matches!(entry.op, Op::Delete);
                    main_edges.push((
                        entry.eid,
                        entry.src_vid,
                        entry.dst_vid,
                        edge_type_name,
                        entry.properties.clone(),
                        deleted,
                        entry.version,
                    ));

                    if let Some(ts) = entry.created_at {
                        edge_created_at_map.insert(entry.eid, ts);
                    }
                    if let Some(ts) = entry.updated_at {
                        edge_updated_at_map.insert(entry.eid, ts);
                    }
                }
            }

            (main_edges, edge_created_at_map, edge_updated_at_map)
        };

        if !main_edges.is_empty() {
            let main_edge_batch = MainEdgeDataset::build_record_batch(
                &main_edges,
                Some(&edge_created_at_map),
                Some(&edge_updated_at_map),
            )?;
            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
        }

        // Main vertex rows: every `vertex_properties` entry as a live row;
        // tombstones are collected separately.
        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
            let old_l0 = old_l0_arc.read();
            let mut vertices = Vec::new();

            for (vid, props) in &old_l0.vertex_properties {
                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                vertices.push((*vid, labels, props.clone(), false, version));
            }

            for &vid in &old_l0.vertex_tombstones {
                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
                main_vertex_tombstones.push((vid, version));
            }

            vertices
        };

        if !main_vertices.is_empty() {
            let main_vertex_batch = MainVertexDataset::build_record_batch(
                &main_vertices,
                Some(&vertex_created_at),
                Some(&vertex_updated_at),
            )?;
            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
        }
        if !main_vertex_tombstones.is_empty() {
            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
                &main_vertex_tombstones,
                Some(&vertex_updated_at),
            )?;
            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
                .await?;
        }
        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
        }

        // Per-edge-type delta datasets, written in both directions.
        for (&edge_type_id, entries) in entries_by_type.iter() {
            let edge_type_name = self
                .storage
                .schema_manager()
                .edge_type_name_by_id_unified(edge_type_id)
                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;

            // Edges with tracked touched keys go through the partial path,
            // but only when partial writes are enabled.
            let partial_eids: std::collections::HashSet<Eid> = {
                let old_l0 = old_l0_arc.read();
                entries
                    .iter()
                    .filter(|e| {
                        self.config.partial_lance_writes
                            && old_l0.edge_partial_keys.contains_key(&e.eid)
                    })
                    .map(|e| e.eid)
                    .collect()
            };
            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
                let old_l0 = old_l0_arc.read();
                partial_eids
                    .iter()
                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
                    .collect()
            };
            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
                .clone()
                .into_iter()
                .partition(|e| !partial_eids.contains(&e.eid));

            let backend = self.storage.backend();

            // Forward direction: entries ordered by source vertex.
            let mut fwd_full = full_entries.clone();
            fwd_full.sort_by_key(|e| e.src_vid);
            let mut fwd_partial = partial_entries.clone();
            fwd_partial.sort_by_key(|e| e.src_vid);
            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
            if !fwd_full.is_empty() {
                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
                fwd_ds.write_run(backend, fwd_batch).await?;
            }
            if !fwd_partial.is_empty() {
                // Union of the touched keys over all partial entries.
                let touched_union: std::collections::HashSet<String> = fwd_partial
                    .iter()
                    .flat_map(|e| {
                        touched_union_by_eid
                            .get(&e.eid)
                            .cloned()
                            .unwrap_or_default()
                            .into_iter()
                    })
                    .collect();
                let fwd_partial_batch =
                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
                fwd_ds
                    .merge_insert_partial_run(backend, fwd_partial_batch)
                    .await?;
            }
            fwd_ds.ensure_eid_index(backend).await?;

            // Backward direction: entries ordered by destination vertex.
            let mut bwd_full = full_entries.clone();
            bwd_full.sort_by_key(|e| e.dst_vid);
            let mut bwd_partial = partial_entries.clone();
            bwd_partial.sort_by_key(|e| e.dst_vid);
            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
            if !bwd_full.is_empty() {
                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
                bwd_ds.write_run(backend, bwd_batch).await?;
            }
            if !bwd_partial.is_empty() {
                let touched_union: std::collections::HashSet<String> = bwd_partial
                    .iter()
                    .flat_map(|e| {
                        touched_union_by_eid
                            .get(&e.eid)
                            .cloned()
                            .unwrap_or_default()
                            .into_iter()
                    })
                    .collect();
                let bwd_partial_batch =
                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
                bwd_ds
                    .merge_insert_partial_run(backend, bwd_partial_batch)
                    .await?;
            }
            bwd_ds.ensure_eid_index(backend).await?;

            // Record this flush in the manifest entry for the edge type.
            let current_snap =
                manifest
                    .edges
                    .entry(edge_type_name.to_string())
                    .or_insert(EdgeSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += entries.len() as u64;
            current_snap.lance_version = 0;
        }

        // Every label that has full rows, partial rows, or tombstones.
        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
            .keys()
            .chain(partial_by_label.keys())
            .chain(tombstones_by_label.keys())
            .copied()
            .collect();
        for label_id in all_label_ids {
            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
            let label_name = schema
                .label_name_by_id(label_id)
                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;

            let ds = self.storage.vertex_dataset(label_name)?;

            // property name -> (terms added per vid, vids removed)
            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
            let mut inverted_updates: InvertedUpdateMap = HashMap::new();

            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                {
                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
                    let mut removed: HashSet<Vid> = HashSet::new();

                    for (vid, _labels, props, deleted, _version) in &vertices {
                        if *deleted {
                            removed.insert(*vid);
                        } else if let Some(prop_value) = props.get(&cfg.property) {
                            // Only array values contribute terms, and only
                            // their string elements.
                            if let Some(arr) = prop_value.as_array() {
                                let terms: Vec<String> = arr
                                    .iter()
                                    .filter_map(|v| v.as_str().map(ToString::to_string))
                                    .collect();
                                if !terms.is_empty() {
                                    added.insert(*vid, terms);
                                }
                            }
                        }
                    }
                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
                        for (vid, _) in tomb_rows {
                            removed.insert(*vid);
                        }
                    }

                    if !added.is_empty() || !removed.is_empty() {
                        inverted_updates.insert(cfg.property.clone(), (added, removed));
                    }
                }
            }

            // Split the entries into the parallel vectors the dataset builder takes.
            let mut v_data = Vec::new();
            let mut d_data = Vec::new();
            let mut ver_data = Vec::new();
            for (vid, labels, props, deleted, version) in vertices {
                v_data.push((vid, labels, props));
                d_data.push(deleted);
                ver_data.push(version);
            }

            let backend = self.storage.backend();

            if !v_data.is_empty() {
                let batch = ds.build_record_batch_with_timestamps(
                    &v_data,
                    &d_data,
                    &ver_data,
                    &schema,
                    Some(&vertex_created_at),
                    Some(&vertex_updated_at),
                )?;
                ds.write_batch(backend, batch, &schema).await?;
            }

            if let Some(partial_rows) = partial_by_label.remove(&label_id)
                && !partial_rows.is_empty()
            {
                let touched_union: std::collections::HashSet<String> = partial_rows
                    .iter()
                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
                    .collect();
                let pairs: Vec<(Vid, Properties)> = partial_rows
                    .iter()
                    .map(|(vid, props, _, _)| (*vid, props.clone()))
                    .collect();
                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
                let partial_batch = ds.build_partial_record_batch(
                    &pairs,
                    &versions,
                    &touched_union,
                    &schema,
                    Some(&vertex_updated_at),
                )?;
                if partial_batch.num_rows() > 0 {
                    ds.merge_insert_batch(backend, partial_batch).await?;
                }
            }

            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
            if !tombstone_rows.is_empty() {
                let tomb_batch =
                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
                if tomb_batch.num_rows() > 0 {
                    ds.merge_insert_batch(backend, tomb_batch).await?;
                }
            }

            ds.ensure_default_indexes(backend).await?;

            // Keep the in-memory vid→labels index in step with what was written.
            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
                if deleted {
                    self.storage.remove_from_vid_labels_index(*vid);
                } else {
                    self.storage.update_vid_labels_index(*vid, labels.clone());
                }
            }
            for (vid, _) in &tombstone_rows {
                self.storage.remove_from_vid_labels_index(*vid);
            }

            // Record this flush in the manifest entry for the label. Only full
            // rows are counted; partial rows and tombstones are not.
            let current_snap =
                manifest
                    .vertices
                    .entry(label_name.to_string())
                    .or_insert(LabelSnapshot {
                        version: 0,
                        count: 0,
                        lance_version: 0,
                    });
            current_snap.version += 1;
            current_snap.count += v_data.len() as u64;
            current_snap.lance_version = 0;

            self.storage.invalidate_table_cache(label_name);

            // Apply the inverted-index changes computed above.
            #[cfg(feature = "lance-backend")]
            for idx in &schema.indexes {
                if let IndexDefinition::Inverted(cfg) = idx
                    && cfg.label == label_name
                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
                {
                    self.storage
                        .index_manager()
                        .update_inverted_index_incremental(cfg, added, removed)
                        .await?;
                }
            }

            // Record the uid → vid mapping for each fully written vertex.
            #[cfg(feature = "lance-backend")]
            {
                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
                for (vid, _labels, props) in &v_data {
                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
                        label_name, ext_id, props,
                    );
                    uid_mappings.push((uid, *vid));
                }

                // A label without an available uid index is skipped silently.
                if !uid_mappings.is_empty()
                    && let Ok(uid_index) = self.storage.uid_index(label_name)
                {
                    uid_index.write_mapping(&uid_mappings).await?;
                }
            }
        }
        Ok(FlushOutcome {
            manifest,
            snapshot_id,
        })
    }
3872
3873 #[instrument(
3879 skip(self),
3880 fields(snapshot_id, mutations_count, size_bytes),
3881 level = "info"
3882 )]
3883 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
3884 let start = std::time::Instant::now();
3885
3886 let (initial_size, initial_count) = {
3887 let l0_arc = self.l0_manager.get_current();
3888 let l0 = l0_arc.read();
3889 (l0.estimated_size, l0.mutation_count)
3890 };
3891 tracing::Span::current().record("size_bytes", initial_size);
3892 tracing::Span::current().record("mutations_count", initial_count);
3893
3894 debug!("Starting L0 flush to L1");
3895
3896 let RotateOutput {
3900 old_l0_arc,
3901 wal_lsn,
3902 current_version,
3903 flush_in_progress_guard: _flush_guard,
3904 } = self.flush_l0_rotate().await?;
3905
3906 let FlushOutcome {
3909 manifest,
3910 snapshot_id,
3911 } = self
3912 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
3913 .await?;
3914
3915 self.flush_finalize_locked(
3919 old_l0_arc,
3920 wal_lsn,
3921 manifest,
3922 snapshot_id,
3923 initial_size,
3924 initial_count,
3925 start,
3926 )
3927 .await
3928 }
3929
3930 #[allow(clippy::too_many_arguments)]
3935 async fn flush_finalize_locked(
3936 &self,
3937 old_l0_arc: Arc<RwLock<L0Buffer>>,
3938 wal_lsn: u64,
3939 manifest: SnapshotManifest,
3940 snapshot_id: String,
3941 initial_size: usize,
3942 initial_count: usize,
3943 start: std::time::Instant,
3944 ) -> Result<String> {
3945 Self::flush_finalize_body(
3946 &self.shared_ctx(),
3947 old_l0_arc,
3948 wal_lsn,
3949 manifest,
3950 snapshot_id,
3951 initial_size,
3952 initial_count,
3953 start,
3954 )
3955 .await
3956 }
3957
3958 #[allow(clippy::too_many_arguments)]
3964 pub(crate) async fn flush_finalize_now(
3965 shared: SharedFlushCtx,
3966 old_l0_arc: Arc<RwLock<L0Buffer>>,
3967 wal_lsn: u64,
3968 manifest: SnapshotManifest,
3969 snapshot_id: String,
3970 initial_size: usize,
3971 initial_count: usize,
3972 start: std::time::Instant,
3973 ) -> Result<String> {
3974 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
3975 Self::flush_finalize_body(
3976 &shared,
3977 old_l0_arc,
3978 wal_lsn,
3979 manifest,
3980 snapshot_id,
3981 initial_size,
3982 initial_count,
3983 start,
3984 )
3985 .await
3986 }
3987
    /// Publishes a streamed flush: persists the manifest, retires the old L0
    /// buffer, truncates the WAL and triggers follow-up maintenance.
    ///
    /// Steps, in order:
    /// 1. Re-point `manifest.parent_snapshot` at the currently cached
    ///    manifest's id when the two differ (counted in a metric).
    /// 2. Save the manifest and mark it as the latest snapshot.
    /// 3. Replace the cached manifest.
    /// 4. Tell the L0 manager that the flush of `old_l0_arc` completed.
    /// 5. Truncate the WAL before the smaller of `wal_lsn` and the minimum
    ///    pending WAL LSN reported by the L0 manager.
    /// 6. Clear the property manager cache and update `last_flush_time`.
    /// 7. Emit the completion log line and metrics, bump `l1_runs`.
    /// 8. Possibly spawn adjacency compaction and schedule index rebuilds.
    /// 9. Tick the fork flush counter.
    ///
    /// `initial_size`, `initial_count` and `start` are used only for logging
    /// and metrics.
    ///
    /// Returns `snapshot_id` on success.
    ///
    /// # Errors
    /// Propagates snapshot persistence, WAL truncation and
    /// `compaction_status` lock acquisition errors.
    #[allow(clippy::too_many_arguments)]
    async fn flush_finalize_body(
        shared: &SharedFlushCtx,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        mut manifest: SnapshotManifest,
        snapshot_id: String,
        initial_size: usize,
        initial_count: usize,
        start: std::time::Instant,
    ) -> Result<String> {
        // The manifest's parent may be stale if another snapshot was
        // published since the manifest was derived.
        let current_parent_id = shared
            .cached_manifest
            .lock()
            .as_ref()
            .map(|m| m.snapshot_id.clone());
        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
            manifest.parent_snapshot = current_parent_id;
            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
        }

        // Persist first, then publish as latest.
        shared
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        shared
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        *shared.cached_manifest.lock() = Some(manifest.clone());

        shared.l0_manager.complete_flush(&old_l0_arc);

        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
        if let Some(w) = wal_handle {
            // Never truncate past the lowest LSN the L0 manager still reports
            // as pending.
            let safe_lsn = shared
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        if let Some(ref pm) = shared.property_manager {
            pm.clear_cache().await;
        }

        *shared.last_flush_time.lock() = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        {
            // NOTE(review): a failure to take this lock returns an error even
            // though the snapshot has already been published above.
            let mut status = uni_common::sync::acquire_mutex(
                &shared.storage.compaction_status,
                "compaction_status",
            )?;
            status.l1_runs += 1;
        }

        let am = shared.adjacency_manager.clone();
        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
            // At most one compaction task at a time.
            let previous_still_running = {
                let guard = shared.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };
            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                // NOTE(review): `compact()` is called synchronously inside the
                // spawned task; confirm it is cheap enough not to block the
                // async executor.
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *shared.compaction_handle.write() = Some(handle);
            }
        }

        if shared.auto_rebuild_enabled
            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
        {
            Self::schedule_index_rebuilds_if_needed_static(
                &manifest,
                rebuild_mgr.clone(),
                shared.schema_manager.clone(),
                shared.index_rebuild_config.clone(),
            );
        }

        Self::tick_fork_fragment_observability_static(
            shared.fork_id,
            shared.fork_flush_count.clone(),
            shared.fork_fragment_warn_fired.clone(),
            shared.fork_fragment_warn_threshold,
        );

        Ok(snapshot_id)
    }
4115
4116 #[allow(dead_code)] pub(crate) fn tick_fork_fragment_observability(&self) {
4130 Self::tick_fork_fragment_observability_static(
4131 self.fork_id,
4132 self.fork_flush_count.clone(),
4133 self.fork_fragment_warn_fired.clone(),
4134 self.config.fork_fragment_warn_threshold,
4135 );
4136 }
4137
4138 pub(crate) fn tick_fork_fragment_observability_static(
4142 fork_id: Option<ForkId>,
4143 fork_flush_count: Arc<AtomicU64>,
4144 fork_fragment_warn_fired: Arc<AtomicBool>,
4145 warn_threshold: usize,
4146 ) {
4147 let Some(fork_id) = fork_id else { return };
4148 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4150 let fork_label = fork_id.to_string();
4151 metrics::gauge!(
4152 "uni_fork_l1_flushes",
4153 "fork" => fork_label.clone(),
4154 )
4155 .set(new_count as f64);
4156 let threshold = warn_threshold as u64;
4157 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4158 && threshold > 0
4159 && new_count >= threshold
4160 {
4161 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4162 tracing::warn!(
4163 fork = %fork_label,
4164 flush_count = new_count,
4165 threshold,
4166 "fork has exceeded the L1 flush-count threshold; \
4167 fork compaction is deferred to Phase 5 — consider \
4168 drop+recreate or promotion to bound fragment growth"
4169 );
4170 }
4171 }
4172
4173 #[allow(dead_code)] fn schedule_index_rebuilds_if_needed(
4179 &self,
4180 manifest: &SnapshotManifest,
4181 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4182 ) {
4183 Self::schedule_index_rebuilds_if_needed_static(
4184 manifest,
4185 rebuild_mgr,
4186 self.schema_manager.clone(),
4187 self.config.index_rebuild.clone(),
4188 );
4189 }
4190
4191 pub(crate) fn schedule_index_rebuilds_if_needed_static(
4195 manifest: &SnapshotManifest,
4196 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4197 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4198 index_rebuild_config: uni_common::config::IndexRebuildConfig,
4199 ) {
4200 let checker =
4201 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4202 let schema = schema_manager.schema();
4203 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4204
4205 if labels.is_empty() {
4206 return;
4207 }
4208
4209 for label in &labels {
4211 for idx in &schema.indexes {
4212 if idx.label() == label {
4213 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4214 m.status = uni_common::core::schema::IndexStatus::Stale;
4215 });
4216 }
4217 }
4218 }
4219
4220 tokio::spawn(async move {
4221 if let Err(e) = rebuild_mgr.schedule(labels).await {
4222 tracing::warn!("Failed to schedule index rebuild: {e}");
4223 }
4224 });
4225 }
4226}
4227
4228pub(crate) struct WriterFinalizer;
4234
4235impl FinalizeFn for WriterFinalizer {
4236 fn finalize<'a>(
4237 &'a self,
4238 rotated: RotatedFlush,
4239 outcome: AsyncFlushOutcome,
4240 shared: SharedFlushCtx,
4241 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4242 Box::pin(async move {
4243 let (initial_size, initial_count) = {
4248 let l0 = rotated.old_l0_arc.read();
4249 (l0.estimated_size, l0.mutation_count)
4250 };
4251 let result = Writer::flush_finalize_now(
4252 shared,
4253 rotated.old_l0_arc.clone(),
4254 rotated.wal_lsn,
4255 outcome.new_manifest,
4256 outcome.snapshot_id,
4257 initial_size,
4258 initial_count,
4259 std::time::Instant::now(),
4260 )
4261 .await;
4262 drop(rotated.permit);
4264 result
4265 })
4266 }
4267
4268 fn finalize_failure<'a>(
4269 &'a self,
4270 rotated: RotatedFlush,
4271 err: anyhow::Error,
4272 _shared: SharedFlushCtx,
4273 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4274 Box::pin(async move {
4275 tracing::warn!(
4276 error = %err,
4277 seq = rotated.seq,
4278 "async flush stream failed; old L0 remains in pending_flush, \
4279 WAL retains its data, recovery via WAL replay on restart"
4280 );
4281 metrics::counter!("uni_flush_failures_total").increment(1);
4282 drop(rotated.permit);
4285 err
4286 })
4287 }
4288}
4289
4290#[cfg(test)]
4291mod tests {
4292 use super::*;
4293 use tempfile::tempdir;
4294
4295 #[tokio::test]
4298 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4299 use crate::runtime::wal::WriteAheadLog;
4300 use crate::storage::manager::StorageManager;
4301 use object_store::local::LocalFileSystem;
4302 use object_store::path::Path as ObjectStorePath;
4303 use uni_common::core::schema::SchemaManager;
4304
4305 let dir = tempdir()?;
4306 let path = dir.path().to_str().unwrap();
4307 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4308 let schema_path = ObjectStorePath::from("schema.json");
4309
4310 let schema_manager =
4311 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4312 let _label_id = schema_manager.add_label("Test")?;
4313 schema_manager.save().await?;
4314
4315 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4316
4317 let wal_path = ObjectStorePath::from("wal");
4319 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4320
4321 let writer = Writer::new_with_config(
4322 storage.clone(),
4323 schema_manager.clone(),
4324 1,
4325 UniConfig::default(),
4326 Some(wal),
4327 None,
4328 )
4329 .await?;
4330
4331 let tx_l0 = writer.create_transaction_l0();
4333
4334 let vid_a = writer.next_vid().await?;
4336 let vid_b = writer.next_vid().await?;
4337
4338 let mut props = std::collections::HashMap::new();
4339 props.insert("test".to_string(), Value::String("data".to_string()));
4340
4341 writer
4342 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4343 .await?;
4344 writer
4345 .insert_vertex_with_labels(
4346 vid_b,
4347 std::collections::HashMap::new(),
4348 &["Test".to_string()],
4349 Some(&tx_l0),
4350 )
4351 .await?;
4352
4353 let eid = writer.next_eid(1).await?;
4354 writer
4355 .insert_edge(
4356 vid_a,
4357 vid_b,
4358 1,
4359 eid,
4360 std::collections::HashMap::new(),
4361 None,
4362 Some(&tx_l0),
4363 )
4364 .await?;
4365
4366 let l0 = writer.l0_manager.get_current();
4368 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
4369 let mutations_before = wal.replay().await?;
4370 let count_before = mutations_before.len();
4371
4372 let writer = Arc::new(writer);
4374 writer.commit_transaction_l0(tx_l0).await?;
4375
4376 let mutations_after = wal.replay().await?;
4378 assert!(
4379 mutations_after.len() > count_before,
4380 "WAL should contain transaction mutations after commit"
4381 );
4382
4383 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
4385
4386 let mut saw_vertex_a = false;
4387 let mut saw_vertex_b = false;
4388 let mut saw_edge = false;
4389
4390 for mutation in &new_mutations {
4391 match mutation {
4392 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
4393 if *vid == vid_a {
4394 saw_vertex_a = true;
4395 }
4396 if *vid == vid_b {
4397 saw_vertex_b = true;
4398 }
4399 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
4401 }
4402 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
4403 if *e == eid {
4404 saw_edge = true;
4405 }
4406 assert!(
4408 saw_vertex_a && saw_vertex_b,
4409 "Edge should be logged after both vertices"
4410 );
4411 }
4412 _ => {}
4413 }
4414 }
4415
4416 assert!(saw_vertex_a, "Vertex A should be in WAL");
4417 assert!(saw_vertex_b, "Vertex B should be in WAL");
4418 assert!(saw_edge, "Edge should be in WAL");
4419
4420 let l0_read = l0.read();
4422 assert!(
4423 l0_read.vertex_properties.contains_key(&vid_a),
4424 "Vertex A should be in main L0"
4425 );
4426 assert!(
4427 l0_read.vertex_properties.contains_key(&vid_b),
4428 "Vertex B should be in main L0"
4429 );
4430 assert!(
4431 l0_read.edge_endpoints.contains_key(&eid),
4432 "Edge should be in main L0"
4433 );
4434
4435 Ok(())
4436 }
4437
4438 #[tokio::test]
4440 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4441 use crate::runtime::wal::WriteAheadLog;
4442 use crate::storage::manager::StorageManager;
4443 use object_store::local::LocalFileSystem;
4444 use object_store::path::Path as ObjectStorePath;
4445 use uni_common::core::schema::SchemaManager;
4446
4447 let dir = tempdir()?;
4448 let path = dir.path().to_str().unwrap();
4449 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4450 let schema_path = ObjectStorePath::from("schema.json");
4451
4452 let schema_manager =
4453 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4454 let _label_id = schema_manager.add_label("Test")?;
4455 let _baseline_label_id = schema_manager.add_label("Baseline")?;
4456 let _txdata_label_id = schema_manager.add_label("TxData")?;
4457 schema_manager.save().await?;
4458
4459 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4460
4461 let wal_path = ObjectStorePath::from("wal");
4463 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4464
4465 let writer = Writer::new_with_config(
4466 storage.clone(),
4467 schema_manager.clone(),
4468 1,
4469 UniConfig::default(),
4470 Some(wal),
4471 None,
4472 )
4473 .await?;
4474
4475 let baseline_vid = writer.next_vid().await?;
4477 writer
4478 .insert_vertex_with_labels(
4479 baseline_vid,
4480 [("baseline".to_string(), Value::Bool(true))]
4481 .into_iter()
4482 .collect(),
4483 &["Baseline".to_string()],
4484 None,
4485 )
4486 .await?;
4487
4488 let tx_l0 = writer.create_transaction_l0();
4490
4491 let tx_vid = writer.next_vid().await?;
4493 writer
4494 .insert_vertex_with_labels(
4495 tx_vid,
4496 [("tx_data".to_string(), Value::Bool(true))]
4497 .into_iter()
4498 .collect(),
4499 &["TxData".to_string()],
4500 Some(&tx_l0),
4501 )
4502 .await?;
4503
4504 let l0 = writer.l0_manager.get_current();
4506 let vertex_count_before = l0.read().vertex_properties.len();
4507
4508 drop(tx_l0);
4510
4511 let vertex_count_after = l0.read().vertex_properties.len();
4513 assert_eq!(
4514 vertex_count_before, vertex_count_after,
4515 "Main L0 should not change after rollback"
4516 );
4517
4518 assert!(
4520 l0.read().vertex_properties.contains_key(&baseline_vid),
4521 "Baseline data should remain"
4522 );
4523
4524 assert!(
4526 !l0.read().vertex_properties.contains_key(&tx_vid),
4527 "Transaction data should not be in main L0 after rollback"
4528 );
4529
4530 Ok(())
4531 }
4532
4533 #[tokio::test]
4536 async fn test_batch_insert_shared_labels() -> Result<()> {
4537 use crate::storage::manager::StorageManager;
4538 use object_store::local::LocalFileSystem;
4539 use object_store::path::Path as ObjectStorePath;
4540 use uni_common::core::schema::SchemaManager;
4541
4542 let dir = tempdir()?;
4543 let path = dir.path().to_str().unwrap();
4544 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4545 let schema_path = ObjectStorePath::from("schema.json");
4546
4547 let schema_manager =
4548 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4549 let _label_id = schema_manager.add_label("Person")?;
4550 schema_manager.save().await?;
4551
4552 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4553
4554 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4555
4556 let labels = &["Person".to_string()];
4558
4559 let mut vids = Vec::new();
4561 for i in 0..100 {
4562 let vid = writer.next_vid().await?;
4563 let mut props = std::collections::HashMap::new();
4564 props.insert("id".to_string(), Value::Int(i));
4565 writer
4566 .insert_vertex_with_labels(vid, props, labels, None)
4567 .await?;
4568 vids.push(vid);
4569 }
4570
4571 let l0 = writer.l0_manager.get_current();
4573 for vid in vids {
4574 let l0_guard = l0.read();
4575 let vertex_labels = l0_guard.vertex_labels.get(&vid);
4576 assert!(vertex_labels.is_some(), "Vertex should have labels");
4577 assert_eq!(
4578 vertex_labels.unwrap(),
4579 &vec!["Person".to_string()],
4580 "Labels should match"
4581 );
4582 }
4583
4584 Ok(())
4585 }
4586
4587 #[tokio::test]
4590 async fn test_estimated_size_tracks_mutations() -> Result<()> {
4591 use crate::storage::manager::StorageManager;
4592 use object_store::local::LocalFileSystem;
4593 use object_store::path::Path as ObjectStorePath;
4594 use uni_common::core::schema::SchemaManager;
4595
4596 let dir = tempdir()?;
4597 let path = dir.path().to_str().unwrap();
4598 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4599 let schema_path = ObjectStorePath::from("schema.json");
4600
4601 let schema_manager =
4602 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4603 let _label_id = schema_manager.add_label("Test")?;
4604 schema_manager.save().await?;
4605
4606 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4607
4608 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4609
4610 let l0 = writer.l0_manager.get_current();
4611
4612 let initial_estimated = l0.read().estimated_size;
4614 let initial_actual = l0.read().size_bytes();
4615 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
4616 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
4617
4618 let mut vids = Vec::new();
4620 for i in 0..10 {
4621 let vid = writer.next_vid().await?;
4622 let mut props = std::collections::HashMap::new();
4623 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
4624 props.insert("index".to_string(), Value::Int(i));
4625 writer
4626 .insert_vertex_with_labels(vid, props, &[], None)
4627 .await?;
4628 vids.push(vid);
4629 }
4630
4631 let after_vertices_estimated = l0.read().estimated_size;
4633 let after_vertices_actual = l0.read().size_bytes();
4634 assert!(
4635 after_vertices_estimated > 0,
4636 "estimated_size should grow after insertions"
4637 );
4638
4639 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
4641 assert!(
4642 (0.5..=2.0).contains(&ratio),
4643 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
4644 after_vertices_estimated,
4645 after_vertices_actual,
4646 ratio
4647 );
4648
4649 let edge_type = 1u32;
4651 for i in 0..9 {
4652 let eid = writer.next_eid(edge_type).await?;
4653 writer
4654 .insert_edge(
4655 vids[i],
4656 vids[i + 1],
4657 edge_type,
4658 eid,
4659 std::collections::HashMap::new(),
4660 Some("NEXT".to_string()),
4661 None,
4662 )
4663 .await?;
4664 }
4665
4666 let after_edges_estimated = l0.read().estimated_size;
4668 let after_edges_actual = l0.read().size_bytes();
4669 assert!(
4670 after_edges_estimated > after_vertices_estimated,
4671 "estimated_size should grow after edge insertions"
4672 );
4673
4674 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
4676 assert!(
4677 (0.5..=2.0).contains(&ratio),
4678 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
4679 after_edges_estimated,
4680 after_edges_actual,
4681 ratio
4682 );
4683
4684 Ok(())
4685 }
4686
4687 #[tokio::test]
4689 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
4690 use crate::runtime::wal::WriteAheadLog;
4691 use crate::storage::manager::StorageManager;
4692 use object_store::local::LocalFileSystem;
4693 use object_store::path::Path as ObjectStorePath;
4694 use uni_common::core::schema::SchemaManager;
4695
4696 let dir = tempdir()?;
4697 let path = dir.path().to_str().unwrap();
4698 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4699 let schema_path = ObjectStorePath::from("schema.json");
4700
4701 let schema_manager =
4702 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4703 schema_manager.save().await?;
4704
4705 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4706
4707 let wal_path = ObjectStorePath::from("wal");
4708 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4709
4710 let writer = Writer::new_with_config(
4711 storage.clone(),
4712 schema_manager.clone(),
4713 1,
4714 UniConfig::default(),
4715 Some(wal.clone()),
4716 None,
4717 )
4718 .await?;
4719
4720 let lsn = writer.flush_wal().await?;
4722 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
4724
4725 Ok(())
4726 }
4727
4728 #[tokio::test]
4730 async fn test_transaction_isolation_without_commit() -> Result<()> {
4731 use crate::runtime::wal::WriteAheadLog;
4732 use crate::storage::manager::StorageManager;
4733 use object_store::local::LocalFileSystem;
4734 use object_store::path::Path as ObjectStorePath;
4735 use uni_common::core::schema::SchemaManager;
4736
4737 let dir = tempdir()?;
4738 let path = dir.path().to_str().unwrap();
4739 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4740 let schema_path = ObjectStorePath::from("schema.json");
4741
4742 let schema_manager =
4743 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4744 let _label_id = schema_manager.add_label("Person")?;
4745 schema_manager.save().await?;
4746
4747 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4748
4749 let wal_path = ObjectStorePath::from("wal");
4750 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4751
4752 let writer = Writer::new_with_config(
4753 storage.clone(),
4754 schema_manager.clone(),
4755 1,
4756 UniConfig::default(),
4757 Some(wal),
4758 None,
4759 )
4760 .await?;
4761
4762 let tx_l0 = writer.create_transaction_l0();
4764
4765 let vid = writer.next_vid().await?;
4767 writer
4768 .insert_vertex_with_labels(
4769 vid,
4770 [("name".to_string(), Value::String("Ghost".to_string()))]
4771 .into_iter()
4772 .collect(),
4773 &["Person".to_string()],
4774 Some(&tx_l0),
4775 )
4776 .await?;
4777
4778 assert!(
4780 tx_l0.read().vertex_properties.contains_key(&vid),
4781 "Transaction L0 should contain the vertex"
4782 );
4783
4784 let main_l0 = writer.l0_manager.get_current();
4786 assert!(
4787 !main_l0.read().vertex_properties.contains_key(&vid),
4788 "Main L0 should NOT contain uncommitted transaction data"
4789 );
4790
4791 drop(tx_l0);
4793
4794 assert!(
4796 !main_l0.read().vertex_properties.contains_key(&vid),
4797 "Main L0 should remain clean after dropped transaction"
4798 );
4799
4800 Ok(())
4801 }
4802
4803 #[tokio::test]
4813 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
4814 use crate::storage::manager::StorageManager;
4815 use object_store::local::LocalFileSystem;
4816 use object_store::path::Path as ObjectStorePath;
4817 use uni_common::core::fork::ForkId;
4818 use uni_common::core::schema::SchemaManager;
4819
4820 let dir = tempdir()?;
4821 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4822 let schema_path = ObjectStorePath::from("schema.json");
4823 let schema_manager =
4824 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4825 let storage = Arc::new(
4826 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
4827 );
4828
4829 let config = UniConfig {
4830 fork_fragment_warn_threshold: 3,
4831 ..Default::default()
4832 };
4833 let mut writer =
4834 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
4835
4836 for _ in 0..10 {
4838 writer.tick_fork_fragment_observability();
4839 }
4840 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
4841 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
4842
4843 writer.fork_id = Some(ForkId::new());
4845 writer.tick_fork_fragment_observability();
4846 writer.tick_fork_fragment_observability();
4847 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
4848 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
4849
4850 writer.tick_fork_fragment_observability();
4852 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
4853 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
4854
4855 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
4857 for _ in 0..5 {
4858 writer.tick_fork_fragment_observability();
4859 }
4860 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
4861 assert_eq!(
4862 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
4863 fired_after
4864 );
4865
4866 Ok(())
4867 }
4868
4869 #[tokio::test]
4881 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
4882 use crate::storage::manager::StorageManager;
4883 use object_store::local::LocalFileSystem;
4884 use object_store::path::Path as ObjectStorePath;
4885 use uni_common::core::schema::SchemaManager;
4886
4887 let dir = tempdir()?;
4888 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4889 let schema_path = ObjectStorePath::from("schema.json");
4890 let schema_manager =
4891 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4892 schema_manager.add_label("Person")?;
4893 schema_manager.save().await?;
4894 let storage = Arc::new(
4895 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
4896 );
4897
4898 let writer =
4899 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
4900 .await?;
4901
4902 #[derive(Debug, PartialEq)]
4908 struct Snapshot {
4909 last_flush_time: std::time::Instant,
4910 cached_manifest_some: bool,
4911 fork_flush_count: u64,
4912 fork_fragment_warn_fired: bool,
4913 xervo_runtime_some: bool,
4914 index_rebuild_manager_some: bool,
4915 fork_id: Option<ForkId>,
4916 }
4917
4918 fn snap(w: &Writer) -> Snapshot {
4919 Snapshot {
4920 last_flush_time: *w.last_flush_time.lock(),
4921 cached_manifest_some: w.cached_manifest.lock().is_some(),
4922 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
4923 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
4924 xervo_runtime_some: w.xervo_runtime.get().is_some(),
4925 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
4926 fork_id: w.fork_id,
4927 }
4928 }
4929
4930 let before = snap(&writer);
4932 let vid = writer.next_vid().await?;
4933 writer
4934 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
4935 .await?;
4936 assert_eq!(
4937 snap(&writer),
4938 before,
4939 "insert_vertex_with_labels mutated a Writer field"
4940 );
4941
4942 let before = snap(&writer);
4944 let vids = writer.allocate_vids(2).await?;
4945 writer
4946 .insert_vertices_batch(
4947 vids,
4948 vec![Properties::new(), Properties::new()],
4949 vec!["Person".into()],
4950 None,
4951 )
4952 .await?;
4953 assert_eq!(
4954 snap(&writer),
4955 before,
4956 "insert_vertices_batch mutated a Writer field"
4957 );
4958
4959 let before = snap(&writer);
4961 writer.delete_vertex(vid, None, None).await?;
4962 assert_eq!(
4963 snap(&writer),
4964 before,
4965 "delete_vertex mutated a Writer field"
4966 );
4967
4968 Ok(())
4973 }
4974}