uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4#[cfg(feature = "lance-backend")]
5use crate::backend::table_names;
6use crate::runtime::context::QueryContext;
7use crate::runtime::flush_coordinator::{
8 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
9};
10use crate::runtime::id_allocator::IdAllocator;
11use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
12use crate::runtime::l0_manager::L0Manager;
13use crate::runtime::property_manager::PropertyManager;
14use crate::runtime::wal::WriteAheadLog;
15use crate::storage::adjacency_manager::AdjacencyManager;
16use crate::storage::delta::{L1Entry, Op};
17use crate::storage::main_edge::MainEdgeDataset;
18use crate::storage::main_vertex::MainVertexDataset;
19use crate::storage::manager::StorageManager;
20use anyhow::{Result, anyhow};
21use chrono::Utc;
22use metrics;
23use parking_lot::{Mutex as PlMutex, RwLock};
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock};
27use tracing::{debug, info, instrument};
28use uni_common::Properties;
29use uni_common::Value;
30use uni_common::config::UniConfig;
31use uni_common::core::fork::ForkId;
32use uni_common::core::id::{Eid, Vid};
33use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
34use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
35use uni_xervo::runtime::ModelRuntime;
36use uuid::Uuid;
37
38/// Whether `property` on `label` is a multi-vector (`List<Vector>`) column — the
39/// late-interaction (ColBERT) shape that auto-embeds per-token via the multi-vector model
40/// (issue #104). A plain `Vector` column uses the dense single-vector model.
41fn is_multivector_property(
42 schema: &uni_common::core::schema::Schema,
43 label: &str,
44 property: &str,
45) -> bool {
46 schema
47 .properties
48 .get(label)
49 .and_then(|p| p.get(property))
50 .is_some_and(|m| {
51 matches!(&m.r#type, uni_common::DataType::List(inner)
52 if matches!(**inner, uni_common::DataType::Vector { .. }))
53 })
54}
55
56/// Convert per-token embedding vectors into the stored `List<Vector>` representation:
57/// a `Value::List` whose elements are each a `Value::List<Float>` (one token vector).
58fn multivec_to_value(tokens: &[Vec<f32>]) -> Value {
59 Value::List(
60 tokens
61 .iter()
62 .map(|tok| Value::List(tok.iter().map(|f| Value::Float(*f as f64)).collect()))
63 .collect(),
64 )
65}
66
/// One coalesced auto-embed group: all targets that share an `alias` + `source_properties`,
/// split by target-column kind into dense (`Vector`), multi-vector (`List<Vector>`) and
/// sparse (targets of a sparse index). When a group has more than one kind, a single hybrid
/// inference fills all of them (single forward pass).
struct EmbedGroupSpec {
    /// Source property names the group embeds from (copied from the embedding config).
    source_properties: Vec<String>,
    /// Optional document prefix, taken from the first embedding config seen for the group.
    document_prefix: Option<String>,
    /// Target columns of vector indexes whose property is NOT a `List<Vector>`.
    dense: Vec<String>,
    /// Target columns of vector indexes whose property is a `List<Vector>`.
    multi: Vec<String>,
    /// Target columns of sparse indexes.
    sparse: Vec<String>,
}
77
78/// Convert one xervo sparse embedding (`(term_id, weight)` pairs, possibly
79/// unsorted / with duplicate terms) into a `Value::SparseVector`. Sorting (via
80/// `BTreeMap`) and summing duplicates yields the sorted-unique invariant the
81/// index and codec require; non-finite weights are dropped (never poison a row).
82fn sparse_pairs_to_value(pairs: &[(u32, f32)]) -> Value {
83 let mut by_term: std::collections::BTreeMap<u32, f32> = std::collections::BTreeMap::new();
84 for &(term, weight) in pairs {
85 if weight.is_finite() {
86 *by_term.entry(term).or_insert(0.0) += weight;
87 }
88 }
89 Value::SparseVector {
90 indices: by_term.keys().copied().collect(),
91 values: by_term.values().copied().collect(),
92 }
93}
94
95/// Run ONE embedding inference for a coalesced group of auto-embed targets sharing an alias +
96/// source text, returning `(dense_per_text, multi_vector_per_text)` (each `Some` iff that head
97/// was requested). When both heads are needed this uses the hybrid model
98/// (`hybrid_embedder`), so a multi-functional model (e.g. BGE-M3) produces the dense + ColBERT
99/// heads from a SINGLE forward pass; otherwise it uses the single-head dense / multi-vector
100/// embedder (today's behavior for non-mixed groups).
101#[allow(clippy::type_complexity)]
102async fn embed_group(
103 runtime: &ModelRuntime,
104 alias: &str,
105 texts: &[&str],
106 want_dense: bool,
107 want_multi: bool,
108 want_sparse: bool,
109) -> Result<(
110 Option<Vec<Vec<f32>>>,
111 Option<Vec<Vec<Vec<f32>>>>,
112 Option<Vec<Vec<(u32, f32)>>>,
113)> {
114 use uni_xervo::traits::hybrid::HeadSet;
115 let heads_wanted = u8::from(want_dense) + u8::from(want_multi) + u8::from(want_sparse);
116 if heads_wanted > 1 {
117 // Single-pass hybrid: one model, one inference, all requested heads
118 // (e.g. BGE-M3 fills dense + sparse from one forward pass).
119 let mut heads = HeadSet::empty();
120 if want_dense {
121 heads |= HeadSet::DENSE;
122 }
123 if want_multi {
124 heads |= HeadSet::MULTI_VECTOR;
125 }
126 if want_sparse {
127 heads |= HeadSet::SPARSE;
128 }
129 let embedder = runtime.hybrid_embedder(alias).await?;
130 let res = embedder.embed(texts, heads).await?;
131 let dense = if want_dense {
132 Some(res.dense.ok_or_else(|| {
133 anyhow!("hybrid model '{alias}' returned no dense head for a Vector target")
134 })?)
135 } else {
136 None
137 };
138 let multi = if want_multi {
139 Some(res.multi_vector.ok_or_else(|| {
140 anyhow!(
141 "hybrid model '{alias}' returned no multi-vector head for a List<Vector> target"
142 )
143 })?)
144 } else {
145 None
146 };
147 let sparse = if want_sparse {
148 Some(res.sparse.ok_or_else(|| {
149 anyhow!("hybrid model '{alias}' returned no sparse head for a SparseVector target")
150 })?)
151 } else {
152 None
153 };
154 Ok((dense, multi, sparse))
155 } else if want_multi {
156 let embedder = runtime.multi_vector_embedder(alias).await?;
157 Ok((None, Some(embedder.embed(texts).await?.vectors), None))
158 } else if want_sparse {
159 let embedder = runtime.sparse_embedder(alias).await?;
160 Ok((None, None, Some(embedder.embed(texts).await?.vectors)))
161 } else {
162 let embedder = runtime.embedding(alias).await?;
163 Ok((Some(embedder.embed(texts).await?.vectors), None, None))
164 }
165}
166
167/// Group a label's auto-embed configs by `(alias, source_properties)`, classifying each target
168/// column as dense vs multi-vector. A group with both kinds is a single-pass hybrid source.
169fn collect_embed_groups(
170 schema: &uni_common::core::schema::Schema,
171 label: &str,
172) -> std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec> {
173 use std::collections::BTreeMap;
174 let mut groups: BTreeMap<(String, Vec<String>), EmbedGroupSpec> = BTreeMap::new();
175 fn spec_for<'a>(
176 groups: &'a mut std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec>,
177 emb: &uni_common::core::schema::EmbeddingConfig,
178 ) -> &'a mut EmbedGroupSpec {
179 groups
180 .entry((emb.alias.clone(), emb.source_properties.clone()))
181 .or_insert_with(|| EmbedGroupSpec {
182 source_properties: emb.source_properties.clone(),
183 document_prefix: emb.document_prefix.clone(),
184 dense: Vec::new(),
185 multi: Vec::new(),
186 sparse: Vec::new(),
187 })
188 }
189 for idx in &schema.indexes {
190 if let IndexDefinition::Vector(v_config) = idx
191 && v_config.label == label
192 && let Some(emb) = &v_config.embedding_config
193 {
194 let g = spec_for(&mut groups, emb);
195 if is_multivector_property(schema, label, &v_config.property) {
196 g.multi.push(v_config.property.clone());
197 } else {
198 g.dense.push(v_config.property.clone());
199 }
200 } else if let IndexDefinition::Sparse(s_config) = idx
201 && s_config.label == label
202 && let Some(emb) = &s_config.embedding_config
203 {
204 spec_for(&mut groups, emb)
205 .sparse
206 .push(s_config.property.clone());
207 }
208 }
209 groups
210}
211
212/// On a partial / `SET` write, when the write touches a *source* property of an
213/// auto-embed group, drop that group's target columns from `props` so the
214/// `!contains_key` guard in `process_embeddings_*` re-embeds them (otherwise the
215/// re-read old embedding is preserved → stale), and add the targets to
216/// `touched_keys` so the partial Lance write persists the refresh. Mirrors the
217/// MUVERA derived-column touched-keys handling. A target the caller set
218/// explicitly in the SAME write (already in `touched_keys`) is left intact.
219fn refresh_touched_embed_targets(
220 schema: &uni_common::core::schema::Schema,
221 props: &mut Properties,
222 touched_keys: &mut HashSet<String>,
223 labels: &[String],
224) {
225 let Some(label) = labels.first() else {
226 return;
227 };
228 let user_touched = touched_keys.clone();
229 for (_key, group) in collect_embed_groups(schema, label) {
230 if !group
231 .source_properties
232 .iter()
233 .any(|s| touched_keys.contains(s))
234 {
235 continue;
236 }
237 for target in group
238 .dense
239 .iter()
240 .chain(group.multi.iter())
241 .chain(group.sparse.iter())
242 {
243 if user_touched.contains(target) {
244 continue;
245 }
246 props.remove(target);
247 touched_keys.insert(target.clone());
248 }
249 }
250}
251
/// Tunables for the writer. See the `Default` impl for the shipped values.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit for the writer.
    /// NOTE(review): presumably the L0 mutation count that triggers a flush — confirm
    /// against the code that reads this field.
    pub max_mutations: usize,
    /// Enable the partial-column MergeInsert path for SET-only flushes.
    ///
    /// When `true`, `Writer::insert_vertex_partial` records the touched
    /// property keys into `L0Buffer::vertex_partial_keys` and the flush
    /// routes those VIDs through Lance `MergeInsertBuilder` with a
    /// subset-of-schema source, skipping the read of (and write of)
    /// the unchanged columns — including wide ones like embeddings.
    ///
    /// When `false`, `insert_vertex_partial` falls back to the
    /// read-modify-write `insert_vertex_with_labels` path (preserving
    /// bit-for-bit equivalence with prior releases). Default `false`
    /// for the first release; flip to `true` after telemetry on the
    /// issue #72 ingest workload confirms the win.
    ///
    /// See the soundness probe at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}
273
274impl Default for WriterConfig {
275 fn default() -> Self {
276 Self {
277 max_mutations: 10_000,
278 partial_lance_writes: false,
279 }
280 }
281}
282
/// Parent state captured atomically at a fork point under `flush_lock`.
///
/// Holds the allocator high-water marks and every existing dataset's
/// Lance main-branch version at the instant the parent's L0 was
/// flushed. Because [`Writer::flush_and_capture_fork_point`] reads these
/// while still holding `flush_lock`, no concurrent commit or flush can
/// advance the allocator or any dataset tip between the flush and the
/// reads. A fork built from these values therefore cannot collide VIDs
/// with the parent nor inherit rows committed after the fork point.
///
/// The derived `Default` yields zero high-water marks and an empty
/// `dataset_versions` map.
#[derive(Clone, Debug, Default)]
pub struct ForkPoint {
    /// Next vertex id the parent would allocate at the fork point.
    pub vid_hwm: u64,
    /// Next edge id the parent would allocate at the fork point.
    pub eid_hwm: u64,
    /// `dataset_name` → Lance main-branch version at the fork point.
    ///
    /// Keys use the same dataset naming as the fork branch loop
    /// (`vertices`, `edges`, `vertices_{label}`, `deltas_{type}_{dir}`,
    /// `adjacency_{type}_{dir}`). A dataset with no `.lance` directory
    /// on disk at the fork point has no entry.
    pub dataset_versions: BTreeMap<String, u64>,
    /// Parent's MVCC version high-water mark at the fork point: the
    /// largest `_version` any inherited row can carry. A fork bootstraps
    /// its own version counter to this floor so a fork transaction's
    /// `_version <= pin` read still sees inherited (base_paths) rows,
    /// while the fork's own writes get versions above it.
    pub version_hwm: u64,
}
312
313/// RAII latch on [`StorageManager::flush_in_progress`].
314///
315/// Sets the flag to `true` on construction (via CAS) and back to `false` on
316/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
317/// stuck. Returns `None` if a flush is already in progress, providing
318/// forward-compatible exclusion once the outer writer-RwLock is removed in
319/// Phase 4 of the concurrent-writer refactor.
320// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
321// can hold it on RotatedFlush without a writer.rs back-import cycle.
322pub use crate::storage::manager::FlushInProgressGuard;
323
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The L0 buffer that is about to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured at rotation.
    wal_lsn: u64,
    /// `current_version` captured at rotation.
    current_version: u64,
    /// In-progress guard; kept here so it lives for the whole flush.
    flush_in_progress_guard: FlushInProgressGuard,
}
334
335/// Project a property map to a subset selected by `keys`. Used to
336/// run `touched_needs_full_read` against just the SET-touched keys
337/// when the caller passes a fully-merged `props` map.
338fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
339 let mut out = Properties::new();
340 for k in keys {
341 if let Some(v) = props.get(k) {
342 out.insert(k.clone(), v.clone());
343 }
344 }
345 out
346}
347
/// Builds the `.lance` URI for `dataset` under the storage base URI `base`, inserting a
/// `/` separator only when `base` does not already end with one.
///
/// Mirrors the fork branch loop's `join_uri` so the versions captured
/// by [`Writer::flush_and_capture_fork_point`] key the exact same
/// datasets the fork later branches.
fn join_lance_uri(base: &str, dataset: &str) -> String {
    let separator = if base.ends_with('/') { "" } else { "/" };
    format!("{base}{separator}{dataset}.lance")
}
360
/// Cheap existence check for a dataset `.lance` directory.
///
/// Local-fs heuristic: any URI containing a `://` scheme separator is assumed remote
/// and reported present, deferring the real check to `current_version`; everything
/// else is treated as a local path and checked on disk.
/// Mirrors the fork branch loop's `path_exists`.
fn lance_path_exists(uri: &str) -> bool {
    let looks_remote = uri.contains("://");
    looks_remote || std::path::Path::new(uri).exists()
}
372
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// The built, not-yet-published snapshot manifest.
    manifest: SnapshotManifest,
    /// Id of `manifest`.
    snapshot_id: String,
}
381
/// Write-side handle: bundles the L0 manager, storage, schema, id allocator and the
/// shared state used for flush coordination and optimistic-concurrency commits.
pub struct Writer {
    /// Manager of the L0 buffers; `get_current()` yields the active buffer.
    pub l0_manager: Arc<L0Manager>,
    /// Storage manager handle.
    pub storage: Arc<StorageManager>,
    /// Schema manager handle.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocator backing `next_vid` / `next_eid` and their bulk variants.
    pub allocator: Arc<IdAllocator>,
    /// Configuration this writer was constructed with.
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. Owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `async_flush_enabled = false`. The
    /// coordinator's finalizer task captures `SharedFlushCtx` which
    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
    /// the holder count never drops. Constructing it only when the
    /// feature is actually on avoids that side-effect for all
    /// existing sync-flush paths. When async-flush graduates from
    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
    /// the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    ///
    /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
    /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
    commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
480
/// Number of recent commits retained for OCC conflict detection; passed to
/// `CommitRegistry::new` when the writer is constructed. Large enough
/// that under-run — and the resulting conservative abort — is rare in practice;
/// each entry is a small set of touched ids.
const OCC_REGISTRY_CAPACITY: usize = 4096;
485
486impl Writer {
487 pub async fn new(
488 storage: Arc<StorageManager>,
489 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
490 start_version: u64,
491 ) -> Result<Self> {
492 Self::new_with_config(
493 storage,
494 schema_manager,
495 start_version,
496 UniConfig::default(),
497 None,
498 None,
499 )
500 .await
501 }
502
503 pub async fn new_with_config(
504 storage: Arc<StorageManager>,
505 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
506 start_version: u64,
507 config: UniConfig,
508 wal: Option<Arc<WriteAheadLog>>,
509 allocator: Option<Arc<IdAllocator>>,
510 ) -> Result<Self> {
511 let allocator = if let Some(a) = allocator {
512 a
513 } else {
514 let store = storage.store();
515 let path = object_store::path::Path::from("id_allocator.json");
516 Arc::new(IdAllocator::new(store, path, 1000).await?)
517 };
518
519 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
520
521 let property_manager = Some(Arc::new(PropertyManager::new(
522 storage.clone(),
523 schema_manager.clone(),
524 1000,
525 )));
526
527 let adjacency_manager = storage.adjacency_manager();
528
529 // Hoist the Arc'd fields so we can both stash them on Writer and
530 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
531 // captures. Single-source-of-truth for each piece of mutable
532 // shared state.
533 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
534 let cached_manifest = Arc::new(PlMutex::new(None));
535 let fork_flush_count = Arc::new(AtomicU64::new(0));
536 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
537 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
538 let compaction_handle = Arc::new(RwLock::new(None));
539 let index_rebuild_manager: Arc<
540 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
541 > = Arc::new(OnceLock::new());
542
543 let flush_coordinator = if config.async_flush_enabled {
544 let shared = SharedFlushCtx {
545 storage: storage.clone(),
546 l0_manager: l0_manager.clone(),
547 adjacency_manager: adjacency_manager.clone(),
548 property_manager: property_manager.clone(),
549 schema_manager: schema_manager.clone(),
550 cached_manifest: cached_manifest.clone(),
551 last_flush_time: last_flush_time.clone(),
552 fork_id: None,
553 fork_flush_count: fork_flush_count.clone(),
554 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
555 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
556 flush_lock: flush_lock.clone(),
557 index_rebuild_manager: index_rebuild_manager.clone(),
558 compaction_handle: compaction_handle.clone(),
559 compaction_config: config.compaction.clone(),
560 index_rebuild_config: config.index_rebuild.clone(),
561 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
562 };
563 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
564 Some(Arc::new(FlushCoordinator::new(
565 config.max_pending_flushes,
566 shared,
567 finalize_fn,
568 )))
569 } else {
570 None
571 };
572
573 let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
574 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
575 OCC_REGISTRY_CAPACITY,
576 )));
577 let for_update_locks = Arc::new(dashmap::DashMap::new());
578
579 Ok(Self {
580 l0_manager,
581 storage,
582 schema_manager,
583 allocator,
584 config,
585 xervo_runtime: OnceLock::new(),
586 property_manager,
587 adjacency_manager,
588 last_flush_time,
589 compaction_handle,
590 index_rebuild_manager,
591 cached_manifest,
592 fork_id: None,
593 fork_flush_count,
594 fork_fragment_warn_fired,
595 flush_lock,
596 flush_coordinator,
597 commit_sequence,
598 committed_writes,
599 for_update_locks,
600 })
601 }
602
603 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
604 /// creating it on first use. The caller `.lock_owned().await`s the returned
605 /// mutex and holds the guard for the transaction's lifetime.
606 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
607 self.for_update_locks
608 .entry(key.to_vec())
609 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
610 .clone()
611 }
612
613 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
614 /// holds anymore, so the map does not grow without bound across the keyspace.
615 ///
616 /// Called when a transaction ends, **after** its guards have been dropped.
617 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
618 /// the same lock `row_lock_handle` takes to clone an entry — so the check
619 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
620 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
621 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
622 /// after we remove). Either way no two transactions ever lock different
623 /// `Mutex` instances for the same key.
624 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
625 for key in keys {
626 self.for_update_locks
627 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
628 }
629 }
630
631 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
632 /// tests that the map does not leak entries across transactions (G5).
633 pub fn for_update_lock_count(&self) -> usize {
634 self.for_update_locks.len()
635 }
636
637 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
638 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
639 /// baseline advances to lock-acquisition time (read-latest under the lock).
640 pub fn current_commit_sequence(&self) -> u64 {
641 self.commit_sequence
642 .load(crate::runtime::sync::Ordering::Relaxed)
643 }
644
645 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
646 /// Used by the async-flush stream/finalize paths to pass into spawned
647 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
648 /// with `flush_coordinator -> FinalizeFn -> Writer`).
649 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
650 SharedFlushCtx {
651 storage: self.storage.clone(),
652 l0_manager: self.l0_manager.clone(),
653 adjacency_manager: self.adjacency_manager.clone(),
654 property_manager: self.property_manager.clone(),
655 schema_manager: self.schema_manager.clone(),
656 cached_manifest: self.cached_manifest.clone(),
657 last_flush_time: self.last_flush_time.clone(),
658 fork_id: self.fork_id,
659 fork_flush_count: self.fork_flush_count.clone(),
660 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
661 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
662 flush_lock: self.flush_lock.clone(),
663 index_rebuild_manager: self.index_rebuild_manager.clone(),
664 compaction_handle: self.compaction_handle.clone(),
665 compaction_config: self.config.compaction.clone(),
666 index_rebuild_config: self.config.index_rebuild.clone(),
667 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
668 }
669 }
670
671 /// Borrow the flush coordinator if async flush is enabled.
672 /// Returns `None` when `config.async_flush_enabled = false`.
673 /// External callers (`drop_fork`) use this to drain pending streams.
674 pub fn flush_coordinator(
675 &self,
676 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
677 self.flush_coordinator.as_ref()
678 }
679
680 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
681 ///
682 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
683 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
684 pub fn set_index_rebuild_manager(
685 &self,
686 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
687 ) -> Result<()> {
688 self.index_rebuild_manager
689 .set(manager)
690 .map_err(|_| anyhow!("index_rebuild_manager already set"))
691 }
692
693 /// Replay WAL mutations into the current L0 buffer.
694 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
695 let l0 = self.l0_manager.get_current();
696 let wal = l0.read().wal.clone();
697
698 if let Some(wal) = wal {
699 wal.initialize().await?;
700 let mutations = wal.replay_since(wal_high_water_mark).await?;
701 let count = mutations.len();
702
703 if count > 0 {
704 log::info!(
705 "Replaying {} mutations from WAL (LSN > {})",
706 count,
707 wal_high_water_mark
708 );
709 let mut l0_guard = l0.write();
710 l0_guard.replay_mutations(mutations)?;
711 // Rebuild the UNIQUE constraint index over the recovered rows
712 // (Bug #9 Mechanism B). `replay_mutations` restores
713 // vertices/properties/labels but never repopulates
714 // `constraint_index` (its only other caller is the live insert
715 // path). Without this, a unique key that lives only in the WAL
716 // (committed but not yet flushed to Lance) is invisible to
717 // `check_unique_constraint_multi` after recovery and a
718 // duplicate of it could be created.
719 self.rebuild_constraint_index(&mut l0_guard);
720 }
721
722 Ok(count)
723 } else {
724 Ok(0)
725 }
726 }
727
728 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
729 ///
730 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
731 /// constraint whose target label the vertex carries and whose member
732 /// properties are all present, inserts the same constraint key the live
733 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
734 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
735 /// write lock; the schema is already loaded on the `Writer`.
736 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
737 let schema = self.schema_manager.schema();
738 // Collect entries first to avoid borrowing `vertex_properties`
739 // immutably while mutating `constraint_index` through the same guard.
740 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
741 for (&vid, props) in &l0_guard.vertex_properties {
742 if l0_guard.vertex_tombstones.contains(&vid) {
743 continue;
744 }
745 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
746 continue;
747 };
748 for label in labels {
749 for constraint in &schema.constraints {
750 if !constraint.enabled {
751 continue;
752 }
753 let ConstraintTarget::Label(l) = &constraint.target else {
754 continue;
755 };
756 if l != label {
757 continue;
758 }
759 let ConstraintType::Unique {
760 properties: unique_props,
761 } = &constraint.constraint_type
762 else {
763 continue;
764 };
765 let mut key_values = Vec::new();
766 let mut all_present = true;
767 for prop in unique_props {
768 if let Some(val) = props.get(prop) {
769 key_values.push((prop.clone(), val.clone()));
770 } else {
771 all_present = false;
772 break;
773 }
774 }
775 if all_present {
776 keys.push((serialize_constraint_key(label, &key_values), vid));
777 }
778 }
779 }
780 }
781 for (key, vid) in keys {
782 l0_guard.insert_constraint_key(key, vid);
783 }
784 }
785
786 /// Allocates the next VID (pure auto-increment).
787 pub async fn next_vid(&self) -> Result<Vid> {
788 self.allocator.allocate_vid().await
789 }
790
791 /// Allocates multiple VIDs at once for bulk operations.
792 /// This is more efficient than calling next_vid() in a loop.
793 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
794 self.allocator.allocate_vids(count).await
795 }
796
797 /// Allocates the next EID (pure auto-increment).
798 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
799 self.allocator.allocate_eid().await
800 }
801
802 /// Allocates multiple EIDs at once for bulk operations.
803 /// This is more efficient than calling next_eid() in a loop.
804 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
805 self.allocator.allocate_eids(count).await
806 }
807
808 /// Install the embedding runtime exactly once. Receiver is `&self` so it
809 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
810 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
811 self.xervo_runtime
812 .set(runtime)
813 .map_err(|_| anyhow!("xervo_runtime already set"))
814 }
815
816 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
817 self.xervo_runtime.get().cloned()
818 }
819
820 /// Create a new empty L0 buffer for transaction-scoped mutations.
821 ///
822 /// Only reads the current version — no exclusive lock required on Writer.
823 /// The returned buffer has no WAL reference; mutations are logged at
824 /// commit time via [`Self::commit_transaction_l0`].
825 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
826 let current_version = self.l0_manager.get_current().read().current_version;
827 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
828 let buf = L0Buffer::new(current_version, None);
829 // SSI: stamp the OCC read sequence at begin so commit can detect any
830 // transaction that committed since. Gated on the runtime `ssi_enabled`
831 // toggle — when off, `occ_read_set` stays `None` and every downstream
832 // read-set recording / commit validation self-gates to a no-op.
833 let buf = if self.config.ssi_enabled {
834 let mut buf = buf;
835 buf.occ_read_seq = self
836 .commit_sequence
837 .load(crate::runtime::sync::Ordering::Relaxed);
838 // The read path records observed ids here for SSI antidependency
839 // detection; commit consults it.
840 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
841 crate::runtime::l0::OccReadSet::default(),
842 )));
843 buf
844 } else {
845 buf
846 };
847 Arc::new(RwLock::new(buf))
848 }
849
850 /// Resolve the target L0 buffer for a mutation.
851 ///
852 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
853 /// When `None`, it targets the global L0 from the manager.
854 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
855 tx_l0
856 .cloned()
857 .unwrap_or_else(|| self.l0_manager.get_current())
858 }
859
860 fn update_metrics(&self) {
861 let l0 = self.l0_manager.get_current();
862 let size = l0.read().estimated_size;
863 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
864 }
865
866 /// Overlay-aware issue-#77 edge-endpoint validation.
867 ///
868 /// The current buffer alone does not hold all committed-but-unflushed
869 /// tombstones — a flush rotation moves them onto `pending_flush` until
870 /// the Lance write completes. A vertex is effectively deleted iff,
871 /// walking newest-first (tx → current → pending newest→oldest), the
872 /// first buffer that knows the vid says "tombstoned" (an insert clears
873 /// the tombstone within a buffer, so props/tombstone are mutually
874 /// exclusive per buffer).
875 ///
876 /// Must run under `flush_lock` so the overlay cannot change before the
877 /// merge, and BEFORE the durable WAL flush (see the call site).
878 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
879 let chain = self.l0_manager.get_pending_flush();
880 let current = self.l0_manager.get_current();
881 let effectively_deleted = |vid: &Vid| -> bool {
882 if tx_l0.vertex_properties.contains_key(vid) {
883 return false;
884 }
885 if tx_l0.vertex_tombstones.contains(vid) {
886 return true;
887 }
888 {
889 let cur = current.read();
890 if cur.vertex_properties.contains_key(vid) {
891 return false;
892 }
893 if cur.vertex_tombstones.contains(vid) {
894 return true;
895 }
896 }
897 for frozen in chain.iter().rev() {
898 let g = frozen.read();
899 if g.vertex_properties.contains_key(vid) {
900 return false;
901 }
902 if g.vertex_tombstones.contains(vid) {
903 return true;
904 }
905 }
906 false
907 };
908 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
909 if tx_l0.tombstones.contains_key(eid) {
910 continue; // a deletion, not an insertion — never resurrects a vertex
911 }
912 if effectively_deleted(src_vid) {
913 anyhow::bail!(
914 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
915 eid,
916 src_vid
917 );
918 }
919 if effectively_deleted(dst_vid) {
920 anyhow::bail!(
921 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
922 eid,
923 dst_vid
924 );
925 }
926 }
927 Ok(())
928 }
929
930 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
931 /// value for each CRDT property the transaction writes, so the commit
932 /// merge MERGES against the committed CRDT state instead of shadowing it
933 /// (the carve-out lets concurrent CRDT writers commit on the assumption
934 /// that the merge sees the committed value — true only while that value
935 /// lives in current, not mid-flush on `pending_flush`). No-op when
936 /// nothing is pending or the property already exists in current. Vertex
937 /// properties only, mirroring the carve-out itself.
938 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
939 let chain = self.l0_manager.get_pending_flush();
940 if chain.is_empty() {
941 return;
942 }
943 for (vid, props) in &tx_l0.vertex_properties {
944 Self::seed_crdt_props(&chain, *vid, props, main_l0);
945 }
946 }
947
948 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
949 /// Also used by the non-transactional vertex write path, which CRDT-merges
950 /// into the current buffer directly and has the same shadowing hazard
951 /// during a flush window.
952 fn seed_crdt_props(
953 chain: &[Arc<RwLock<L0Buffer>>],
954 vid: Vid,
955 props: &Properties,
956 target: &mut L0Buffer,
957 ) {
958 for (key, value) in props {
959 if crate::runtime::l0::try_as_crdt(value).is_none() {
960 continue;
961 }
962 if target
963 .vertex_properties
964 .get(&vid)
965 .is_some_and(|p| p.contains_key(key))
966 {
967 continue;
968 }
969 // Newest generation first: the first hit is the live state.
970 for frozen in chain.iter().rev() {
971 let g = frozen.read();
972 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
973 if crate::runtime::l0::try_as_crdt(v).is_some() {
974 target
975 .vertex_properties
976 .entry(vid)
977 .or_default()
978 .insert(key.clone(), v.clone());
979 }
980 break;
981 }
982 }
983 }
984 }
985
/// Commit a transaction's private (externally-owned) L0 buffer into main L0.
///
/// Runs under `flush_lock`, in this order:
/// 1. SSI validation (only when `config.ssi_enabled`) and the issue-#77
///    edge-endpoint check — both before anything is written to the WAL.
/// 2. Append the transaction's mutations to the WAL (when one is
///    configured) and flush it; the flush is the durable commit point.
/// 3. Merge the transaction buffer into main L0 and replay its edges and
///    tombstones into the AdjacencyManager.
/// 4. Register the write-set in the commit registry (SSI only), update
///    metrics, and run the best-effort post-commit flush trigger.
///
/// Returns `(wal_lsn, flush_pending)`. `wal_lsn` is the value returned by
/// [`Self::flush_wal`] (`0` when no WAL is configured). `flush_pending` is
/// `true` only on the `UniConfig::async_flush_enabled` path, after the
/// rotate succeeded and the stream phase was submitted to the flush
/// coordinator, so the commit does not block on L1-streaming I/O. On every
/// other path it is `false`.
///
/// NOTE(review): an earlier revision of this doc said that `true` means "no
/// flush ran" and that the caller should spawn a background `flush_to_l1`.
/// The code below submits the stream itself — confirm what callers expect.
///
/// # Errors
///
/// Returns `UniError::SerializationConflict` or
/// `UniError::ConstraintConflict` when commit-time validation fails, and
/// propagates edge-endpoint, WAL append/flush and merge errors. Failures of
/// the post-commit flush trigger are logged and not returned.
pub async fn commit_transaction_l0(
    self: &Arc<Self>,
    tx_l0_arc: Arc<RwLock<L0Buffer>>,
) -> Result<(u64, bool)> {
    // Hold `flush_lock` across WAL append + flush + main-L0 merge.
    // Two concurrent commits serialize here; in Phase 3 the outer
    // `Arc<RwLock<Writer>>` already provides this exclusion, so the
    // acquisition is uncontended. Phase 4 drops the outer lock and
    // this becomes the load-bearing serialization point.
    let _flush_lock_guard = self.flush_lock.lock().await;

    // Crash-recovery seam: simulate process death immediately after winning
    // the commit serialization point but before any durable work. No-op
    // unless built with `--features failpoints`. (See ssi_resilience tests.)
    fail::fail_point!("commit::after-flush-lock");

    // SSI: optimistic conflict detection. This MUST run before any WAL
    // write — `flush_wal()` below is the durable commit point and the WAL
    // has no abort marker, so aborting after it would resurrect this
    // transaction on crash recovery. The write-set is reused for
    // registration after a successful merge.
    // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
    // and `occ_write_set` is `None`, so the post-merge registration below
    // is skipped — reproducing last-writer-wins exactly.
    let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
        let tx_l0 = tx_l0_arc.read();
        let read_seq = tx_l0.occ_read_seq;
        let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
        if !write_set.is_empty() {
            // Telemetry: one validation per non-empty (writing) commit. The
            // ratio of conflicts to validations is the headline abort rate.
            metrics::counter!("uni_ssi_commit_validations_total").increment(1);
            // Read-set is consulted only for writing transactions, so a
            // read-only commit (empty write-set) runs at snapshot isolation.
            let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
            if let Some(conflict) =
                self.committed_writes
                    .lock()
                    .check(read_seq, &write_set, read_guard.as_deref())
            {
                use crate::runtime::occ::Conflict;
                // Count the abort under the metric matching its kind
                // before surfacing it to the caller.
                match &conflict {
                    Conflict::WriteWrite { .. } => metrics::counter!(
                        "uni_ssi_serialization_conflicts_total",
                        "kind" => "write_write",
                    )
                    .increment(1),
                    Conflict::ReadWrite { .. } => metrics::counter!(
                        "uni_ssi_serialization_conflicts_total",
                        "kind" => "read_write",
                    )
                    .increment(1),
                    Conflict::HistoryTruncated { .. } => {
                        metrics::counter!("uni_ssi_history_truncated_total").increment(1)
                    }
                }
                return Err(anyhow::Error::new(
                    uni_common::UniError::SerializationConflict {
                        message: conflict.to_string(),
                    },
                ));
            }
        }

        // Validate against the committed-but-unflushed overlay under
        // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
        // soundness. The current buffer alone does not hold all committed
        // state — a flush rotation moves it onto `pending_flush` until the
        // Lance write completes (the Bug #9A window, here at the
        // commit-time layer) — so every check walks [current, pending…].
        {
            let pending = self.l0_manager.get_pending_flush();
            let main_l0 = self.l0_manager.get_current();
            let overlay: Vec<Arc<RwLock<L0Buffer>>> =
                std::iter::once(main_l0).chain(pending).collect();

            // SSI / serializable MERGE: abort if a concurrent transaction has
            // already committed a row with one of this transaction's unique
            // keys. Commits serialize here, so this closes the race window
            // left by the per-insert check. (Empty index → no iterations.)
            for (key, vid) in &tx_l0.constraint_index {
                if overlay
                    .iter()
                    .any(|b| b.read().has_constraint_key(key, *vid))
                {
                    metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                    return Err(anyhow::Error::new(
                        uni_common::UniError::ConstraintConflict {
                            message: "unique key already committed by a concurrent \
                                      transaction"
                                .to_string(),
                        },
                    ));
                }
            }

            // Implicit MERGE phantom guard: a `MERGE` that *created* a node
            // registered its (label, key-props) here even with no declared
            // UNIQUE constraint. If a concurrent transaction already committed
            // the same MERGE key, abort retriably so the two converge to one
            // node on retry (the loser's MATCH then finds the committed row).
            // Only MERGE-creates register keys, so a plain CREATE of the same
            // properties never lands here. (Empty index → no iterations.)
            for (key, vid) in &tx_l0.merge_guard_index {
                if overlay
                    .iter()
                    .any(|b| b.read().has_merge_guard_key(key, *vid))
                {
                    metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                    return Err(anyhow::Error::new(
                        uni_common::UniError::ConstraintConflict {
                            message: "MERGE key already committed by a concurrent \
                                      transaction"
                                .to_string(),
                        },
                    ));
                }
            }

            // Same race window for global ext_id uniqueness: the per-insert
            // check ran against an older main L0; re-probe the committed
            // index here, where commits serialize.
            for (ext_id, vid) in &tx_l0.extid_index {
                // Taken only when some overlay buffer maps this ext_id to a
                // DIFFERENT vid; the same vid re-asserting it is fine.
                let taken = overlay.iter().any(|b| {
                    matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
                });
                if taken {
                    metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                    return Err(anyhow::Error::new(
                        uni_common::UniError::ConstraintConflict {
                            message: format!(
                                "ext_id '{ext_id}' already committed by a concurrent \
                                 transaction"
                            ),
                        },
                    ));
                }
            }

            // CRDT carve-out soundness: a pure-CRDT write was dropped from the
            // write-set assuming its merge commutes. If the overlay holds a
            // *different* CRDT variant for the same property, the merge would
            // silently overwrite it — abort instead of losing the update.
            // (Checked against every overlay buffer: conservative if an old
            // generation held a different variant that a newer commit already
            // replaced, but an abort+retry is always sound.)
            for buf in &overlay {
                if let Some(conflict) =
                    crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
                {
                    metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }
        }
        Some(write_set)
    } else {
        None
    };

    // Issue #77: an edge whose endpoint is effectively deleted makes the
    // merge below bail. That bail MUST happen before the durable WAL flush —
    // after it the transaction is committed-but-unmerged (a ghost commit),
    // and WAL replay re-hits the same bail, making the database unopenable.
    // SSI validation was deliberately placed before the flush for exactly
    // this reason; the endpoint check belongs here too. Runs unconditionally
    // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
    // tombstone state cannot change between here and the merge. A
    // tombstone may live in a flush-rotated pending buffer rather than
    // the current buffer, so the check walks the overlay newest-first.
    {
        let tx_l0 = tx_l0_arc.read();
        self.validate_edge_endpoints_overlay(&tx_l0)?;
    }

    // Crash-recovery seam: SSI validation has passed; the transaction is
    // about to become durable. A crash here must leave NO trace (validation
    // happens before the WAL is touched). No-op unless `failpoints`.
    fail::fail_point!("commit::after-validate");

    // 1. Write transaction mutations to WAL BEFORE merging into main L0
    //    This ensures durability before visibility.
    //    Both read guards are scoped to this block, so they are released
    //    before the `flush_wal().await` below.
    {
        let tx_l0 = tx_l0_arc.read();
        let main_l0_arc = self.l0_manager.get_current();
        let main_l0 = main_l0_arc.read();

        // If WAL exists, write mutations to it for durability
        if let Some(wal) = main_l0.wal.as_ref() {
            // Order: vertices first, then edges (to ensure src/dst exist on replay)

            // Vertex insertions
            for (vid, properties) in &tx_l0.vertex_properties {
                if !tx_l0.vertex_tombstones.contains(vid) {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::InsertVertex {
                        vid: *vid,
                        properties: properties.clone(),
                        labels,
                    })?;
                }
            }

            // Vertex deletions
            for vid in &tx_l0.vertex_tombstones {
                let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
            }

            // Label-only mutations (SET n:Label / REMOVE n:Label). After
            // vertex inserts (so the vertex exists on replay), before edges,
            // and skipping vertices deleted in this same commit.
            for vid in &tx_l0.vertex_label_overwrites {
                if tx_l0.vertex_tombstones.contains(vid) {
                    continue;
                }
                let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
                    vid: *vid,
                    labels,
                })?;
            }

            // Crash-recovery seam: vertices appended, edges not yet. Tests
            // assert that a crash here (before `flush_wal`) recovers NOTHING
            // — the durable commit point is the flush below, not append.
            fail::fail_point!("commit::mid-wal");

            // Edge insertions and deletions from edge_endpoints
            for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                if tx_l0.tombstones.contains_key(eid) {
                    let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                    wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                        eid: *eid,
                        src_vid: *src_vid,
                        dst_vid: *dst_vid,
                        edge_type: *edge_type,
                        version,
                    })?;
                } else {
                    let properties =
                        tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                    let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                    let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                    wal.append(crate::runtime::wal::Mutation::InsertEdge {
                        src_vid: *src_vid,
                        dst_vid: *dst_vid,
                        edge_type: *edge_type,
                        eid: *eid,
                        version,
                        properties,
                        edge_type_name,
                    })?;
                }
            }

            // Tombstones for edges that only exist in the global L0 (not in
            // this transaction's edge_endpoints). Without this, deletes of
            // pre-existing edges would be silently lost.
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                    wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                        eid: *eid,
                        src_vid: tombstone.src_vid,
                        dst_vid: tombstone.dst_vid,
                        edge_type: tombstone.edge_type,
                        version,
                    })?;
                }
            }
        }
    }

    // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
    let wal_lsn = self.flush_wal().await?;

    // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
    // A crash here must RECOVER the transaction on replay (it is committed),
    // even though it was never made visible in-process. No-op unless `failpoints`.
    fail::fail_point!("commit::after-wal-flush");

    // Component C1: if an outstanding snapshot pins the current generation,
    // clone it aside (lazy copy-on-write) before merging, so the pinning
    // transaction's reads stay isolated from this commit. No-op — and zero
    // cost — when nothing is pinned (the common case). We hold `flush_lock`,
    // so this cannot race a flush rotate or another commit's merge; the merge
    // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
    // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
    // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
    // always false when SSI is off and this is a zero-cost no-op.
    if self.l0_manager.is_current_pinned() {
        self.l0_manager.freeze_current_for_snapshot();
        metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
    }

    // 3. Merge into main L0 and make visible
    {
        // Write-lock the tx buffer: `merge_take` moves its property maps
        // into main L0 instead of cloning them. The commit consumes the
        // transaction, so the drained maps are never observed afterwards;
        // everything read below (endpoints, versions, tombstones) is left
        // intact.
        let mut tx_l0 = tx_l0_arc.write();
        let main_l0_arc = self.l0_manager.get_current();
        let mut main_l0 = main_l0_arc.write();
        // A CRDT property's committed state may live only in a
        // flush-rotated pending buffer (the post-rotation current is
        // empty until the Lance write completes — the Bug #9A window).
        // `merge_crdt_properties` merges against the CURRENT buffer's
        // value — without seeding, the tx's CRDT state would SHADOW the
        // pending buffer's at read time (newest buffer wins per property)
        // and concurrent increments would be lost. Seed the newest
        // overlay value for each CRDT property the tx writes that current
        // lacks, so the merge below merges instead of replaces.
        self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
        main_l0.merge_take(&mut tx_l0)?;

        // Replay transaction edges into the AdjacencyManager overlay
        for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
            // Edges without a recorded version fall back to main L0's
            // current version.
            let edge_version = tx_l0
                .edge_versions
                .get(eid)
                .copied()
                .unwrap_or(main_l0.current_version);
            if tx_l0.tombstones.contains_key(eid) {
                self.adjacency_manager
                    .add_tombstone(*eid, *src, *dst, *etype, edge_version);
            } else {
                self.adjacency_manager
                    .insert_edge(*src, *dst, *eid, *etype, edge_version);
            }
        }

        // Replay tombstones for edges that only exist in the global L0
        // (not in this transaction's edge_endpoints).
        for (eid, tombstone) in &tx_l0.tombstones {
            if !tx_l0.edge_endpoints.contains_key(eid) {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                self.adjacency_manager.add_tombstone(
                    *eid,
                    tombstone.src_vid,
                    tombstone.dst_vid,
                    tombstone.edge_type,
                    edge_version,
                );
            }
        }
    }

    // Crash-recovery seam: durable AND merged, but the in-memory commit
    // registry has not recorded this write-set yet. A crash here is
    // indistinguishable from one at `after-wal-flush` on reopen (the
    // registry is in-memory and rebuilt empty); the tx still recovers.
    fail::fail_point!("commit::after-merge");

    // SSI: register this commit's write-set under a fresh commit sequence so
    // later transactions detect conflicts against it. Still under
    // `flush_lock`, before the async-flush branch can drop the guard.
    // `occ_write_set` is `Some` only when `config.ssi_enabled`.
    if let Some(write_set) = occ_write_set
        && !write_set.is_empty()
    {
        // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
        // so production and the loom/shuttle models exercise identical logic.
        self.committed_writes
            .lock()
            .commit(&self.commit_sequence, write_set);
    }

    self.update_metrics();

    // 4. Best-effort post-commit auto-flush.
    //
    // Two paths:
    //   - async_flush_enabled = false (default): inline under our
    //     existing flush_lock guard via flush_inline_under_lock.
    //   - async_flush_enabled = true: rotate inline, drop flush_lock,
    //     then submit the stream phase to the coordinator. Gated on
    //     `pending_flush_count() < max_pending_flushes` so we don't
    //     stack up rotations beyond the configured pipeline depth.
    //     `try_acquire_permit` is non-blocking: if we lose the race
    //     for the last permit, we just skip this trigger (the next
    //     commit retries).
    //
    // NOTE(review): the `else` arm below is also taken when async flush is
    // enabled but no coordinator is set or the pipeline is already at
    // `max_pending_flushes` — an inline flush runs in that case too.
    // Confirm this fallback is the intended backpressure.
    let mut flush_pending = false;
    if self.should_flush() {
        if self.config.async_flush_enabled
            && let Some(coord) = self.flush_coordinator.as_ref()
            && coord.pending_flush_count() < self.config.max_pending_flushes
        {
            match coord.try_acquire_permit() {
                Some(permit) => {
                    match self.flush_l0_rotate().await {
                        Ok(rotate_out) => {
                            // Allocate the rotate seq and bump pending ONLY
                            // after the rotate succeeds (Bug #3). A failed
                            // rotate must consume neither: the finalizer
                            // advances strictly in consecutive seq order and
                            // only decrements pending on finalize, so a
                            // leaked seq/pending from a failed rotate would
                            // wedge the finalizer forever and climb pending
                            // toward `max_pending_flushes`. The seq is still
                            // allocated under `flush_lock` (immediately after
                            // the rotate, before the guard drops below), so
                            // concurrent rotates keep seq order == rotation
                            // order, and the seq is not used until submit.
                            let seq = coord.next_rotate_seq();
                            coord.note_pending();
                            // Release flush_lock BEFORE the spawn so concurrent
                            // commits can proceed while the stream runs.
                            drop(_flush_lock_guard);
                            let parent_manifest = self.cached_manifest.lock().clone();
                            let rotated = crate::runtime::flush_coordinator::RotatedFlush {
                                seq,
                                old_l0_arc: rotate_out.old_l0_arc.clone(),
                                wal_lsn: rotate_out.wal_lsn,
                                current_version: rotate_out.current_version,
                                name: None,
                                parent_manifest,
                                permit,
                                flush_in_progress_guard: rotate_out.flush_in_progress_guard,
                            };
                            // The stream closure owns its own `Arc<Writer>`
                            // clone, so it does not borrow from this call.
                            let writer = self.clone();
                            let _ticket = coord.submit_for_stream(
                                rotated,
                                move |old_l0, wal, ver, n| async move {
                                    let outcome =
                                        writer.flush_stream_l1(old_l0, wal, ver, n).await?;
                                    Ok(crate::runtime::flush_coordinator::FlushOutcome {
                                        new_manifest: outcome.manifest,
                                        snapshot_id: outcome.snapshot_id,
                                    })
                                },
                            );
                            flush_pending = true;
                            // Early return — flush_lock already dropped.
                            return Ok((wal_lsn, flush_pending));
                        }
                        Err(e) => {
                            tracing::warn!("Async rotate failed (non-critical): {}", e);
                            // No seq was allocated and pending was not
                            // bumped (both moved into the Ok arm for Bug
                            // #3), so the finalizer is not wedged. The
                            // permit drops here, freeing the slot.
                        }
                    }
                }
                None => {
                    // Race: someone else grabbed the last permit. Skip;
                    // next commit will retry should_flush().
                    metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
                }
            }
        } else if let Err(e) = self.flush_inline_under_lock(None).await {
            tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
        }
    }

    Ok((wal_lsn, flush_pending))
}
1466
1467 /// Flush the WAL buffer to durable storage.
1468 ///
1469 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1470 pub async fn flush_wal(&self) -> Result<u64> {
1471 let l0 = self.l0_manager.get_current();
1472 let wal = l0.read().wal.clone();
1473
1474 match wal {
1475 Some(wal) => Ok(wal.flush().await?),
1476 None => Ok(0),
1477 }
1478 }
1479
1480 /// Record property removals in the active L0 mutation stats.
1481 ///
1482 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1483 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1484 if count == 0 {
1485 return;
1486 }
1487 let l0 = self.resolve_l0(tx_l0);
1488 l0.write().mutation_stats.properties_removed += count;
1489 }
1490
1491 /// Validates vertex constraints for the given properties.
1492 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1493 async fn validate_vertex_constraints_for_label(
1494 &self,
1495 vid: Vid,
1496 properties: &Properties,
1497 label: &str,
1498 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1499 ) -> Result<()> {
1500 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1501 .await
1502 }
1503
1504 /// Partial-update sibling: validates only constraints touching keys
1505 /// present in `properties` (the touched set). NOT NULL is checked
1506 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1507 /// skipped when any referenced key is absent (the caller is
1508 /// expected to have routed to the full-row path in that case via
1509 /// `touched_needs_full_read`).
1510 async fn validate_vertex_constraints_for_label_partial(
1511 &self,
1512 vid: Vid,
1513 properties: &Properties,
1514 label: &str,
1515 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1516 ) -> Result<()> {
1517 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1518 .await
1519 }
1520
1521 async fn validate_vertex_constraints_for_label_impl(
1522 &self,
1523 vid: Vid,
1524 properties: &Properties,
1525 label: &str,
1526 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1527 partial: bool,
1528 ) -> Result<()> {
1529 let schema = self.schema_manager.schema();
1530
1531 {
1532 // 1. Check NOT NULL constraints (from Property definitions).
1533 // Under partial-update mode, skip properties NOT in
1534 // `properties` — they retain their previous (already-
1535 // validated) value.
1536 if let Some(props_meta) = schema.properties.get(label) {
1537 for (prop_name, meta) in props_meta {
1538 if !meta.nullable {
1539 let present = properties.get(prop_name);
1540 if partial && present.is_none() {
1541 continue;
1542 }
1543 if present.is_none_or(|v| v.is_null()) {
1544 log::warn!(
1545 "Constraint violation: Property '{}' cannot be null for label '{}'",
1546 prop_name,
1547 label
1548 );
1549 return Err(anyhow!(
1550 "Constraint violation: Property '{}' cannot be null",
1551 prop_name
1552 ));
1553 }
1554 }
1555 }
1556 }
1557
1558 // 2. Check Explicit Constraints (Unique, Check, etc.)
1559 for constraint in &schema.constraints {
1560 if !constraint.enabled {
1561 continue;
1562 }
1563 match &constraint.target {
1564 ConstraintTarget::Label(l) if l == label => {}
1565 _ => continue,
1566 }
1567
1568 match &constraint.constraint_type {
1569 ConstraintType::Unique {
1570 properties: unique_props,
1571 } => {
1572 // Support single and multi-property unique constraints
1573 if !unique_props.is_empty() {
1574 let mut key_values = Vec::new();
1575 let mut missing = false;
1576 for prop in unique_props {
1577 if let Some(val) = properties.get(prop) {
1578 key_values.push((prop.clone(), val.clone()));
1579 } else {
1580 missing = true; // Can't enforce if property missing (partial update?)
1581 // For INSERT, missing means null?
1582 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1583 // For now, only check if ALL keys are present
1584 }
1585 }
1586
1587 if !missing {
1588 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1589 .await?;
1590 }
1591 }
1592 }
1593 ConstraintType::Exists { property } => {
1594 if properties.get(property).is_none_or(|v| v.is_null()) {
1595 log::warn!(
1596 "Constraint violation: Property '{}' must exist for label '{}'",
1597 property,
1598 label
1599 );
1600 return Err(anyhow!(
1601 "Constraint violation: Property '{}' must exist",
1602 property
1603 ));
1604 }
1605 }
1606 ConstraintType::Check { expression } => {
1607 if !self.evaluate_check_constraint(expression, properties)? {
1608 return Err(anyhow!(
1609 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1610 constraint.name,
1611 expression
1612 ));
1613 }
1614 }
1615 _ => {
1616 return Err(anyhow!("Unsupported constraint type"));
1617 }
1618 }
1619 }
1620 }
1621 Ok(())
1622 }
1623
1624 /// Validates vertex constraints for a vertex with the given labels.
1625 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1626 /// Unknown labels (not in schema) are skipped.
1627 async fn validate_vertex_constraints(
1628 &self,
1629 vid: Vid,
1630 properties: &Properties,
1631 labels: &[String],
1632 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1633 ) -> Result<()> {
1634 let schema = self.schema_manager.schema();
1635
1636 // Validate constraints only for known labels
1637 for label in labels {
1638 // Skip unknown labels (schemaless support)
1639 if schema.get_label_case_insensitive(label).is_none() {
1640 continue;
1641 }
1642 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1643 .await?;
1644 }
1645
1646 // Check global ext_id uniqueness if ext_id is provided
1647 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1648 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1649 }
1650
1651 Ok(())
1652 }
1653
1654 /// Partial sibling of `validate_vertex_constraints` — validates only
1655 /// constraints touching keys present in `properties`. Used by
1656 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1657 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1658 async fn validate_vertex_constraints_partial(
1659 &self,
1660 vid: Vid,
1661 touched: &Properties,
1662 labels: &[String],
1663 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1664 ) -> Result<()> {
1665 let schema = self.schema_manager.schema();
1666 for label in labels {
1667 if schema.get_label_case_insensitive(label).is_none() {
1668 continue;
1669 }
1670 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1671 .await?;
1672 }
1673 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1674 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1675 }
1676 Ok(())
1677 }
1678
1679 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1680 ///
1681 /// Used to build a constraint key index from L0 buffers for batch validation.
1682 fn collect_constraint_keys_from_properties<'a>(
1683 properties_iter: impl Iterator<Item = &'a Properties>,
1684 label: &str,
1685 constraints: &[uni_common::core::schema::Constraint],
1686 existing_keys: &mut HashMap<String, HashSet<String>>,
1687 existing_extids: &mut HashSet<String>,
1688 ) {
1689 for props in properties_iter {
1690 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1691 existing_extids.insert(ext_id.to_string());
1692 }
1693
1694 for constraint in constraints {
1695 if !constraint.enabled {
1696 continue;
1697 }
1698 if let ConstraintTarget::Label(l) = &constraint.target {
1699 if l != label {
1700 continue;
1701 }
1702 } else {
1703 continue;
1704 }
1705
1706 if let ConstraintType::Unique {
1707 properties: unique_props,
1708 } = &constraint.constraint_type
1709 {
1710 let mut key_parts = Vec::new();
1711 let mut all_present = true;
1712 for prop in unique_props {
1713 if let Some(val) = props.get(prop) {
1714 key_parts.push(format!("{}:{}", prop, val));
1715 } else {
1716 all_present = false;
1717 break;
1718 }
1719 }
1720 if all_present {
1721 let key = key_parts.join("|");
1722 existing_keys
1723 .entry(constraint.name.clone())
1724 .or_default()
1725 .insert(key);
1726 }
1727 }
1728 }
1729 }
1730 }
1731
1732 /// Validates constraints for a batch of vertices efficiently.
1733 ///
1734 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1735 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1736 ///
1737 /// # Arguments
1738 /// * `vids` - VIDs of vertices being inserted
1739 /// * `properties_batch` - Properties for each vertex
1740 /// * `label` - Label for all vertices (assumes single label for now)
1741 ///
1742 /// # Performance
1743 /// For N vertices with unique constraints:
1744 /// - Old approach: O(N²) - scan L0 buffer N times
1745 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1746 async fn validate_vertex_batch_constraints(
1747 &self,
1748 vids: &[Vid],
1749 properties_batch: &[Properties],
1750 label: &str,
1751 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1752 ) -> Result<()> {
1753 if vids.len() != properties_batch.len() {
1754 return Err(anyhow!("VID/properties length mismatch"));
1755 }
1756
1757 let schema = self.schema_manager.schema();
1758
1759 // 1. Validate NOT NULL constraints for each vertex
1760 if let Some(props_meta) = schema.properties.get(label) {
1761 for (idx, properties) in properties_batch.iter().enumerate() {
1762 for (prop_name, meta) in props_meta {
1763 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1764 return Err(anyhow!(
1765 "Constraint violation at index {}: Property '{}' cannot be null",
1766 idx,
1767 prop_name
1768 ));
1769 }
1770 }
1771 }
1772 }
1773
1774 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1775 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1776 let mut existing_extids: HashSet<String> = HashSet::new();
1777
1778 // Scan current L0 buffer
1779 {
1780 let l0 = self.l0_manager.get_current();
1781 let l0_guard = l0.read();
1782 Self::collect_constraint_keys_from_properties(
1783 l0_guard.vertex_properties.values(),
1784 label,
1785 &schema.constraints,
1786 &mut existing_keys,
1787 &mut existing_extids,
1788 );
1789 }
1790
1791 // Scan transaction L0 if present
1792 if let Some(tx_l0) = tx_l0 {
1793 let tx_l0_guard = tx_l0.read();
1794 Self::collect_constraint_keys_from_properties(
1795 tx_l0_guard.vertex_properties.values(),
1796 label,
1797 &schema.constraints,
1798 &mut existing_keys,
1799 &mut existing_extids,
1800 );
1801 }
1802
1803 // 3. Check batch vertices against index AND check for duplicates within batch
1804 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1805 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1806
1807 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1808 // Check ext_id uniqueness
1809 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1810 if existing_extids.contains(ext_id) {
1811 return Err(anyhow!(
1812 "Constraint violation at index {}: ext_id '{}' already exists",
1813 idx,
1814 ext_id
1815 ));
1816 }
1817 if let Some(first_idx) = batch_extids.get(ext_id) {
1818 return Err(anyhow!(
1819 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1820 ext_id,
1821 first_idx,
1822 idx
1823 ));
1824 }
1825 // Also check the main vertices table — the L0 scans above
1826 // miss vertices already flushed to L1, so without this a
1827 // batch insert (e.g. a fork promote onto primary) silently
1828 // twins a duplicate ext_id instead of erroring. Mirrors the
1829 // single-vertex `check_extid_globally_unique`.
1830 if let Ok(Some(found_vid)) =
1831 MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1832 {
1833 return Err(anyhow!(
1834 "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1835 idx,
1836 ext_id,
1837 found_vid
1838 ));
1839 }
1840 batch_extids.insert(ext_id.to_string(), idx);
1841 }
1842
1843 // Check unique constraints
1844 for constraint in &schema.constraints {
1845 if !constraint.enabled {
1846 continue;
1847 }
1848 if let ConstraintTarget::Label(l) = &constraint.target {
1849 if l != label {
1850 continue;
1851 }
1852 } else {
1853 continue;
1854 }
1855
1856 match &constraint.constraint_type {
1857 ConstraintType::Unique {
1858 properties: unique_props,
1859 } => {
1860 let mut key_parts = Vec::new();
1861 let mut all_present = true;
1862 for prop in unique_props {
1863 if let Some(val) = properties.get(prop) {
1864 key_parts.push(format!("{}:{}", prop, val));
1865 } else {
1866 all_present = false;
1867 break;
1868 }
1869 }
1870
1871 if all_present {
1872 let key = key_parts.join("|");
1873
1874 // Check against existing L0 keys
1875 if let Some(keys) = existing_keys.get(&constraint.name)
1876 && keys.contains(&key)
1877 {
1878 return Err(anyhow!(
1879 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1880 idx,
1881 label,
1882 constraint.name
1883 ));
1884 }
1885
1886 // Check for duplicates within batch
1887 let batch_constraint_keys =
1888 batch_keys.entry(constraint.name.clone()).or_default();
1889 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1890 return Err(anyhow!(
1891 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1892 key,
1893 first_idx,
1894 idx
1895 ));
1896 }
1897 batch_constraint_keys.insert(key, idx);
1898 }
1899 }
1900 ConstraintType::Exists { property }
1901 if properties.get(property).is_none_or(|v| v.is_null()) =>
1902 {
1903 return Err(anyhow!(
1904 "Constraint violation at index {}: Property '{}' must exist",
1905 idx,
1906 property
1907 ));
1908 }
1909 ConstraintType::Check { expression }
1910 if !self.evaluate_check_constraint(expression, properties)? =>
1911 {
1912 return Err(anyhow!(
1913 "Constraint violation at index {}: CHECK constraint '{}' violated",
1914 idx,
1915 constraint.name
1916 ));
1917 }
1918 _ => {}
1919 }
1920 }
1921 }
1922
1923 // 4. Check storage for unique constraints (can batch this into a single query)
1924 for constraint in &schema.constraints {
1925 if !constraint.enabled {
1926 continue;
1927 }
1928 if let ConstraintTarget::Label(l) = &constraint.target {
1929 if l != label {
1930 continue;
1931 }
1932 } else {
1933 continue;
1934 }
1935
1936 if let ConstraintType::Unique {
1937 properties: unique_props,
1938 } = &constraint.constraint_type
1939 {
1940 // Build compound OR filter for all batch vertices
1941 let mut or_filters = Vec::new();
1942 for properties in properties_batch.iter() {
1943 let mut and_parts = Vec::new();
1944 let mut all_present = true;
1945 for prop in unique_props {
1946 if let Some(val) = properties.get(prop) {
1947 let val_str = match val {
1948 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1949 Value::Int(n) => n.to_string(),
1950 Value::Float(f) => f.to_string(),
1951 Value::Bool(b) => b.to_string(),
1952 _ => {
1953 all_present = false;
1954 break;
1955 }
1956 };
1957 and_parts.push(format!("{} = {}", prop, val_str));
1958 } else {
1959 all_present = false;
1960 break;
1961 }
1962 }
1963 if all_present {
1964 or_filters.push(format!("({})", and_parts.join(" AND ")));
1965 }
1966 }
1967
1968 #[cfg(feature = "lance-backend")]
1969 if !or_filters.is_empty() {
1970 let vid_list: Vec<String> =
1971 vids.iter().map(|v| v.as_u64().to_string()).collect();
1972 let filter = format!(
1973 "({}) AND _deleted = false AND _vid NOT IN ({})",
1974 or_filters.join(" OR "),
1975 vid_list.join(", ")
1976 );
1977
1978 // Count flushed duplicates through the `StorageBackend`
1979 // (branch-aware, correct `.lance` path). A missing table
1980 // means nothing is flushed yet — the L0/pending/tx checks
1981 // above already covered in-memory rows — so skip cleanly;
1982 // any other backend error must abort the write rather than
1983 // silently fail open (the prior `open_raw()` foot-gun).
1984 let backend = self.storage.backend();
1985 let table = table_names::vertex_table_name(label);
1986 if backend.table_exists(&table).await? {
1987 let count = backend.count_rows(&table, Some(filter.as_str())).await?;
1988 if count > 0 {
1989 return Err(anyhow!(
1990 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1991 label,
1992 constraint.name
1993 ));
1994 }
1995 }
1996 }
1997 }
1998 }
1999
2000 Ok(())
2001 }
2002
2003 /// Checks that ext_id is globally unique across all vertices.
2004 ///
2005 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
2006 /// to ensure no other vertex uses this ext_id.
2007 ///
2008 /// # Errors
2009 ///
2010 /// Returns error if another vertex with the same ext_id exists.
2011 async fn check_extid_globally_unique(
2012 &self,
2013 ext_id: &str,
2014 current_vid: Vid,
2015 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2016 ) -> Result<()> {
2017 // Check L0 buffers: current, transaction, and pending flush
2018 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
2019 let mut buffers = vec![self.l0_manager.get_current()];
2020 if let Some(tx_l0) = tx_l0 {
2021 buffers.push(tx_l0.clone());
2022 }
2023 buffers.extend(self.l0_manager.get_pending_flush());
2024 buffers
2025 };
2026
2027 for l0 in &l0_buffers_to_check {
2028 // O(1) per buffer via the maintained `extid_index` (the previous
2029 // full `vertex_properties` scan made constrained ingest O(n²)).
2030 if let Some(&vid) = l0.read().extid_index.get(ext_id)
2031 && vid != current_vid
2032 {
2033 return Err(anyhow!(
2034 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
2035 ext_id,
2036 vid
2037 ));
2038 }
2039 }
2040
2041 // Check main vertices table (if it exists)
2042 // Pass None for global uniqueness check (not snapshot-isolated)
2043 let backend = self.storage.backend();
2044 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
2045 && found_vid != current_vid
2046 {
2047 return Err(anyhow!(
2048 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
2049 ext_id,
2050 found_vid
2051 ));
2052 }
2053
2054 Ok(())
2055 }
2056
2057 /// Helper to get vertex labels from L0 buffer.
2058 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
2059 let l0 = self.l0_manager.get_current();
2060 let l0_guard = l0.read();
2061 // Check if vertex is tombstoned (deleted) - if so, return None
2062 if l0_guard.vertex_tombstones.contains(&vid) {
2063 return None;
2064 }
2065 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
2066 }
2067
2068 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
2069 /// This is the proper way to read vertex labels after a flush, as it checks both
2070 /// in-memory buffers and persisted storage.
2071 pub async fn get_vertex_labels(
2072 &self,
2073 vid: Vid,
2074 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2075 ) -> Option<Vec<String>> {
2076 // 1. Check current L0
2077 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
2078 return Some(labels);
2079 }
2080
2081 // 2. Check transaction L0 if present
2082 if let Some(tx_l0) = tx_l0 {
2083 let guard = tx_l0.read();
2084 if guard.vertex_tombstones.contains(&vid) {
2085 return None;
2086 }
2087 if let Some(labels) = guard.get_vertex_labels(vid) {
2088 return Some(labels.to_vec());
2089 }
2090 }
2091
2092 // 3. Check pending flush L0s
2093 for pending_l0 in self.l0_manager.get_pending_flush() {
2094 let guard = pending_l0.read();
2095 if guard.vertex_tombstones.contains(&vid) {
2096 return None;
2097 }
2098 if let Some(labels) = guard.get_vertex_labels(vid) {
2099 return Some(labels.to_vec());
2100 }
2101 }
2102
2103 // 4. Check storage
2104 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
2105 }
2106
2107 /// Helper to get edge type from L0 buffer.
2108 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
2109 let l0 = self.l0_manager.get_current();
2110 let l0_guard = l0.read();
2111 l0_guard.get_edge_type(eid).map(|s| s.to_string())
2112 }
2113
2114 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
2115 /// Falls back to the transaction L0 if available.
2116 pub fn get_edge_type_id_from_l0(
2117 &self,
2118 eid: Eid,
2119 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2120 ) -> Option<u32> {
2121 // Check transaction L0 first
2122 if let Some(tx_l0) = tx_l0 {
2123 let guard = tx_l0.read();
2124 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
2125 return Some(etype);
2126 }
2127 }
2128 // Fall back to main L0
2129 let l0 = self.l0_manager.get_current();
2130 let l0_guard = l0.read();
2131 l0_guard
2132 .get_edge_endpoint_full(eid)
2133 .map(|(_, _, etype)| etype)
2134 }
2135
2136 /// Set the type name for an edge (used for schemaless edge types).
2137 /// This is called during CREATE for edge types not found in the schema.
2138 pub fn set_edge_type(
2139 &self,
2140 eid: Eid,
2141 type_name: String,
2142 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2143 ) {
2144 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
2145 }
2146
2147 /// Evaluate a simple CHECK constraint expression.
2148 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
2149 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
2150 let parts: Vec<&str> = expression.split_whitespace().collect();
2151 if parts.len() != 3 {
2152 // For now, only support "prop op val"
2153 // Fallback to true if too complex to avoid breaking, but warn
2154 log::warn!(
2155 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
2156 expression
2157 );
2158 return Ok(true);
2159 }
2160
2161 let prop_part = parts[0].trim_start_matches('(');
2162 // Handle "variable.property" format - take the part after the dot
2163 let prop_name = if let Some(idx) = prop_part.find('.') {
2164 &prop_part[idx + 1..]
2165 } else {
2166 prop_part
2167 };
2168
2169 let op = parts[1];
2170 let val_str = parts[2].trim_end_matches(')');
2171
2172 let prop_val = match properties.get(prop_name) {
2173 Some(v) => v,
2174 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
2175 };
2176
2177 // Parse value string (handle quotes for strings)
2178 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
2179 || (val_str.starts_with('"') && val_str.ends_with('"'))
2180 {
2181 Value::String(val_str[1..val_str.len() - 1].to_string())
2182 } else if let Ok(n) = val_str.parse::<i64>() {
2183 Value::Int(n)
2184 } else if let Ok(n) = val_str.parse::<f64>() {
2185 Value::Float(n)
2186 } else if let Ok(b) = val_str.parse::<bool>() {
2187 Value::Bool(b)
2188 } else {
2189 // Check for internal format wrappers if they somehow leaked through
2190 if val_str.starts_with("Number(") && val_str.ends_with(')') {
2191 let n_str = &val_str[7..val_str.len() - 1];
2192 if let Ok(n) = n_str.parse::<i64>() {
2193 Value::Int(n)
2194 } else if let Ok(n) = n_str.parse::<f64>() {
2195 Value::Float(n)
2196 } else {
2197 Value::String(val_str.to_string())
2198 }
2199 } else {
2200 Value::String(val_str.to_string())
2201 }
2202 };
2203
2204 match op {
2205 "=" | "==" => Ok(prop_val == &target_val),
2206 "!=" | "<>" => Ok(prop_val != &target_val),
2207 ">" => self
2208 .compare_values(prop_val, &target_val)
2209 .map(|o| o.is_gt()),
2210 "<" => self
2211 .compare_values(prop_val, &target_val)
2212 .map(|o| o.is_lt()),
2213 ">=" => self
2214 .compare_values(prop_val, &target_val)
2215 .map(|o| o.is_ge()),
2216 "<=" => self
2217 .compare_values(prop_val, &target_val)
2218 .map(|o| o.is_le()),
2219 _ => {
2220 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
2221 Ok(true)
2222 }
2223 }
2224 }
2225
2226 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2227 use std::cmp::Ordering;
2228
2229 fn cmp_f64(x: f64, y: f64) -> Ordering {
2230 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2231 }
2232
2233 match (a, b) {
2234 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2235 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2236 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2237 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2238 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2239 _ => Err(anyhow!(
2240 "Cannot compare incompatible types: {:?} vs {:?}",
2241 a,
2242 b
2243 )),
2244 }
2245 }
2246
2247 async fn check_unique_constraint_multi(
2248 &self,
2249 label: &str,
2250 key_values: &[(String, Value)],
2251 current_vid: Vid,
2252 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2253 ) -> Result<()> {
2254 // Serialize constraint key once for O(1) lookups
2255 let key = serialize_constraint_key(label, key_values);
2256
2257 // 1. Check L0 (in-memory) using O(1) constraint index
2258 {
2259 let l0 = self.l0_manager.get_current();
2260 let l0_guard = l0.read();
2261 if l0_guard.has_constraint_key(&key, current_vid) {
2262 return Err(anyhow!(
2263 "Constraint violation: Duplicate composite key for label '{}'",
2264 label
2265 ));
2266 }
2267 }
2268
2269 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2270 // buffer onto `pending_flush` and installs a fresh empty current
2271 // buffer; until the rotated rows reach Lance the key is invisible to
2272 // both the current-buffer check above and the storage check below, so
2273 // a duplicate could slip through that flush window. Mirror the read
2274 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2275 // already consult `pending_flush`.
2276 for pending_l0 in self.l0_manager.get_pending_flush() {
2277 if pending_l0.read().has_constraint_key(&key, current_vid) {
2278 return Err(anyhow!(
2279 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2280 label
2281 ));
2282 }
2283 }
2284
2285 // Check Transaction L0
2286 if let Some(tx_l0) = tx_l0 {
2287 let tx_l0_guard = tx_l0.read();
2288 if tx_l0_guard.has_constraint_key(&key, current_vid) {
2289 return Err(anyhow!(
2290 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2291 label
2292 ));
2293 }
2294 }
2295
2296 // 2. Check Storage (L1/L2)
2297 let filters: Vec<String> = key_values
2298 .iter()
2299 .map(|(prop, val)| {
2300 let val_str = match val {
2301 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2302 Value::Int(n) => n.to_string(),
2303 Value::Float(f) => f.to_string(),
2304 Value::Bool(b) => b.to_string(),
2305 _ => "NULL".to_string(),
2306 };
2307 format!("{} = {}", prop, val_str)
2308 })
2309 .collect();
2310
2311 let mut filter = filters.join(" AND ");
2312 filter.push_str(&format!(
2313 " AND _deleted = false AND _vid != {}",
2314 current_vid.as_u64()
2315 ));
2316
2317 // 2. Check Storage (L1/L2) through the `StorageBackend` (branch-aware,
2318 // correct `.lance` path). Skip cleanly when the table is not yet
2319 // flushed; propagate any real backend error instead of failing open.
2320 #[cfg(feature = "lance-backend")]
2321 {
2322 let backend = self.storage.backend();
2323 let table = table_names::vertex_table_name(label);
2324 if backend.table_exists(&table).await? {
2325 let count = backend.count_rows(&table, Some(filter.as_str())).await?;
2326 if count > 0 {
2327 return Err(anyhow!(
2328 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2329 label,
2330 filter
2331 ));
2332 }
2333 }
2334 }
2335
2336 Ok(())
2337 }
2338
2339 async fn check_write_pressure(&self) -> Result<()> {
2340 let status = self
2341 .storage
2342 .compaction_status()
2343 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2344 let l1_runs = status.l1_runs;
2345 let throttle = &self.config.throttle;
2346
2347 if l1_runs >= throttle.hard_limit {
2348 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2349 // Simple polling for now
2350 while self
2351 .storage
2352 .compaction_status()
2353 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2354 .l1_runs
2355 >= throttle.hard_limit
2356 {
2357 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2358 }
2359 } else if l1_runs >= throttle.soft_limit {
2360 let excess = l1_runs - throttle.soft_limit;
2361 // Cap multiplier to avoid overflow
2362 let excess = std::cmp::min(excess, 31);
2363 let multiplier = 2_u32.pow(excess as u32);
2364 let delay = throttle.base_delay * multiplier;
2365 tokio::time::sleep(delay).await;
2366 }
2367 Ok(())
2368 }
2369
2370 /// Check transaction memory limit to prevent OOM.
2371 /// No-op when no transaction is active.
2372 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2373 if let Some(tx_l0) = tx_l0 {
2374 let size = tx_l0.read().estimated_size;
2375 if size > self.config.max_transaction_memory {
2376 return Err(anyhow!(
2377 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2378 Roll back or commit the current transaction.",
2379 size,
2380 self.config.max_transaction_memory
2381 ));
2382 }
2383 }
2384 Ok(())
2385 }
2386
2387 async fn get_query_context(
2388 &self,
2389 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2390 ) -> Option<QueryContext> {
2391 Some(QueryContext::new_with_pending(
2392 self.l0_manager.get_current(),
2393 tx_l0.cloned(),
2394 self.l0_manager.get_pending_flush(),
2395 ))
2396 }
2397
2398 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2399 /// write paths.
2400 ///
2401 /// Rejects a declared CRDT property written as a parsed CRDT value
2402 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2403 /// A mismatch would make the commit-time merge silently overwrite instead of
2404 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2405 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2406 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2407 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2408 /// are never carved out and stay conflictable.
2409 fn enforce_crdt_variants(
2410 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2411 properties: &Properties,
2412 ) -> Result<()> {
2413 for (key, value) in properties {
2414 let Some(meta) = props_meta.get(key) else {
2415 continue;
2416 };
2417 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2418 continue;
2419 };
2420 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2421 && crdt.type_name() != expected.type_name()
2422 {
2423 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2424 message: format!(
2425 "CRDT property '{key}' must be written as a {} value",
2426 expected.type_name()
2427 ),
2428 }));
2429 }
2430 }
2431 Ok(())
2432 }
2433
2434 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2435 ///
2436 /// When `label` is provided, uses it directly to look up property metadata.
2437 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2438 ///
2439 /// # Errors
2440 ///
2441 /// Returns an error if CRDT property merging fails.
2442 async fn prepare_vertex_upsert(
2443 &self,
2444 vid: Vid,
2445 properties: &mut Properties,
2446 label: Option<&str>,
2447 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2448 ) -> Result<()> {
2449 let Some(pm) = &self.property_manager else {
2450 return Ok(());
2451 };
2452
2453 let schema = self.schema_manager.schema();
2454
2455 // Resolve label: use provided label or discover from L0/storage
2456 let discovered_labels;
2457 let label_name = if let Some(l) = label {
2458 Some(l)
2459 } else {
2460 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2461 discovered_labels
2462 .as_ref()
2463 .and_then(|l| l.first().map(|s| s.as_str()))
2464 };
2465
2466 let Some(label_str) = label_name else {
2467 return Ok(());
2468 };
2469 let Some(props_meta) = schema.properties.get(label_str) else {
2470 return Ok(());
2471 };
2472
2473 // Identify CRDT properties in the insert data
2474 let crdt_keys: Vec<String> = properties
2475 .keys()
2476 .filter(|key| {
2477 props_meta.get(*key).is_some_and(|meta| {
2478 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2479 })
2480 })
2481 .cloned()
2482 .collect();
2483
2484 if crdt_keys.is_empty() {
2485 return Ok(());
2486 }
2487
2488 // Enforce that each declared CRDT property written as a parsed CRDT value
2489 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2490 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2491 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2492 // would hide that as a silent lost update — reject it at the source.
2493 //
2494 // Only the `Map` form is checked: it is exactly the form the carve-out
2495 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2496 // string (the Cypher form) or a non-CRDT value is never carved out — it
2497 // stays conflictable — so it poses no carve-out soundness risk and is left
2498 // to the existing merge/parse path. This is the declared-property half of
2499 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2500 Self::enforce_crdt_variants(props_meta, properties)?;
2501
2502 let ctx = self.get_query_context(tx_l0).await;
2503 for key in crdt_keys {
2504 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2505 if !existing.is_null()
2506 && let Some(val) = properties.get_mut(&key)
2507 {
2508 *val = pm.merge_crdt_values(&existing, val)?;
2509 }
2510 }
2511
2512 Ok(())
2513 }
2514
2515 async fn prepare_edge_upsert(
2516 &self,
2517 eid: Eid,
2518 properties: &mut Properties,
2519 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2520 ) -> Result<()> {
2521 if let Some(pm) = &self.property_manager {
2522 let schema = self.schema_manager.schema();
2523 // Get edge type from L0 buffer instead of from EID
2524 let type_name = self.get_edge_type_from_l0(eid);
2525
2526 if let Some(ref t_name) = type_name
2527 && let Some(props_meta) = schema.properties.get(t_name)
2528 {
2529 let mut crdt_keys = Vec::new();
2530 for (key, _) in properties.iter() {
2531 if let Some(meta) = props_meta.get(key)
2532 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2533 {
2534 crdt_keys.push(key.clone());
2535 }
2536 }
2537
2538 if !crdt_keys.is_empty() {
2539 let ctx = self.get_query_context(tx_l0).await;
2540 for key in crdt_keys {
2541 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2542
2543 if !existing.is_null()
2544 && let Some(val) = properties.get_mut(&key)
2545 {
2546 *val = pm.merge_crdt_values(&existing, val)?;
2547 }
2548 }
2549 }
2550 }
2551 }
2552 Ok(())
2553 }
2554
2555 #[instrument(skip(self, properties), level = "trace")]
2556 pub async fn insert_vertex(
2557 &self,
2558 vid: Vid,
2559 properties: Properties,
2560 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2561 ) -> Result<()> {
2562 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2563 .await?;
2564 Ok(())
2565 }
2566
2567 /// Component C1 (G4): before a non-transactional mutation merges into main
2568 /// L0, if an outstanding snapshot pins the current generation, freeze it
2569 /// aside so snapshots taken *before* this write stay isolated from it.
2570 ///
2571 /// `flush_lock` (acquired and released here) serializes the freeze against
2572 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2573 /// commit path gets. No-op for transactional writes (their freeze happens at
2574 /// commit) and — the common case — when nothing is pinned, where it costs one
2575 /// atomic load. Freezes at most once per pinned generation: the freeze
2576 /// installs a fresh unpinned `current`, so later writes in the same bulk
2577 /// import see no pin and merge in place, and the snapshot keeps reading the
2578 /// frozen pre-import buffer.
2579 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2580 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2581 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2582 // always false (one atomic load) when SSI is off.
2583 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2584 let _flush_lock_guard = self.flush_lock.lock().await;
2585 // Re-check under the lock: a concurrent commit may have frozen first.
2586 if self.l0_manager.is_current_pinned() {
2587 self.l0_manager.freeze_current_for_snapshot();
2588 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2589 }
2590 }
2591 }
2592
2593 #[instrument(skip(self, properties, labels), level = "trace")]
2594 pub async fn insert_vertex_with_labels(
2595 &self,
2596 vid: Vid,
2597 mut properties: Properties,
2598 labels: &[String],
2599 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2600 ) -> Result<Properties> {
2601 let start = std::time::Instant::now();
2602 self.check_write_pressure().await?;
2603 self.check_transaction_memory(tx_l0)?;
2604
2605 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2606 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2607 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2608 // taken before this write stay isolated from it.
2609 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2610
2611 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2612 self.process_embeddings_for_labels(labels, &mut properties)
2613 .await?;
2614 }
2615 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2616 .await?;
2617 self.prepare_vertex_upsert(
2618 vid,
2619 &mut properties,
2620 labels.first().map(|s| s.as_str()),
2621 tx_l0,
2622 )
2623 .await?;
2624
2625 // Clone properties and labels before moving into L0 to return them and populate constraint index
2626 let properties_copy = properties.clone();
2627 let labels_copy = labels.to_vec();
2628
2629 {
2630 // For a non-tx write, re-resolve the live `current` buffer and hold
2631 // `flush_lock` across the (synchronous) write so a concurrent flush
2632 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2633 // a fresh `current` between resolve and write, dropping our write
2634 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2635 // buffer, never the rotating `current`, so no `flush_lock` is needed
2636 // (and taking it would risk re-entrancy with the commit path).
2637 let _flush_lock_guard = if tx_l0.is_none() {
2638 Some(self.flush_lock.lock().await)
2639 } else {
2640 None
2641 };
2642 let l0 = self.resolve_l0(tx_l0);
2643 let mut l0_guard = l0.write();
2644 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2645 // possibly empty) current buffer must merge against the chained
2646 // committed state, not shadow it. No-op when the chain is empty
2647 // or this is a tx-private write.
2648 if tx_l0.is_none() {
2649 let pending = self.l0_manager.get_pending_flush();
2650 if !pending.is_empty() {
2651 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2652 }
2653 }
2654 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2655
2656 // Populate constraint index for O(1) duplicate detection
2657 let schema = self.schema_manager.schema();
2658 for label in &labels_copy {
2659 if schema.get_label_case_insensitive(label).is_none() {
2660 if self.config.strict_schema {
2661 return Err(anyhow::anyhow!(
2662 "Label '{}' is not defined in the schema \
2663 (strict_schema is enabled).",
2664 label
2665 ));
2666 }
2667 continue; // Schemaless: skip unknown labels.
2668 }
2669
2670 // For each unique constraint on this label, insert into constraint index
2671 for constraint in &schema.constraints {
2672 if !constraint.enabled {
2673 continue;
2674 }
2675 if let ConstraintTarget::Label(l) = &constraint.target {
2676 if l != label {
2677 continue;
2678 }
2679 } else {
2680 continue;
2681 }
2682
2683 if let ConstraintType::Unique {
2684 properties: unique_props,
2685 } = &constraint.constraint_type
2686 {
2687 let mut key_values = Vec::new();
2688 let mut all_present = true;
2689 for prop in unique_props {
2690 if let Some(val) = properties_copy.get(prop) {
2691 key_values.push((prop.clone(), val.clone()));
2692 } else {
2693 all_present = false;
2694 break;
2695 }
2696 }
2697
2698 if all_present {
2699 let key = serialize_constraint_key(label, &key_values);
2700 l0_guard.insert_constraint_key(key, vid);
2701 }
2702 }
2703 }
2704 }
2705 }
2706
2707 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2708 self.update_metrics();
2709
2710 if tx_l0.is_none() {
2711 self.check_flush().await?;
2712 }
2713 if start.elapsed().as_millis() > 100 {
2714 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2715 }
2716 Ok(properties_copy)
2717 }
2718
2719 /// True iff routing this partial write through MergeInsert would
2720 /// miss a constraint check. Specifically: a multi-key UNIQUE
2721 /// constraint where the touched-set doesn't cover all member keys
2722 /// requires the unchanged keys from the existing row to compute
2723 /// the composite. Conservative: also returns true if any touched
2724 /// key is `ext_id` (uniqueness checked globally — handled in the
2725 /// full-row path).
2726 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2727 if touched.contains_key("ext_id") {
2728 return true;
2729 }
2730 let schema = self.schema_manager.schema();
2731 for label in labels {
2732 if schema.get_label_case_insensitive(label).is_none() {
2733 continue;
2734 }
2735 for constraint in &schema.constraints {
2736 if !constraint.enabled {
2737 continue;
2738 }
2739 if let ConstraintTarget::Label(l) = &constraint.target {
2740 if !l.eq_ignore_ascii_case(label) {
2741 continue;
2742 }
2743 } else {
2744 continue;
2745 }
2746 if let ConstraintType::Unique {
2747 properties: unique_props,
2748 } = &constraint.constraint_type
2749 {
2750 if unique_props.len() < 2 {
2751 continue; // single-key UNIQUE — partial path sees the key
2752 }
2753 if unique_props.iter().any(|p| touched.contains_key(p)) {
2754 return true;
2755 }
2756 }
2757 }
2758 }
2759 false
2760 }
2761
2762 /// Insert a vertex's FULL property row plus a touched-keys hint so
2763 /// the flush emits ONLY those columns via Lance MergeInsert.
2764 ///
2765 /// Caller must have read the full row (via PropertyManager) and
2766 /// applied SET-touched values on top before calling — same input
2767 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2768 /// is the set of property keys this SET statement actually
2769 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2770 /// flush filters the MergeInsert source schema down to those keys.
2771 /// When `UniConfig::partial_lance_writes == false`, falls through
2772 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2773 /// equivalence with prior releases.
2774 /// Refresh auto-embed targets for a partial / `SET` write whose `touched_keys`
2775 /// include an embed *source* column: drop stale target embeddings from `props`
2776 /// (so they re-embed) and add them to `touched_keys`. Public so the query
2777 /// executor can apply it on the coalesced write **before** the partial-vs-full
2778 /// branch — both branches need it. No-op for non-SET writes / non-embed labels.
2779 pub fn refresh_embed_targets(
2780 &self,
2781 props: &mut Properties,
2782 touched_keys: &mut HashSet<String>,
2783 labels: &[String],
2784 ) {
2785 refresh_touched_embed_targets(&self.schema_manager.schema(), props, touched_keys, labels);
2786 }
2787
2788 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2789 pub async fn insert_vertex_partial_full(
2790 &self,
2791 vid: Vid,
2792 mut props: Properties,
2793 touched_keys: HashSet<String>,
2794 labels: &[String],
2795 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2796 ) -> Result<()> {
2797 if !self.config.partial_lance_writes
2798 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2799 {
2800 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2801 .await?;
2802 return Ok(());
2803 }
2804
2805 self.check_write_pressure().await?;
2806 self.check_transaction_memory(tx_l0)?;
2807 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2808 self.process_embeddings_for_labels(labels, &mut props)
2809 .await?;
2810 }
2811 // Full-row validation runs because we have the complete map;
2812 // no need for the partial-only validator.
2813 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2814 .await?;
2815 {
2816 let l0 = self.resolve_l0(tx_l0);
2817 let mut l0_guard = l0.write();
2818 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2819 }
2820 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2821 metrics::counter!("uni_partial_writes_total").increment(1);
2822 self.update_metrics();
2823 if tx_l0.is_none() {
2824 self.check_flush().await?;
2825 }
2826 Ok(())
2827 }
2828
2829 /// Insert a vertex's *partial* property set without first reading the
2830 /// full row.
2831 ///
2832 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2833 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2834 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2835 /// schema source — preserving untouched columns (e.g., embeddings)
2836 /// byte-equal in Lance with no read at the caller and no write of
2837 /// those columns.
2838 ///
2839 /// When the flag is `false`, this falls back to the existing
2840 /// `insert_vertex_with_labels` path after merging `touched` with
2841 /// the current properties from L0/storage. The caller can therefore
2842 /// use this entry point unconditionally; the optimization activates
2843 /// only when the flag is on.
2844 #[instrument(skip(self, touched, labels), level = "trace")]
2845 pub async fn insert_vertex_partial(
2846 &self,
2847 vid: Vid,
2848 touched: Properties,
2849 labels: &[String],
2850 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2851 ) -> Result<()> {
2852 let needs_full_read =
2853 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2854 if needs_full_read {
2855 // Flag-off fallback (or constraint-driven fallback): merge
2856 // `touched` with the current full property snapshot from
2857 // L0/storage and route through the existing path. Preserves
2858 // bit-for-bit equivalence with the pre-Round-11 release.
2859 let existing = if let Some(pm) = &self.property_manager {
2860 pm.get_all_vertex_props_with_ctx(vid, None)
2861 .await
2862 .unwrap_or_default()
2863 .unwrap_or_default()
2864 } else {
2865 Properties::new()
2866 };
2867 let mut merged = existing;
2868 for (k, v) in touched {
2869 merged.insert(k, v);
2870 }
2871 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2872 .await?;
2873 return Ok(());
2874 }
2875
2876 // Flag-on fast path: stage the partial update directly. Pressure
2877 // checks, embedding generation, constraint validation all still
2878 // run — but the validator is the partial-aware variant that
2879 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2880 // properties not present in `touched`. Multi-key UNIQUE that
2881 // overlaps the touched set forces a fallback above via
2882 // `touched_needs_full_read`.
2883 let mut touched = touched;
2884 self.check_write_pressure().await?;
2885 self.check_transaction_memory(tx_l0)?;
2886 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2887 self.process_embeddings_for_labels(labels, &mut touched)
2888 .await?;
2889 }
2890 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2891 .await?;
2892
2893 {
2894 let l0 = self.resolve_l0(tx_l0);
2895 let mut l0_guard = l0.write();
2896 l0_guard.insert_vertex_partial(vid, touched, labels);
2897 }
2898
2899 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2900 metrics::counter!("uni_partial_writes_total").increment(1);
2901 self.update_metrics();
2902 if tx_l0.is_none() {
2903 self.check_flush().await?;
2904 }
2905 Ok(())
2906 }
2907
2908 /// Insert multiple vertices with batched operations.
2909 ///
2910 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2911 /// for bulk inserts with unique constraints.
2912 ///
2913 /// # Performance Improvements
2914 /// - Batch VID allocation: 1 call instead of N calls
2915 /// - Batch constraint validation: O(N) instead of O(N²)
2916 /// - Batch embedding generation: 1 API call per config instead of N calls
2917 /// - Transaction wrapping: Automatic flush deferral, atomicity
2918 ///
2919 /// # Arguments
2920 /// * `vids` - Pre-allocated VIDs for the vertices
2921 /// * `properties_batch` - Properties for each vertex
2922 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2923 ///
2924 /// # Errors
2925 /// Returns error if:
2926 /// - VID/properties length mismatch
2927 /// - Constraint violation detected
2928 /// - Embedding generation fails
2929 /// - Transaction commit fails
2930 ///
2931 /// # Atomicity
2932 /// If this method fails, all changes are rolled back (if transaction was started here).
2933 pub async fn insert_vertices_batch(
2934 &self,
2935 vids: Vec<Vid>,
2936 mut properties_batch: Vec<Properties>,
2937 labels: Vec<String>,
2938 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2939 ) -> Result<Vec<Properties>> {
2940 let start = std::time::Instant::now();
2941
2942 // Validate inputs
2943 if vids.len() != properties_batch.len() {
2944 return Err(anyhow!(
2945 "VID/properties size mismatch: {} vids, {} properties",
2946 vids.len(),
2947 properties_batch.len()
2948 ));
2949 }
2950
2951 if vids.is_empty() {
2952 return Ok(Vec::new());
2953 }
2954
2955 // Batch operations — writes go directly to the resolved L0.
2956 // Atomicity is guaranteed by the caller holding the writer lock.
2957 let result = async {
2958 self.check_write_pressure().await?;
2959 self.check_transaction_memory(tx_l0)?;
2960
2961 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2962 // freeze the pinned generation aside before merging so snapshot
2963 // readers stay isolated. No-op when unpinned or transactional.
2964 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2965
2966 // Batch embedding generation (1 API call per config)
2967 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2968 .await?;
2969
2970 // Batch constraint validation (O(N) instead of O(N²))
2971 let label = labels
2972 .first()
2973 .ok_or_else(|| anyhow!("No labels provided"))?;
2974 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2975 .await?;
2976
2977 // Batch prepare (CRDT merging if needed)
2978 // Check schema once: skip entirely if no CRDT properties for this label.
2979 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2980 // values to merge, so the per-vertex lookup is unnecessary in that case.
2981 let has_crdt_fields = {
2982 let schema = self.schema_manager.schema();
2983 schema
2984 .properties
2985 .get(label.as_str())
2986 .is_some_and(|props_meta| {
2987 props_meta.values().any(|meta| {
2988 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2989 })
2990 })
2991 };
2992
2993 if has_crdt_fields {
2994 // Layer-1 variant enforcement (G3): the batch path must reject a
2995 // declared-CRDT variant mismatch exactly as the single-vertex
2996 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2997 // written via batch import slips past write-time validation and
2998 // the OCC carve-out then masks the overwrite as a lost update.
2999 {
3000 let schema = self.schema_manager.schema();
3001 if let Some(props_meta) = schema.properties.get(label.as_str()) {
3002 for props in &properties_batch {
3003 Self::enforce_crdt_variants(props_meta, props)?;
3004 }
3005 }
3006 }
3007
3008 // Batch fetch existing CRDT values: collect VIDs that need merging,
3009 // then query once via PropertyManager instead of per-vertex lookups.
3010 let schema = self.schema_manager.schema();
3011 let crdt_keys: Vec<String> = schema
3012 .properties
3013 .get(label.as_str())
3014 .map(|props_meta| {
3015 props_meta
3016 .iter()
3017 .filter(|(_, meta)| {
3018 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
3019 })
3020 .map(|(key, _)| key.clone())
3021 .collect()
3022 })
3023 .unwrap_or_default();
3024
3025 if let Some(pm) = &self.property_manager {
3026 let ctx = self.get_query_context(tx_l0).await;
3027 for (vid, props) in vids.iter().zip(&mut properties_batch) {
3028 for key in &crdt_keys {
3029 if props.contains_key(key) {
3030 let existing =
3031 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
3032 if !existing.is_null()
3033 && let Some(val) = props.get_mut(key)
3034 {
3035 *val = pm.merge_crdt_values(&existing, val)?;
3036 }
3037 }
3038 }
3039 }
3040 }
3041 }
3042
3043 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
3044 let target_l0 = self.resolve_l0(tx_l0);
3045
3046 let properties_result = properties_batch.clone();
3047 {
3048 let mut l0_guard = target_l0.write();
3049 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
3050 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
3051 }
3052 }
3053
3054 // Update metrics (batch increment)
3055 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
3056 self.update_metrics();
3057
3058 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
3059 }
3060 .await;
3061
3062 let props = result?;
3063
3064 if start.elapsed().as_millis() > 100 {
3065 log::warn!(
3066 "Slow insert_vertices_batch ({} vertices): {}ms",
3067 vids.len(),
3068 start.elapsed().as_millis()
3069 );
3070 }
3071
3072 Ok(props)
3073 }
3074
3075 /// Delete a vertex by VID.
3076 ///
3077 /// When `labels` is provided, uses them directly to populate L0 for
3078 /// correct tombstone flushing. Otherwise discovers labels from L0
3079 /// buffers and storage (which can be slow for many vertices).
3080 ///
3081 /// # Errors
3082 ///
3083 /// Returns an error if write pressure stalls, label lookup fails, or
3084 /// the L0 delete operation fails.
3085 #[instrument(skip(self, labels), level = "trace")]
3086 pub async fn delete_vertex(
3087 &self,
3088 vid: Vid,
3089 labels: Option<Vec<String>>,
3090 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3091 ) -> Result<()> {
3092 let start = std::time::Instant::now();
3093 self.check_write_pressure().await?;
3094 self.check_transaction_memory(tx_l0)?;
3095 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3096
3097 // Before deleting, ensure we have the vertex's labels stored in L0 so
3098 // the tombstone can be flushed to the correct label datasets. Discover
3099 // them up front (this may await storage) WITHOUT pinning the buffer we
3100 // will eventually mutate — for non-tx writes the live `current` buffer
3101 // is re-resolved below under `flush_lock`, so a concurrent rotate can't
3102 // drop our write (Bug #4). `resolve_l0` here is only used for cheap
3103 // reads that tolerate a racing rotate.
3104 let has_labels = {
3105 let l0_guard = self.resolve_l0(tx_l0);
3106 let guard = l0_guard.read();
3107 guard.vertex_labels.contains_key(&vid)
3108 };
3109
3110 let backfill_labels = if has_labels {
3111 None
3112 } else if let Some(provided) = labels {
3113 // Caller provided labels — skip the lookup entirely
3114 Some(provided)
3115 } else {
3116 // Discover labels from pending flush L0s, then storage
3117 let mut found = None;
3118 for pending_l0 in self.l0_manager.get_pending_flush() {
3119 let pending_guard = pending_l0.read();
3120 if let Some(l) = pending_guard.get_vertex_labels(vid) {
3121 found = Some(l.to_vec());
3122 break;
3123 }
3124 }
3125 if found.is_none() {
3126 found = self.find_vertex_labels_in_storage(vid).await?;
3127 }
3128 found
3129 };
3130
3131 // Test-only seam (no-op without the `failpoints` feature): pause a
3132 // non-transactional delete AFTER the awaited label discovery but BEFORE
3133 // it re-resolves the live buffer and writes the tombstone. A concurrent
3134 // flush can rotate+complete a buffer in this window; the fix re-resolves
3135 // `get_current()` and mutates it under `flush_lock`, so the tombstone
3136 // always lands in the live buffer (Bug #4 — silent lost delete across
3137 // L0 rotation).
3138 fail::fail_point!("nontx::after-capture");
3139
3140 // Apply the label backfill and the tombstone together. For a non-tx
3141 // write, hold `flush_lock` across the (synchronous) re-resolve + write
3142 // so a concurrent flush rotate (which takes `flush_lock` via
3143 // `begin_flush`) cannot install a fresh `current` between our resolve
3144 // and our write. For a tx write `resolve_l0` returns the tx-private
3145 // buffer (never the rotating `current`), so no `flush_lock` is needed
3146 // — and taking it there would risk re-entrancy with the commit path.
3147 if tx_l0.is_none() {
3148 let _flush_lock_guard = self.flush_lock.lock().await;
3149 let l0 = self.l0_manager.get_current();
3150 let mut guard = l0.write();
3151 if let Some(found_labels) = backfill_labels {
3152 guard.vertex_labels.insert(vid, found_labels);
3153 }
3154 guard.delete_vertex(vid)?;
3155 } else {
3156 let l0 = self.resolve_l0(tx_l0);
3157 let mut guard = l0.write();
3158 if let Some(found_labels) = backfill_labels {
3159 guard.vertex_labels.insert(vid, found_labels);
3160 }
3161 guard.delete_vertex(vid)?;
3162 }
3163 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3164 self.update_metrics();
3165
3166 if tx_l0.is_none() {
3167 self.check_flush().await?;
3168 }
3169 if start.elapsed().as_millis() > 100 {
3170 log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
3171 }
3172 Ok(())
3173 }
3174
3175 /// Find vertex labels from storage by querying the main vertices table.
3176 /// Returns the labels from the latest non-deleted version of the vertex.
3177 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
3178 use crate::backend::types::ScanRequest;
3179 use arrow_array::Array;
3180 use arrow_array::cast::AsArray;
3181
3182 let backend = self.storage.backend();
3183 let table_name = MainVertexDataset::table_name();
3184
3185 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
3186 if !backend.table_exists(table_name).await? {
3187 return Ok(None);
3188 }
3189
3190 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
3191 let filter = format!("_vid = {}", vid.as_u64());
3192 let batches = backend
3193 .scan(
3194 ScanRequest::all(table_name)
3195 .with_filter(filter)
3196 .with_columns(vec![
3197 "_vid".to_string(),
3198 "labels".to_string(),
3199 "_version".to_string(),
3200 "_deleted".to_string(),
3201 ]),
3202 )
3203 .await
3204 .unwrap_or_default();
3205
3206 // Find the row with the highest version number
3207 let mut max_version: Option<u64> = None;
3208 let mut labels: Option<Vec<String>> = None;
3209 let mut is_deleted = false;
3210
3211 for batch in batches {
3212 if batch.num_rows() == 0 {
3213 continue;
3214 }
3215
3216 let version_array = batch
3217 .column_by_name("_version")
3218 .unwrap()
3219 .as_primitive::<arrow_array::types::UInt64Type>();
3220
3221 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
3222
3223 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
3224
3225 for row_idx in 0..batch.num_rows() {
3226 let version = version_array.value(row_idx);
3227
3228 if max_version.is_none_or(|mv| version > mv) {
3229 is_deleted = deleted_array.value(row_idx);
3230
3231 let labels_list = labels_array.value(row_idx);
3232 let string_array = labels_list.as_string::<i32>();
3233 let vertex_labels: Vec<String> = (0..string_array.len())
3234 .filter(|&i| !string_array.is_null(i))
3235 .map(|i| string_array.value(i).to_string())
3236 .collect();
3237
3238 max_version = Some(version);
3239 labels = Some(vertex_labels);
3240 }
3241 }
3242 }
3243
3244 // If the latest version is deleted, return None
3245 if is_deleted { Ok(None) } else { Ok(labels) }
3246 }
3247
3248 #[expect(clippy::too_many_arguments)]
3249 #[instrument(skip(self, props, touched_keys), level = "trace")]
3250 pub async fn insert_edge_partial_full(
3251 &self,
3252 src_vid: Vid,
3253 dst_vid: Vid,
3254 edge_type: u32,
3255 eid: Eid,
3256 props: Properties,
3257 edge_type_name: Option<String>,
3258 touched_keys: HashSet<String>,
3259 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3260 ) -> Result<()> {
3261 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3262 if !self.config.partial_lance_writes {
3263 return self
3264 .insert_edge(
3265 src_vid,
3266 dst_vid,
3267 edge_type,
3268 eid,
3269 props,
3270 edge_type_name,
3271 tx_l0,
3272 )
3273 .await;
3274 }
3275
3276 let start = std::time::Instant::now();
3277 self.check_write_pressure().await?;
3278 self.check_transaction_memory(tx_l0)?;
3279 let mut props = props;
3280 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3281
3282 let l0 = self.resolve_l0(tx_l0);
3283 l0.write().insert_edge_partial_full(
3284 src_vid,
3285 dst_vid,
3286 edge_type,
3287 eid,
3288 props,
3289 edge_type_name,
3290 touched_keys,
3291 )?;
3292
3293 if tx_l0.is_none() {
3294 let version = l0.read().current_version;
3295 self.adjacency_manager
3296 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3297 }
3298
3299 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3300 metrics::counter!("uni_partial_writes_total").increment(1);
3301 self.update_metrics();
3302 if tx_l0.is_none() {
3303 self.check_flush().await?;
3304 }
3305 if start.elapsed().as_millis() > 100 {
3306 log::warn!(
3307 "Slow insert_edge_partial_full: {}ms",
3308 start.elapsed().as_millis()
3309 );
3310 }
3311 Ok(())
3312 }
3313
3314 #[expect(clippy::too_many_arguments)]
3315 pub async fn insert_edge(
3316 &self,
3317 src_vid: Vid,
3318 dst_vid: Vid,
3319 edge_type: u32,
3320 eid: Eid,
3321 mut properties: Properties,
3322 edge_type_name: Option<String>,
3323 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3324 ) -> Result<()> {
3325 let start = std::time::Instant::now();
3326 self.check_write_pressure().await?;
3327 self.check_transaction_memory(tx_l0)?;
3328 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3329 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3330 .await?;
3331
3332 let l0 = self.resolve_l0(tx_l0);
3333 l0.write()
3334 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3335
3336 // Dual-write to AdjacencyManager overlay (survives flush).
3337 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3338 if tx_l0.is_none() {
3339 let version = l0.read().current_version;
3340 self.adjacency_manager
3341 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3342 }
3343
3344 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3345 self.update_metrics();
3346
3347 if tx_l0.is_none() {
3348 self.check_flush().await?;
3349 }
3350 if start.elapsed().as_millis() > 100 {
3351 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3352 }
3353 Ok(())
3354 }
3355
3356 #[instrument(skip(self), level = "trace")]
3357 pub async fn delete_edge(
3358 &self,
3359 eid: Eid,
3360 src_vid: Vid,
3361 dst_vid: Vid,
3362 edge_type: u32,
3363 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3364 ) -> Result<()> {
3365 let start = std::time::Instant::now();
3366 self.check_write_pressure().await?;
3367 self.check_transaction_memory(tx_l0)?;
3368 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3369 let l0 = self.resolve_l0(tx_l0);
3370
3371 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3372
3373 // Dual-write tombstone to AdjacencyManager overlay.
3374 if tx_l0.is_none() {
3375 let version = l0.read().current_version;
3376 self.adjacency_manager
3377 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3378 }
3379 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3380 self.update_metrics();
3381
3382 if tx_l0.is_none() {
3383 self.check_flush().await?;
3384 }
3385 if start.elapsed().as_millis() > 100 {
3386 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3387 }
3388 Ok(())
3389 }
3390
3391 /// Decide whether a flush should be triggered based on mutation count
3392 /// or elapsed time since the last flush.
3393 ///
3394 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3395 /// reuse the decision while bypassing the lock-acquiring entry point
3396 /// (it already holds `flush_lock`).
3397 fn should_flush(&self) -> bool {
3398 let count = self.l0_manager.get_current().read().mutation_count;
3399 if count == 0 {
3400 return false;
3401 }
3402 if count >= self.config.auto_flush_threshold {
3403 return true;
3404 }
3405 if let Some(interval) = self.config.auto_flush_interval
3406 && self.last_flush_time.lock().elapsed() >= interval
3407 && count >= self.config.auto_flush_min_mutations
3408 {
3409 return true;
3410 }
3411 false
3412 }
3413
3414 /// Check if flush should be triggered based on mutation count or time elapsed.
3415 /// This method is called after each write operation and can also be called
3416 /// by a background task for time-based flushing.
3417 pub async fn check_flush(&self) -> Result<()> {
3418 if self.should_flush() {
3419 self.flush_to_l1(None).await?;
3420 }
3421 Ok(())
3422 }
3423
3424 /// Process embeddings for a vertex using labels passed directly.
3425 /// Use this when labels haven't been stored to L0 yet.
3426 async fn process_embeddings_for_labels(
3427 &self,
3428 labels: &[String],
3429 properties: &mut Properties,
3430 ) -> Result<()> {
3431 let label_name = labels.first().map(|s| s.as_str());
3432 self.process_embeddings_impl(label_name, properties).await
3433 }
3434
3435 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3436 /// vertex has an embedding config that hasn't been satisfied by the
3437 /// caller-provided properties, enqueue the VID in
3438 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3439 /// skips `process_embeddings_for_labels` and the embedding is computed
3440 /// in a single batched call at flush time via
3441 /// `drain_pending_embeddings`.
3442 ///
3443 /// Returns `false` (caller falls back to today's per-row eager embed)
3444 /// if any of:
3445 /// - the flag is off,
3446 /// - no label has an embedding config,
3447 /// - the user already provided the target property (matches the
3448 /// existing skip-if-present semantics at writer.rs:2727).
3449 ///
3450 /// Trade-off: when deferral is active, in-tx reads of the embedding
3451 /// column return only what was already in storage (or nothing for
3452 /// brand-new vertices). Existing tests that RETURN n.embedding in
3453 /// the same tx as a SET on the source column must run with the flag
3454 /// off; opt in only when no such reads happen between write and
3455 /// commit.
3456 fn try_defer_embedding(
3457 &self,
3458 labels: &[String],
3459 properties: &Properties,
3460 vid: Vid,
3461 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3462 ) -> bool {
3463 if !self.config.defer_embeddings {
3464 return false;
3465 }
3466 let Some(label) = labels.first() else {
3467 return false;
3468 };
3469
3470 let schema = self.schema_manager.schema();
3471 let mut has_unsatisfied_cfg = false;
3472 for idx in &schema.indexes {
3473 let unsatisfied = match idx {
3474 IndexDefinition::Vector(v_cfg) => {
3475 v_cfg.label == *label
3476 && v_cfg.embedding_config.is_some()
3477 && !properties.contains_key(&v_cfg.property)
3478 }
3479 IndexDefinition::Sparse(s_cfg) => {
3480 s_cfg.label == *label
3481 && s_cfg.embedding_config.is_some()
3482 && !properties.contains_key(&s_cfg.property)
3483 }
3484 _ => false,
3485 };
3486 if unsatisfied {
3487 has_unsatisfied_cfg = true;
3488 break;
3489 }
3490 }
3491 if !has_unsatisfied_cfg {
3492 return false;
3493 }
3494
3495 let l0 = self.resolve_l0(tx_l0);
3496 let mut guard = l0.write();
3497 guard.pending_embeddings.insert(vid, label.clone());
3498 true
3499 }
3500
3501 /// Drain `pending_embeddings` from the rotated old-L0 right before
3502 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3503 /// `process_embeddings_for_batch` call per label, and writes the
3504 /// resulting embedding vectors into each VID's `vertex_properties`
3505 /// map. After this returns, the flush proceeds against an L0 that
3506 /// looks no different from one whose embeddings were generated
3507 /// per-row at insert.
3508 ///
3509 /// Idempotent: a VID whose embedding was already materialized
3510 /// (e.g., by on-demand read paths in a future Phase B revision) is
3511 /// detected via `properties.contains_key(target_prop)` inside
3512 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3513 /// the drain is safe.
3514 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3515 let by_label: HashMap<String, Vec<Vid>> = {
3516 let guard = old_l0_arc.read();
3517 if guard.pending_embeddings.is_empty() {
3518 return Ok(());
3519 }
3520 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3521 for (vid, label) in &guard.pending_embeddings {
3522 m.entry(label.clone()).or_default().push(*vid);
3523 }
3524 m
3525 };
3526
3527 for (label, vids) in by_label {
3528 let mut properties_batch: Vec<Properties> = {
3529 let guard = old_l0_arc.read();
3530 vids.iter()
3531 .map(|vid| {
3532 guard
3533 .vertex_properties
3534 .get(vid)
3535 .cloned()
3536 .unwrap_or_default()
3537 })
3538 .collect()
3539 };
3540
3541 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3542 .await?;
3543
3544 let mut guard = old_l0_arc.write();
3545 for (vid, props) in vids.iter().zip(properties_batch) {
3546 let target = guard.vertex_properties.entry(*vid).or_default();
3547 for (k, v) in props {
3548 target.insert(k, v);
3549 }
3550 guard.pending_embeddings.remove(vid);
3551 }
3552 }
3553 Ok(())
3554 }
3555
3556 /// Materialise MUVERA FDE columns for the about-to-flush L0. Mirrors
3557 /// [`Self::drain_pending_embeddings`]: for each MUVERA index, compute the derived
3558 /// Fixed-Dimensional Encoding from each row's source multi-vector and inject it into
3559 /// that row's `vertex_properties` (so the normal column builder writes the
3560 /// `__fde_*` column with no hot-path change). For partial-write rows that touched the
3561 /// source column, the derived column is added to `vertex_partial_keys` so the partial
3562 /// MergeInsert batch carries the recomputed FDE (avoids staleness on `SET`).
3563 ///
3564 /// No-op when the schema has no MUVERA index. Unlike auto-embed, the FDE is a pure,
3565 /// deterministic, in-process transform — no runtime/embedding service needed.
3566 fn materialize_fde_columns(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3567 let schema = self.schema_manager.schema();
3568 let specs = crate::storage::muvera_index::fde_specs(&schema);
3569 if specs.is_empty() {
3570 return Ok(());
3571 }
3572 let mut guard = old_l0_arc.write();
3573 for spec in &specs {
3574 let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)
3575 .map_err(|e| anyhow!("MUVERA index '{}': {e}", spec.index_name))?;
3576 // VIDs of this label currently in L0 (collect first to avoid a borrow
3577 // conflict with the per-row mutation below).
3578 let vids: Vec<Vid> = guard
3579 .vertex_labels
3580 .iter()
3581 .filter(|(_, labels)| labels.contains(&spec.label))
3582 .map(|(vid, _)| *vid)
3583 .collect();
3584 for vid in vids {
3585 // Decode the source multi-vector tokens (borrow ends before the mutation).
3586 let tokens = match guard
3587 .vertex_properties
3588 .get(&vid)
3589 .and_then(|p| p.get(&spec.source_prop))
3590 {
3591 Some(v) => crate::storage::muvera_index::value_to_multivec(v),
3592 None => continue, // source absent → leave the FDE column NULL
3593 };
3594 // A source token with the wrong dimension makes `encode_doc` error.
3595 // Skipping the row (leaving the FDE column NULL → ranks last under the
3596 // mandatory Dot metric; harmless) keeps one malformed document from
3597 // wedging *every* flush of this label, matching the normal multi-vector
3598 // column path, which null-fills a dimension mismatch rather than failing
3599 // the flush (issue #96).
3600 let fde = match encoder.encode_doc(&tokens) {
3601 Ok(fde) => fde,
3602 Err(e) => {
3603 tracing::warn!(
3604 index = %spec.index_name,
3605 vid = ?vid,
3606 error = %e,
3607 "muvera.fde.skip_malformed: leaving FDE NULL for a source \
3608 multi-vector that failed encoding"
3609 );
3610 continue;
3611 }
3612 };
3613 if let Some(props) = guard.vertex_properties.get_mut(&vid) {
3614 props.insert(spec.derived_col.clone(), Value::Vector(fde));
3615 }
3616 if let Some(touched) = guard.vertex_partial_keys.get_mut(&vid)
3617 && touched.contains(&spec.source_prop)
3618 {
3619 touched.insert(spec.derived_col.clone());
3620 }
3621 }
3622 }
3623 Ok(())
3624 }
3625
3626 /// Process embeddings for a batch of vertices efficiently.
3627 ///
3628 /// Groups vertices by embedding config and makes batched API calls to the
3629 /// embedding service instead of calling once per vertex.
3630 ///
3631 /// # Performance
3632 /// For N vertices with embedding config:
3633 /// - Old approach: N API calls to embedding service
3634 /// - New approach: 1 API call per embedding config (usually 1 total)
3635 async fn process_embeddings_for_batch(
3636 &self,
3637 labels: &[String],
3638 properties_batch: &mut [Properties],
3639 ) -> Result<()> {
3640 let Some(label) = labels.first().map(|s| s.as_str()) else {
3641 return Ok(());
3642 };
3643 let schema = self.schema_manager.schema();
3644
3645 // Group auto-embed targets by (alias, source). A group with both a dense Vector and a
3646 // multi-vector List<Vector> column is a single-pass hybrid source: one inference fills
3647 // both. Non-mixed groups use the dense / multi-vector embedder as before.
3648 let groups = collect_embed_groups(&schema, label);
3649 if groups.is_empty() {
3650 return Ok(());
3651 }
3652
3653 for (key, group) in groups {
3654 let alias = &key.0;
3655 let want_dense = !group.dense.is_empty();
3656 let want_multi = !group.multi.is_empty();
3657 let want_sparse = !group.sparse.is_empty();
3658
3659 // A row needs this group's inference if it has the source text and is still missing
3660 // at least one of the group's target columns (user-supplied values are preserved).
3661 let mut input_texts: Vec<String> = Vec::new();
3662 let mut needs: Vec<usize> = Vec::new();
3663 for (idx, properties) in properties_batch.iter().enumerate() {
3664 let all_present = group
3665 .dense
3666 .iter()
3667 .chain(group.multi.iter())
3668 .chain(group.sparse.iter())
3669 .all(|t| properties.contains_key(t));
3670 if all_present {
3671 continue;
3672 }
3673 let mut inputs = Vec::new();
3674 for src in &group.source_properties {
3675 if let Some(val) = properties.get(src)
3676 && let Some(s) = val.as_str()
3677 {
3678 inputs.push(s.to_string());
3679 }
3680 }
3681 if inputs.is_empty() {
3682 continue;
3683 }
3684 let text = inputs.join(" ");
3685 let text = match &group.document_prefix {
3686 Some(prefix) => format!("{prefix}{text}"),
3687 None => text,
3688 };
3689 input_texts.push(text);
3690 needs.push(idx);
3691 }
3692 if input_texts.is_empty() {
3693 continue;
3694 }
3695
3696 let runtime = self
3697 .xervo_runtime
3698 .get()
3699 .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3700 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3701 let (dense, multi, sparse) = embed_group(
3702 runtime,
3703 alias,
3704 &input_refs,
3705 want_dense,
3706 want_multi,
3707 want_sparse,
3708 )
3709 .await?;
3710
3711 for (i, &row) in needs.iter().enumerate() {
3712 if let Some(vec) = dense.as_ref().and_then(|d| d.get(i)) {
3713 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3714 for t in &group.dense {
3715 if !properties_batch[row].contains_key(t) {
3716 properties_batch[row].insert(t.clone(), Value::List(vals.clone()));
3717 }
3718 }
3719 }
3720 if let Some(tokens) = multi.as_ref().and_then(|m| m.get(i)) {
3721 let mv = multivec_to_value(tokens);
3722 for t in &group.multi {
3723 if !properties_batch[row].contains_key(t) {
3724 properties_batch[row].insert(t.clone(), mv.clone());
3725 }
3726 }
3727 }
3728 if let Some(pairs) = sparse.as_ref().and_then(|s| s.get(i)) {
3729 let sv = sparse_pairs_to_value(pairs);
3730 for t in &group.sparse {
3731 if !properties_batch[row].contains_key(t) {
3732 properties_batch[row].insert(t.clone(), sv.clone());
3733 }
3734 }
3735 }
3736 }
3737 }
3738
3739 Ok(())
3740 }
3741
3742 async fn process_embeddings_impl(
3743 &self,
3744 label_name: Option<&str>,
3745 properties: &mut Properties,
3746 ) -> Result<()> {
3747 let schema = self.schema_manager.schema();
3748
3749 let Some(label) = label_name else {
3750 return Ok(());
3751 };
3752
3753 // Same (alias, source) grouping as the deferred path: a mixed dense + multi-vector
3754 // group is a single-pass hybrid source (one inference fills both columns).
3755 let groups = collect_embed_groups(&schema, label);
3756 if groups.is_empty() {
3757 log::info!("No embedding config found for label {}", label);
3758 return Ok(());
3759 }
3760
3761 for (key, group) in groups {
3762 let alias = &key.0;
3763 // Skip if every target already present (user-supplied values win).
3764 if group
3765 .dense
3766 .iter()
3767 .chain(group.multi.iter())
3768 .chain(group.sparse.iter())
3769 .all(|t| properties.contains_key(t))
3770 {
3771 continue;
3772 }
3773
3774 let mut inputs = Vec::new();
3775 for src in &group.source_properties {
3776 if let Some(val) = properties.get(src)
3777 && let Some(s) = val.as_str()
3778 {
3779 inputs.push(s.to_string());
3780 }
3781 }
3782 if inputs.is_empty() {
3783 continue;
3784 }
3785 let text = inputs.join(" ");
3786 let text = match &group.document_prefix {
3787 Some(prefix) => format!("{prefix}{text}"),
3788 None => text,
3789 };
3790
3791 let runtime = self
3792 .xervo_runtime
3793 .get()
3794 .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3795 let want_dense = !group.dense.is_empty();
3796 let want_multi = !group.multi.is_empty();
3797 let want_sparse = !group.sparse.is_empty();
3798 let (dense, multi, sparse) = embed_group(
3799 runtime,
3800 alias,
3801 &[text.as_str()],
3802 want_dense,
3803 want_multi,
3804 want_sparse,
3805 )
3806 .await?;
3807
3808 if let Some(vec) = dense.as_ref().and_then(|d| d.first()) {
3809 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3810 for t in &group.dense {
3811 if !properties.contains_key(t) {
3812 properties.insert(t.clone(), Value::List(vals.clone()));
3813 }
3814 }
3815 }
3816 if let Some(tokens) = multi.as_ref().and_then(|m| m.first()) {
3817 let mv = multivec_to_value(tokens);
3818 for t in &group.multi {
3819 if !properties.contains_key(t) {
3820 properties.insert(t.clone(), mv.clone());
3821 }
3822 }
3823 }
3824 if let Some(pairs) = sparse.as_ref().and_then(|s| s.first()) {
3825 let sv = sparse_pairs_to_value(pairs);
3826 for t in &group.sparse {
3827 if !properties.contains_key(t) {
3828 properties.insert(t.clone(), sv.clone());
3829 }
3830 }
3831 }
3832 }
3833 Ok(())
3834 }
3835
3836 /// Flushes the current in-memory L0 buffer to L1 storage.
3837 ///
3838 /// # Lock Ordering
3839 ///
3840 /// To prevent deadlocks, locks must be acquired in the following order:
3841 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3842 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3843 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3844 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3845 /// 5. `Index` / `Storage` locks (during actual flush)
3846 ///
3847 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3848 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3849 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3850 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3851 // Drain any in-flight async flushes first. `flush_to_l1` is a
3852 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3853 // setup, shutdown paths) rely on it as "all writes are now
3854 // durably in Lance". Without the drain, an async stream from
3855 // a recent commit might still be writing to Lance when
3856 // `flush_to_l1` returns, leaving a window where forks branch
3857 // off pre-write Lance state and lose data.
3858 if let Some(coord) = self.flush_coordinator.as_ref() {
3859 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3860 }
3861 let _flush_lock_guard = self.flush_lock.lock().await;
3862 self.flush_inline_under_lock(name).await
3863 }
3864
3865 /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3866 ///
3867 /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3868 /// flush, and then — still holding the lock — reads the allocator
3869 /// high-water marks and each existing candidate dataset's Lance
3870 /// version. Capturing under the held lock is what makes the fork point
3871 /// atomic: no concurrent commit can advance the allocator and no
3872 /// concurrent flush can advance a dataset tip between the flush and the
3873 /// reads. See [`ForkPoint`].
3874 ///
3875 /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3876 /// names with no `.lance` directory on disk are skipped (returned map
3877 /// has no entry for them), matching the fork branch loop's existence
3878 /// check.
3879 ///
3880 /// # Errors
3881 /// Propagates flush failures from `flush_inline_under_lock` and any
3882 /// per-dataset version read failure from `lance_branch::current_version`.
3883 ///
3884 /// # Deadlocks
3885 /// Must not be called by a task already holding `flush_lock` (e.g.
3886 /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3887 /// Fork creation never holds the lock, so the sole call site is safe.
3888 pub async fn flush_and_capture_fork_point(
3889 &self,
3890 candidate_dataset_names: &[String],
3891 ) -> Result<ForkPoint> {
3892 if let Some(coord) = self.flush_coordinator.as_ref() {
3893 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3894 }
3895 let _flush_lock_guard = self.flush_lock.lock().await;
3896 self.flush_inline_under_lock(None).await?;
3897
3898 // Still under `flush_lock`: capture the allocator HWM, the MVCC
3899 // version HWM, and every existing dataset's Lance version so
3900 // nothing can interleave.
3901 let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3902 // The parent's current L0 version is the largest `_version` any
3903 // inherited row can carry (flushed or in-memory). A fork bootstraps
3904 // its version floor to this so a fork tx read still sees inherited
3905 // rows. Cheap read lock; no buffer clone.
3906 let version_hwm = self.l0_manager.get_current().read().current_version;
3907
3908 let base = self.storage.base_uri();
3909 let mut dataset_versions = BTreeMap::new();
3910 for name in candidate_dataset_names {
3911 let uri = join_lance_uri(base, name);
3912 if !lance_path_exists(&uri) {
3913 continue;
3914 }
3915 let version = crate::backend::lance_branch::current_version(&uri).await?;
3916 dataset_versions.insert(name.clone(), version);
3917 }
3918
3919 Ok(ForkPoint {
3920 vid_hwm,
3921 eid_hwm,
3922 dataset_versions,
3923 version_hwm,
3924 })
3925 }
3926
3927 /// Async-flush entry point: rotate under `flush_lock`, release the
3928 /// lock, then submit the stream phase to the [`FlushCoordinator`].
3929 /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3930 /// that resolves when finalize completes.
3931 ///
3932 /// Errors if `config.async_flush_enabled = false` (the coordinator
3933 /// is `None` in that case — see `flush_coordinator` field doc).
3934 pub async fn flush_to_l1_async(
3935 self: &Arc<Self>,
3936 name: Option<String>,
3937 ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3938 let coord = self
3939 .flush_coordinator
3940 .as_ref()
3941 .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3942 .clone();
3943 // 1. Acquire permit FIRST (outside flush_lock) so we don't
3944 // introduce a permit-while-holding-flush-lock convoy.
3945 let permit = coord.acquire_permit().await?;
3946 // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3947 // and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3948 // rotate (the `?` below) must consume neither: the finalizer
3949 // advances in strictly consecutive seq order and only decrements
3950 // pending on finalize, so a leaked seq/pending would wedge it
3951 // forever. The seq is allocated under `flush_lock`, immediately
3952 // after the rotate and before the guard drops, so concurrent rotates
3953 // keep seq order == rotation order, and the seq is unused until
3954 // submit. On the `?` error path the permit drops, freeing the slot.
3955 let (
3956 RotateOutput {
3957 old_l0_arc,
3958 wal_lsn,
3959 current_version,
3960 flush_in_progress_guard,
3961 },
3962 seq,
3963 ) = {
3964 let _flush_lock_guard = self.flush_lock.lock().await;
3965 let rotate_out = self.flush_l0_rotate().await?;
3966 let seq = coord.next_rotate_seq();
3967 coord.note_pending();
3968 (rotate_out, seq)
3969 };
3970 // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3971 // cached_manifest snapshot at this moment.
3972 let parent_manifest = self.cached_manifest.lock().clone();
3973 let rotated = RotatedFlush {
3974 seq,
3975 old_l0_arc: old_l0_arc.clone(),
3976 wal_lsn,
3977 current_version,
3978 name: name.clone(),
3979 parent_manifest,
3980 permit,
3981 flush_in_progress_guard,
3982 };
3983 // 4. Spawn the stream phase via the coordinator. The closure
3984 // captures Arc<Writer> transiently — drops when stream
3985 // completes (bounded, ~50-500 ms).
3986 let writer = self.clone();
3987 let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3988 let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3989 Ok(crate::runtime::flush_coordinator::FlushOutcome {
3990 new_manifest: outcome.manifest,
3991 snapshot_id: outcome.snapshot_id,
3992 })
3993 });
3994 Ok(ticket)
3995 }
3996
3997 /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3998 /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3999 /// its place), and hand off the WAL to the new L0.
4000 ///
4001 /// Runs in microseconds. Must be called under `flush_lock` (the caller
4002 /// is responsible). The returned [`RotateOutput`] carries everything
4003 /// the subsequent stream + finalize phases need; in particular the
4004 /// [`FlushInProgressGuard`] is bound to the return value so it stays
4005 /// alive for the full flush lifetime — including any future async
4006 /// path where stream runs on a spawned task.
4007 async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
4008 // Acquire the in-progress counter BEFORE any heavy work. The
4009 // guard lives on RotateOutput; dropping RotateOutput drops the
4010 // guard, so the counter goes back to zero exactly when the flush
4011 // is fully done.
4012 let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
4013
4014 // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
4015 // current L0 is still active and mutations are retained in
4016 // memory until restart/retry.
4017 let wal_for_truncate = {
4018 let current_l0 = self.l0_manager.get_current();
4019 let l0_guard = current_l0.read();
4020 l0_guard.wal.clone()
4021 };
4022 // Test-only seam (no-op without the `failpoints` feature): inject a
4023 // WAL-flush failure here to drive the "failed async rotate wedges the
4024 // finalizer" regression (Bug #3). When configured to "return" it makes
4025 // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
4026 fail::fail_point!("flush::rotate-fail", |_| {
4027 Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
4028 });
4029 let wal_lsn = if let Some(ref w) = wal_for_truncate {
4030 w.flush().await?
4031 } else {
4032 0
4033 };
4034
4035 // B. Begin flush: rotate L0 and keep old L0 visible to reads via
4036 // pending_flush until complete_flush is called by finalize.
4037 let old_l0_arc = self.l0_manager.begin_flush(0, None);
4038 metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
4039
4040 // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
4041 // and current_version to the new L0.
4042 let current_version;
4043 {
4044 let mut old_l0_guard = old_l0_arc.write();
4045 current_version = old_l0_guard.current_version;
4046 old_l0_guard.wal_lsn_at_flush = wal_lsn;
4047 let wal = old_l0_guard.wal.take();
4048 let new_l0_arc = self.l0_manager.get_current();
4049 let mut new_l0_guard = new_l0_arc.write();
4050 new_l0_guard.wal = wal;
4051 new_l0_guard.current_version = current_version;
4052 // The new active buffer starts accumulating strictly above the
4053 // rotation point: everything <= `wal_lsn` is now owned by the old
4054 // (being-flushed) buffer or earlier. This start watermark is the
4055 // floor that keeps WAL truncation / checkpoint publication from
4056 // discarding this buffer's data if its eventual flush fails.
4057 new_l0_guard.wal_lsn_at_start = wal_lsn;
4058 }
4059
4060 Ok(RotateOutput {
4061 old_l0_arc,
4062 wal_lsn,
4063 current_version,
4064 flush_in_progress_guard,
4065 })
4066 }
4067
4068 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
4069 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
4070 /// pending_flush by Phase B); writes append-only Lance datasets; does
4071 /// NOT call save_snapshot / set_latest_snapshot — those are
4072 /// finalize's job, so the manifest doesn't get published until the
4073 /// next phase.
4074 ///
4075 /// Today takes `&self`; in a follow-up commit this becomes a
4076 /// static `Send + 'static` function over `SharedFlushCtx` so it can
4077 /// run on a spawned task while concurrent commits proceed.
4078 async fn flush_stream_l1(
4079 &self,
4080 old_l0_arc: Arc<RwLock<L0Buffer>>,
4081 wal_lsn: u64,
4082 current_version: u64,
4083 name: Option<String>,
4084 ) -> Result<FlushOutcome> {
4085 // Test-only seam (no-op without the `failpoints` feature): the rotate
4086 // (begin_flush) already moved the to-be-flushed buffer onto
4087 // pending_flush and installed a fresh empty current buffer, but the
4088 // rotated rows are NOT yet durable in Lance. Pausing here holds that
4089 // window open to drive the unique-constraint-hole regression
4090 // (Bug #9 Mechanism A).
4091 fail::fail_point!("flush::after-rotate-before-lance");
4092
4093 // Phase B: materialize any deferred embeddings before column
4094 // extraction. No-op when `defer_embeddings` is off (the set will
4095 // be empty). On-demand reads of the embedding column are a TODO
4096 // for a future revision (see UniConfig::defer_embeddings docs).
4097 self.drain_pending_embeddings(&old_l0_arc).await?;
4098
4099 // Materialise MUVERA FDE columns from each row's source multi-vector (pure/sync;
4100 // no-op without a MUVERA index). Runs after embeddings so a row can be both
4101 // auto-embedded and FDE-encoded.
4102 self.materialize_fde_columns(&old_l0_arc)?;
4103
4104 let schema = self.schema_manager.schema();
4105 // 2. Acquire Read lock on Old L0 for flushing
4106 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
4107 // (Vid, labels, properties, deleted, version)
4108 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
4109 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
4110 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
4111 // (vid, full L0 properties map, version, set of keys to update).
4112 // Only the keys in the HashSet are emitted to the partial source;
4113 // the full props map is retained so the per-row column extractor
4114 // can read each touched key's value.
4115 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
4116 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
4117 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
4118 // partial source with just `_vid`, `_deleted=true`, `_version`,
4119 // `_updated_at`. Skips the wide-row Append payload that adds
4120 // nothing on a soft-delete.
4121 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
4122 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
4123 // Collect vertex timestamps from L0 for flushing to storage
4124 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
4125 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
4126 // Track tombstones missing labels for storage query fallback
4127 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
4128
4129 {
4130 let old_l0 = old_l0_arc.read();
4131
4132 // 1. Collect all edges and tombstones from L0
4133 for edge in old_l0.graph.edges() {
4134 let properties = old_l0
4135 .edge_properties
4136 .get(&edge.eid)
4137 .cloned()
4138 .unwrap_or_default();
4139 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
4140
4141 // Get timestamps from L0 buffer (populated during insert)
4142 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
4143 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
4144
4145 entries_by_type
4146 .entry(edge.edge_type)
4147 .or_default()
4148 .push(L1Entry {
4149 src_vid: edge.src_vid,
4150 dst_vid: edge.dst_vid,
4151 eid: edge.eid,
4152 op: Op::Insert,
4153 version,
4154 properties,
4155 created_at,
4156 updated_at,
4157 });
4158 }
4159
4160 // From tombstones
4161 for tombstone in old_l0.tombstones.values() {
4162 let version = old_l0
4163 .edge_versions
4164 .get(&tombstone.eid)
4165 .copied()
4166 .unwrap_or(0);
4167 // Get timestamps - for deletes, updated_at reflects deletion time
4168 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
4169 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
4170
4171 entries_by_type
4172 .entry(tombstone.edge_type)
4173 .or_default()
4174 .push(L1Entry {
4175 src_vid: tombstone.src_vid,
4176 dst_vid: tombstone.dst_vid,
4177 eid: tombstone.eid,
4178 op: Op::Delete,
4179 version,
4180 properties: HashMap::new(),
4181 created_at,
4182 updated_at,
4183 });
4184 }
4185
4186 // 1b. Collect vertices by label (using vertex_labels from L0)
4187 //
4188 // Helper: fan-out a single vertex entry into per-label buckets.
4189 // Each per-label table row carries the full label set so multi-label
4190 // info is preserved after flush.
4191 let push_vertex_to_labels =
4192 |vid: Vid,
4193 all_labels: &[String],
4194 props: Properties,
4195 deleted: bool,
4196 version: u64,
4197 out: &mut HashMap<u16, Vec<VertexEntry>>| {
4198 for label in all_labels {
4199 if let Some(label_id) = schema.label_id_by_name(label) {
4200 out.entry(label_id).or_default().push((
4201 vid,
4202 all_labels.to_vec(),
4203 props.clone(),
4204 deleted,
4205 version,
4206 ));
4207 }
4208 }
4209 };
4210
4211 for (vid, props) in &old_l0.vertex_properties {
4212 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4213 // Collect timestamps for this vertex
4214 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
4215 vertex_created_at.insert(*vid, ts);
4216 }
4217 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
4218 vertex_updated_at.insert(*vid, ts);
4219 }
4220 if let Some(labels) = old_l0.vertex_labels.get(vid) {
4221 // Partial-write routing: when this VID was last
4222 // touched via `insert_vertex_partial` AND the
4223 // partial_lance_writes flag is on, send only the
4224 // touched columns to a MergeInsert batch. Otherwise
4225 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
4226 // — or flag off) use the existing full-row Append.
4227 let is_partial = self.config.partial_lance_writes
4228 && old_l0.vertex_partial_keys.contains_key(vid);
4229 if is_partial {
4230 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
4231 for label in labels {
4232 if let Some(label_id) = schema.label_id_by_name(label) {
4233 partial_by_label.entry(label_id).or_default().push((
4234 *vid,
4235 props.clone(),
4236 version,
4237 touched.clone(),
4238 ));
4239 }
4240 }
4241 }
4242 } else {
4243 push_vertex_to_labels(
4244 *vid,
4245 labels,
4246 props.clone(),
4247 false,
4248 version,
4249 &mut vertices_by_label,
4250 );
4251 }
4252 }
4253 }
4254 for &vid in &old_l0.vertex_tombstones {
4255 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4256 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
4257 vertex_updated_at.insert(vid, ts);
4258 }
4259 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
4260 // Round-12 §B: tombstones flush via Lance MergeInsert
4261 // (just `_vid`, `_deleted=true`, `_version`,
4262 // `_updated_at`) — skipping the wide-row Append.
4263 // Unconditional (no `partial_lance_writes` gating);
4264 // tombstone Append carries no useful payload.
4265 for label in labels {
4266 if let Some(label_id) = schema.label_id_by_name(label) {
4267 tombstones_by_label
4268 .entry(label_id)
4269 .or_default()
4270 .push((vid, version));
4271 }
4272 }
4273 } else {
4274 // Tombstone missing labels (old WAL format) - collect for storage query fallback
4275 orphaned_tombstones.push((vid, version));
4276 }
4277 }
4278 } // Drop read lock
4279
4280 // Resolve orphaned tombstones (missing labels) from storage
4281 if !orphaned_tombstones.is_empty() {
4282 tracing::warn!(
4283 count = orphaned_tombstones.len(),
4284 "Tombstones missing labels in L0, querying storage as fallback"
4285 );
4286 for (vid, version) in orphaned_tombstones {
4287 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
4288 && !labels.is_empty()
4289 {
4290 for label in &labels {
4291 if let Some(label_id) = schema.label_id_by_name(label) {
4292 // Round-12 §B: route through partial tombstone too.
4293 tombstones_by_label
4294 .entry(label_id)
4295 .or_default()
4296 .push((vid, version));
4297 }
4298 }
4299 }
4300 }
4301 }
4302
4303 // 1. Load previous snapshot from cache, or fall back to storage.
4304 //
4305 // Use clone() not take(): for the async path, multiple
4306 // concurrent streams may run; if we take() here, a sibling
4307 // stream sees cached_manifest = None and seeds from
4308 // load_latest_snapshot (stale), losing the chain. clone()
4309 // preserves the parent. Finalize writes back the new manifest
4310 // unconditionally.
4311 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
4312 cached
4313 } else {
4314 self.storage
4315 .snapshot_manager()
4316 .load_latest_snapshot()
4317 .await?
4318 .unwrap_or_else(|| {
4319 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
4320 })
4321 };
4322
4323 // Update snapshot metadata
4324 // Save parent snapshot ID before generating new one (for lineage tracking)
4325 let parent_id = manifest.snapshot_id.clone();
4326 manifest.parent_snapshot = Some(parent_id);
4327 manifest.snapshot_id = Uuid::new_v4().to_string();
4328 manifest.name = name;
4329 manifest.created_at = Utc::now();
4330 manifest.version_high_water_mark = current_version;
4331 // Cap the published WAL checkpoint at the floor of any OTHER pending
4332 // flush. A still-pending flush (notably one that FAILED and left its
4333 // buffer in `pending_flush`) holds committed WAL entries above its start
4334 // that are NOT in this snapshot; recovery replays from this mark, so
4335 // claiming durability past that floor would skip them (lost commit). A
4336 // normal flush with no other pending buffer keeps `wal_lsn` unchanged.
4337 manifest.wal_high_water_mark = self
4338 .l0_manager
4339 .min_pending_wal_lsn_start(&old_l0_arc)
4340 .map_or(wal_lsn, |floor| floor.min(wal_lsn));
4341 let snapshot_id = manifest.snapshot_id.clone();
4342
4343 tracing::Span::current().record("snapshot_id", &snapshot_id);
4344
4345 // 2. Write main unified tables FIRST (before deltas).
4346 // Ensures the dual-write invariant: by the time an EID appears in a
4347 // delta table, it already exists in main_edges. This prevents the
4348 // compaction debug_assert from firing when compaction interleaves
4349 // with flush at async yield points.
4350 //
4351 // 2.1 Main edges table
4352 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
4353 let _old_l0 = old_l0_arc.read();
4354 let mut main_edges: Vec<(
4355 uni_common::core::id::Eid,
4356 Vid,
4357 Vid,
4358 String,
4359 Properties,
4360 bool,
4361 u64,
4362 )> = Vec::new();
4363 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4364 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4365
4366 for (&edge_type_id, entries) in entries_by_type.iter() {
4367 for entry in entries {
4368 let edge_type_name = self
4369 .storage
4370 .schema_manager()
4371 .edge_type_name_by_id_unified(edge_type_id)
4372 .unwrap_or_else(|| "unknown".to_string());
4373
4374 let deleted = matches!(entry.op, Op::Delete);
4375 main_edges.push((
4376 entry.eid,
4377 entry.src_vid,
4378 entry.dst_vid,
4379 edge_type_name,
4380 entry.properties.clone(),
4381 deleted,
4382 entry.version,
4383 ));
4384
4385 if let Some(ts) = entry.created_at {
4386 edge_created_at_map.insert(entry.eid, ts);
4387 }
4388 if let Some(ts) = entry.updated_at {
4389 edge_updated_at_map.insert(entry.eid, ts);
4390 }
4391 }
4392 }
4393
4394 (main_edges, edge_created_at_map, edge_updated_at_map)
4395 };
4396
4397 if !main_edges.is_empty() {
4398 let main_edge_batch = MainEdgeDataset::build_record_batch(
4399 &main_edges,
4400 Some(&edge_created_at_map),
4401 Some(&edge_updated_at_map),
4402 )?;
4403 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4404 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4405 }
4406
4407 // 2.2 Main vertices table
4408 let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4409 let old_l0 = old_l0_arc.read();
4410 let mut vertices = Vec::new();
4411
4412 // Live vertices: full-row Append on the main table (the
4413 // props_json blob is required for global ID lookups). For
4414 // partial-row VIDs (vertex_partial_keys non-empty), the
4415 // main table still needs the full props for the
4416 // ext_id-uniqueness path; we keep the Append here. The
4417 // per-label Lance write IS partial via MergeInsert.
4418 for (vid, props) in &old_l0.vertex_properties {
4419 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4420 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4421 vertices.push((*vid, labels, props.clone(), false, version));
4422 }
4423
4424 // Tombstones: collected into `main_vertex_tombstones` for
4425 // the MergeInsert path below; skipping the wide-row Append.
4426 for &vid in &old_l0.vertex_tombstones {
4427 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4428 main_vertex_tombstones.push((vid, version));
4429 }
4430
4431 vertices
4432 };
4433
4434 // M8: durable label-only mutations across flush windows.
4435 //
4436 // `SET n:Label` / `REMOVE n:Label` mark the vid in
4437 // `vertex_label_overwrites` and update `vertex_labels`, but for a
4438 // vid flushed in a PRIOR window they never re-add it to
4439 // `vertex_properties`. The loops above key off `vertex_properties`,
4440 // so such a relabel would be silently lost: absent from the main
4441 // table, the per-label datasets, and the VidLabelsIndex (and so
4442 // `rebuild_vid_labels_index` reads stale labels after a restart).
4443 // The same-window create+relabel case already works because the
4444 // create put the vid in `vertex_properties`.
4445 //
4446 // Re-derive each overwrite-only vid by fetching its persisted props
4447 // and labels, then route it into `main_vertices` (main table +
4448 // index), the new per-label datasets, and a tombstone in any
4449 // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4450 // per-label table directly, so the old-label tombstone is required.
4451 let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4452 let old_l0 = old_l0_arc.read();
4453 old_l0
4454 .vertex_label_overwrites
4455 .iter()
4456 .filter(|vid| {
4457 !old_l0.vertex_properties.contains_key(*vid)
4458 && !old_l0.vertex_tombstones.contains(*vid)
4459 })
4460 .map(|vid| {
4461 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4462 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4463 (*vid, labels, version)
4464 })
4465 .collect()
4466 };
4467 for (vid, new_labels, version) in overwrite_only {
4468 // Persisted props of the prior-window row — required so the
4469 // re-Appended main row does not blank the vertex's properties.
4470 let Some(props) = MainVertexDataset::find_props_by_vid(
4471 self.storage.backend(),
4472 vid,
4473 self.storage.version_high_water_mark(),
4474 )
4475 .await?
4476 else {
4477 tracing::warn!(
4478 vid = vid.as_u64(),
4479 "label-only mutation for a vid with no persisted main row; skipping flush \
4480 of its relabel"
4481 );
4482 continue;
4483 };
4484 // Labels the vid carried BEFORE this relabel; the storage read
4485 // reflects pre-flush state. Any label no longer present must be
4486 // tombstoned in its per-label dataset.
4487 let old_labels = self
4488 .find_vertex_labels_in_storage(vid)
4489 .await?
4490 .unwrap_or_default();
4491
4492 main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4493 for label in &new_labels {
4494 if let Some(label_id) = schema.label_id_by_name(label) {
4495 vertices_by_label.entry(label_id).or_default().push((
4496 vid,
4497 new_labels.clone(),
4498 props.clone(),
4499 false,
4500 version,
4501 ));
4502 }
4503 }
4504 for label in &old_labels {
4505 if !new_labels.contains(label)
4506 && let Some(label_id) = schema.label_id_by_name(label)
4507 {
4508 tombstones_by_label
4509 .entry(label_id)
4510 .or_default()
4511 .push((vid, version));
4512 }
4513 }
4514 }
4515
4516 if !main_vertices.is_empty() {
4517 let main_vertex_batch = MainVertexDataset::build_record_batch(
4518 &main_vertices,
4519 Some(&vertex_created_at),
4520 Some(&vertex_updated_at),
4521 )?;
4522 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4523 }
4524 // Round-12 §B: tombstones via MergeInsert on the main vertices
4525 // table. Independent of `vertex_properties` length.
4526 if !main_vertex_tombstones.is_empty() {
4527 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4528 &main_vertex_tombstones,
4529 Some(&vertex_updated_at),
4530 )?;
4531 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4532 .await?;
4533 }
4534 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4535 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4536 }
4537
4538 // Keep the VidLabelsIndex current for every flushed vertex. This is the
4539 // single place that sees all vertices: the per-label fan-out below skips
4540 // undeclared (schemaless) labels, so updating the index there would miss
4541 // them. Traversal-time label predicates read this index to resolve
4542 // labels for vertices that live only in Lance — notably on a fork, whose
4543 // data is flushed to Lance before branching. (GitHub #99)
4544 for (vid, labels, _props, _deleted, _version) in &main_vertices {
4545 self.storage.update_vid_labels_index(*vid, labels.clone());
4546 }
4547 for (vid, _version) in &main_vertex_tombstones {
4548 self.storage.remove_from_vid_labels_index(*vid);
4549 }
4550
4551 // 3. For each edge type, write FWD and BWD delta runs
4552 for (&edge_type_id, entries) in entries_by_type.iter() {
4553 // Get edge type name from unified lookup (handles both schema'd and schemaless)
4554 let edge_type_name = self
4555 .storage
4556 .schema_manager()
4557 .edge_type_name_by_id_unified(edge_type_id)
4558 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4559
4560 // FWD Run (sorted by src_vid)
4561 // Round-12 §A: split entries into full-row Append and
4562 // partial MergeInsert routes based on `edge_partial_keys`.
4563 // Edges in `edge_partial_keys` were last written via
4564 // `insert_edge_partial_full`; the per-edge-type delta
4565 // tables receive only the touched schema columns plus
4566 // (when any overflow key was touched) the regenerated
4567 // `overflow_json` blob. Untouched columns retain their
4568 // previous-version value via Lance MergeInsert.
4569 let partial_eids: std::collections::HashSet<Eid> = {
4570 let old_l0 = old_l0_arc.read();
4571 entries
4572 .iter()
4573 .filter(|e| {
4574 self.config.partial_lance_writes
4575 && old_l0.edge_partial_keys.contains_key(&e.eid)
4576 })
4577 .map(|e| e.eid)
4578 .collect()
4579 };
4580 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4581 let old_l0 = old_l0_arc.read();
4582 partial_eids
4583 .iter()
4584 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4585 .collect()
4586 };
4587 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4588 .clone()
4589 .into_iter()
4590 .partition(|e| !partial_eids.contains(&e.eid));
4591
4592 let backend = self.storage.backend();
4593
4594 // FWD run (sorted by src_vid)
4595 let mut fwd_full = full_entries.clone();
4596 fwd_full.sort_by_key(|e| e.src_vid);
4597 let mut fwd_partial = partial_entries.clone();
4598 fwd_partial.sort_by_key(|e| e.src_vid);
4599 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4600 if !fwd_full.is_empty() {
4601 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4602 fwd_ds.write_run(backend, fwd_batch).await?;
4603 }
4604 if !fwd_partial.is_empty() {
4605 let touched_union: std::collections::HashSet<String> = fwd_partial
4606 .iter()
4607 .flat_map(|e| {
4608 touched_union_by_eid
4609 .get(&e.eid)
4610 .cloned()
4611 .unwrap_or_default()
4612 .into_iter()
4613 })
4614 .collect();
4615 let fwd_partial_batch =
4616 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4617 fwd_ds
4618 .merge_insert_partial_run(backend, fwd_partial_batch)
4619 .await?;
4620 }
4621 fwd_ds.ensure_eid_index(backend).await?;
4622
4623 // BWD Run (sorted by dst_vid)
4624 let mut bwd_full = full_entries.clone();
4625 bwd_full.sort_by_key(|e| e.dst_vid);
4626 let mut bwd_partial = partial_entries.clone();
4627 bwd_partial.sort_by_key(|e| e.dst_vid);
4628 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4629 if !bwd_full.is_empty() {
4630 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4631 bwd_ds.write_run(backend, bwd_batch).await?;
4632 }
4633 if !bwd_partial.is_empty() {
4634 let touched_union: std::collections::HashSet<String> = bwd_partial
4635 .iter()
4636 .flat_map(|e| {
4637 touched_union_by_eid
4638 .get(&e.eid)
4639 .cloned()
4640 .unwrap_or_default()
4641 .into_iter()
4642 })
4643 .collect();
4644 let bwd_partial_batch =
4645 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4646 bwd_ds
4647 .merge_insert_partial_run(backend, bwd_partial_batch)
4648 .await?;
4649 }
4650 bwd_ds.ensure_eid_index(backend).await?;
4651
4652 // Update Manifest
4653 let current_snap =
4654 manifest
4655 .edges
4656 .entry(edge_type_name.to_string())
4657 .or_insert(EdgeSnapshot {
4658 version: 0,
4659 count: 0,
4660 lance_version: 0,
4661 });
4662 current_snap.version += 1;
4663 current_snap.count += entries.len() as u64;
4664 // LanceDB tables don't expose Lance version directly
4665 current_snap.lance_version = 0;
4666
4667 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4668 // already has these edges via dual-write in insert_edge/delete_edge.
4669 }
4670
4671 // 4. Per-label vertex table writes
4672 // Iterate all labels that have either full-row OR partial-write
4673 // data pending. A label may appear in only one of the two maps
4674 // (e.g., all updates on this label were partial-only).
4675 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4676 .keys()
4677 .chain(partial_by_label.keys())
4678 .chain(tombstones_by_label.keys())
4679 .copied()
4680 .collect();
4681 for label_id in all_label_ids {
4682 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4683 let label_name = schema
4684 .label_name_by_id(label_id)
4685 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4686
4687 let ds = self.storage.vertex_dataset(label_name)?;
4688
4689 // Collect inverted index updates before consuming vertices
4690 // Maps: cfg.property -> (added, removed)
4691 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4692 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4693
4694 for idx in &schema.indexes {
4695 if let IndexDefinition::Inverted(cfg) = idx
4696 && cfg.label == label_name
4697 {
4698 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4699 let mut removed: HashSet<Vid> = HashSet::new();
4700
4701 for (vid, _labels, props, deleted, _version) in &vertices {
4702 if *deleted {
4703 removed.insert(*vid);
4704 } else if let Some(prop_value) = props.get(&cfg.property) {
4705 // Extract terms from the property value (List<String>)
4706 if let Some(arr) = prop_value.as_array() {
4707 let terms: Vec<String> = arr
4708 .iter()
4709 .filter_map(|v| v.as_str().map(ToString::to_string))
4710 .collect();
4711 if !terms.is_empty() {
4712 added.insert(*vid, terms);
4713 }
4714 }
4715 }
4716 }
4717 // Round-12 §B: tombstones no longer in `vertices`;
4718 // pull them from `tombstones_by_label` for inverted
4719 // index removal.
4720 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4721 for (vid, _) in tomb_rows {
4722 removed.insert(*vid);
4723 }
4724 }
4725
4726 if !added.is_empty() || !removed.is_empty() {
4727 inverted_updates.insert(cfg.property.clone(), (added, removed));
4728 }
4729 }
4730 }
4731
4732 // Collect sparse-vector index updates before consuming vertices.
4733 // Maps: cfg.property -> (added [(term_id, weight)] per vid, removed).
4734 type SparseUpdateMap = HashMap<String, (HashMap<Vid, Vec<(u32, f32)>>, HashSet<Vid>)>;
4735 let mut sparse_updates: SparseUpdateMap = HashMap::new();
4736
4737 for idx in &schema.indexes {
4738 if let IndexDefinition::Sparse(cfg) = idx
4739 && cfg.label == label_name
4740 {
4741 let mut added: HashMap<Vid, Vec<(u32, f32)>> = HashMap::new();
4742 let mut removed: HashSet<Vid> = HashSet::new();
4743
4744 for (vid, _labels, props, deleted, _version) in &vertices {
4745 if *deleted {
4746 removed.insert(*vid);
4747 } else if let Some(uni_common::Value::SparseVector { indices, values }) =
4748 props.get(&cfg.property)
4749 {
4750 let pairs: Vec<(u32, f32)> = indices
4751 .iter()
4752 .copied()
4753 .zip(values.iter().copied())
4754 .collect();
4755 added.insert(*vid, pairs);
4756 // An in-place SET re-flushes an already-indexed vid. The
4757 // sparse postings are a `Vec` with no per-vid dedup, so unless
4758 // the vid is also marked removed, `apply_incremental_updates`
4759 // appends the new postings *alongside* the stale ones — leaking
4760 // duplicates that grow unboundedly on hot-updated docs and
4761 // double-count the advisory `query_topk` score (issue #95).
4762 // Mark every updated vid removed so its prior postings are
4763 // purged before the new ones are appended (remove-then-add).
4764 removed.insert(*vid);
4765 }
4766 }
4767 // Tombstones are not in `vertices`; pull from tombstones_by_label.
4768 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4769 for (vid, _) in tomb_rows {
4770 removed.insert(*vid);
4771 }
4772 }
4773
4774 if !added.is_empty() || !removed.is_empty() {
4775 sparse_updates.insert(cfg.property.clone(), (added, removed));
4776 }
4777 }
4778 }
4779
4780 let mut v_data = Vec::new();
4781 let mut d_data = Vec::new();
4782 let mut ver_data = Vec::new();
4783 for (vid, labels, props, deleted, version) in vertices {
4784 v_data.push((vid, labels, props));
4785 d_data.push(deleted);
4786 ver_data.push(version);
4787 }
4788
4789 let backend = self.storage.backend();
4790
4791 // Skip the full-row Append entirely if this label only has
4792 // partial-write rows pending.
4793 if !v_data.is_empty() {
4794 let batch = ds.build_record_batch_with_timestamps(
4795 &v_data,
4796 &d_data,
4797 &ver_data,
4798 &schema,
4799 Some(&vertex_created_at),
4800 Some(&vertex_updated_at),
4801 )?;
4802 ds.write_batch(backend, batch, &schema).await?;
4803 }
4804
4805 // Partial-column batch (Lance MergeInsert path). The flag
4806 // gates whether the routing classified any VIDs as partial;
4807 // outside the flag this collection is always empty so the
4808 // call below is a cheap no-op.
4809 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4810 && !partial_rows.is_empty()
4811 {
4812 let touched_union: std::collections::HashSet<String> = partial_rows
4813 .iter()
4814 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4815 .collect();
4816 let pairs: Vec<(Vid, Properties)> = partial_rows
4817 .iter()
4818 .map(|(vid, props, _, _)| (*vid, props.clone()))
4819 .collect();
4820 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4821 let partial_batch = ds.build_partial_record_batch(
4822 &pairs,
4823 &versions,
4824 &touched_union,
4825 &schema,
4826 Some(&vertex_updated_at),
4827 )?;
4828 if partial_batch.num_rows() > 0 {
4829 ds.merge_insert_batch(backend, partial_batch).await?;
4830 }
4831 }
4832
4833 // Tombstone batch (Round-12 §B): always MergeInsert with
4834 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4835 // No partial_lance_writes gating — tombstones never carry
4836 // useful property payload to write. Captured tombstone vids
4837 // also drive `remove_from_vid_labels_index` below.
4838 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4839 if !tombstone_rows.is_empty() {
4840 let tomb_batch =
4841 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4842 if tomb_batch.num_rows() > 0 {
4843 ds.merge_insert_batch(backend, tomb_batch).await?;
4844 }
4845 }
4846
4847 ds.ensure_default_indexes(backend).await?;
4848
4849 // VidLabelsIndex maintenance is centralized at the main-vertex
4850 // flush above (it sees both schema'd and schemaless vertices).
4851
4852 // Update Manifest
4853 let current_snap =
4854 manifest
4855 .vertices
4856 .entry(label_name.to_string())
4857 .or_insert(LabelSnapshot {
4858 version: 0,
4859 count: 0,
4860 lance_version: 0,
4861 });
4862 current_snap.version += 1;
4863 current_snap.count += v_data.len() as u64;
4864 // LanceDB tables don't expose Lance version directly
4865 current_snap.lance_version = 0;
4866
4867 // Invalidate table cache to ensure next read picks up new version
4868 self.storage.invalidate_table_cache(label_name);
4869
4870 // Apply inverted index updates incrementally
4871 #[cfg(feature = "lance-backend")]
4872 for idx in &schema.indexes {
4873 if let IndexDefinition::Inverted(cfg) = idx
4874 && cfg.label == label_name
4875 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4876 {
4877 self.storage
4878 .index_manager()
4879 .update_inverted_index_incremental(cfg, added, removed)
4880 .await?;
4881 }
4882 }
4883
4884 // Apply sparse-vector index updates incrementally
4885 #[cfg(feature = "lance-backend")]
4886 for idx in &schema.indexes {
4887 if let IndexDefinition::Sparse(cfg) = idx
4888 && cfg.label == label_name
4889 && let Some((added, removed)) = sparse_updates.get(&cfg.property)
4890 {
4891 self.storage
4892 .index_manager()
4893 .update_sparse_vector_index_incremental(cfg, added, removed)
4894 .await?;
4895 }
4896 }
4897
4898 // Update UID index with new vertex mappings
4899 // Collect (UniId, Vid) mappings from non-deleted vertices
4900 #[cfg(feature = "lance-backend")]
4901 {
4902 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4903 for (vid, _labels, props) in &v_data {
4904 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4905 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4906 label_name, ext_id, props,
4907 );
4908 uid_mappings.push((uid, *vid));
4909 }
4910
4911 if !uid_mappings.is_empty()
4912 && let Ok(uid_index) = self.storage.uid_index(label_name)
4913 {
4914 // Stamp mappings with this flush's MVCC version so a later
4915 // re-create of the same UID deterministically outranks the
4916 // stale mapping (review C3).
4917 uid_index
4918 .write_mapping_versioned(&uid_mappings, current_version)
4919 .await?;
4920 }
4921 }
4922 }
4923 Ok(FlushOutcome {
4924 manifest,
4925 snapshot_id,
4926 })
4927 }
4928
4929 /// Composition entry that assumes the caller already holds `flush_lock`.
4930 /// Runs rotate + stream + finalize_locked in sequence. Used by
4931 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4932 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4933 /// holds the lock from the commit critical section).
4934 #[instrument(
4935 skip(self),
4936 fields(snapshot_id, mutations_count, size_bytes),
4937 level = "info"
4938 )]
4939 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4940 let start = std::time::Instant::now();
4941
4942 let (initial_size, initial_count) = {
4943 let l0_arc = self.l0_manager.get_current();
4944 let l0 = l0_arc.read();
4945 (l0.estimated_size, l0.mutation_count)
4946 };
4947 tracing::Span::current().record("size_bytes", initial_size);
4948 tracing::Span::current().record("mutations_count", initial_count);
4949
4950 debug!("Starting L0 flush to L1");
4951
4952 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4953 // FlushInProgressGuard lives on RotateOutput and stays alive for
4954 // the full flush — including the finalize_locked call below.
4955 let RotateOutput {
4956 old_l0_arc,
4957 wal_lsn,
4958 current_version,
4959 flush_in_progress_guard: _flush_guard,
4960 } = self.flush_l0_rotate().await?;
4961
4962 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4963 // G (Lance writes). Builds the manifest but does NOT publish it.
4964 let FlushOutcome {
4965 manifest,
4966 snapshot_id,
4967 } = self
4968 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4969 .await?;
4970
4971 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4972 // property cache clear, last_flush_time, metrics, l1_runs++,
4973 // compaction trigger, index-rebuild scheduling, fork tick.
4974 self.flush_finalize_locked(
4975 old_l0_arc,
4976 wal_lsn,
4977 manifest,
4978 snapshot_id,
4979 initial_size,
4980 initial_count,
4981 start,
4982 )
4983 .await
4984 }
4985
4986 /// Phases H..S of the flush: publish the manifest and run all
4987 /// post-publish bookkeeping. Assumes the caller already holds
4988 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4989 /// lock-acquiring variant used by the async finalize path.
4990 #[allow(clippy::too_many_arguments)]
4991 async fn flush_finalize_locked(
4992 &self,
4993 old_l0_arc: Arc<RwLock<L0Buffer>>,
4994 wal_lsn: u64,
4995 manifest: SnapshotManifest,
4996 snapshot_id: String,
4997 initial_size: usize,
4998 initial_count: usize,
4999 start: std::time::Instant,
5000 ) -> Result<String> {
5001 Self::flush_finalize_body(
5002 &self.shared_ctx(),
5003 old_l0_arc,
5004 wal_lsn,
5005 manifest,
5006 snapshot_id,
5007 initial_size,
5008 initial_count,
5009 start,
5010 )
5011 .await
5012 }
5013
5014 /// Phases H..S of the flush, lock-acquiring variant. Used by the
5015 /// async-flush finalizer task (running on a spawned tokio task),
5016 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
5017 /// `flush_lock` to serialize the publish boundary, then runs the
5018 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
5019 #[allow(clippy::too_many_arguments)]
5020 pub(crate) async fn flush_finalize_now(
5021 shared: SharedFlushCtx,
5022 old_l0_arc: Arc<RwLock<L0Buffer>>,
5023 wal_lsn: u64,
5024 manifest: SnapshotManifest,
5025 snapshot_id: String,
5026 initial_size: usize,
5027 initial_count: usize,
5028 start: std::time::Instant,
5029 ) -> Result<String> {
5030 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
5031 Self::flush_finalize_body(
5032 &shared,
5033 old_l0_arc,
5034 wal_lsn,
5035 manifest,
5036 snapshot_id,
5037 initial_size,
5038 initial_count,
5039 start,
5040 )
5041 .await
5042 }
5043
/// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
/// Static over `SharedFlushCtx`; the caller is responsible for
/// holding `flush_lock`.
///
/// Runs, in this order: parent-snapshot fixup, manifest publish (H),
/// snapshot fsync barrier (H2), manifest caching (I), property-cache
/// clear (L), `complete_flush` (J), WAL truncation (K), flush-time reset
/// (M), completion log + metrics, `l1_runs` increment (P), CSR compaction
/// trigger (Q), index-rebuild scheduling (R) and fork observability (S).
/// The step letters are not alphabetical on purpose: L deliberately runs
/// ahead of J (see the comment on that step).
///
/// `initial_size` / `initial_count` feed only the completion log line and
/// the flush byte/row counters; `start` feeds only the duration histogram.
///
/// # Errors
///
/// Propagates failures from `save_snapshot`, `set_latest_snapshot`, the
/// snapshot fsync barrier, `truncate_before`, and acquiring
/// `compaction_status`. Nothing in this function rolls back steps that
/// already ran before the failing one.
///
/// On success returns the `snapshot_id` argument unchanged.
#[allow(clippy::too_many_arguments)]
async fn flush_finalize_body(
    shared: &SharedFlushCtx,
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    wal_lsn: u64,
    mut manifest: SnapshotManifest,
    snapshot_id: String,
    initial_size: usize,
    initial_count: usize,
    start: std::time::Instant,
) -> Result<String> {
    // Parent-snapshot fixup. The stream phase built `manifest` with
    // parent_snapshot set from cached_manifest at stream time. If
    // OTHER flushes (sync or async) have finalized since then,
    // cached_manifest has advanced. Re-link this manifest to the
    // current cached chain so we don't orphan their data when we
    // overwrite cached_manifest below.
    let current_parent_id = shared
        .cached_manifest
        .lock()
        .as_ref()
        .map(|m| m.snapshot_id.clone());
    // Only re-link when a cached manifest exists; an empty cache leaves the
    // stream-time parent untouched.
    if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
        manifest.parent_snapshot = current_parent_id;
        metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
    }

    // H. Publish manifest (body first, then pointer — recovery is
    // idempotent if we crash between the two).
    // A fork writer must publish to a fork-scoped namespace, never the
    // global `catalog/latest` that the primary reopen reads (review C1).
    debug_assert_eq!(
        shared.fork_id.is_some(),
        shared.storage.snapshot_manager().is_fork_scoped(),
        "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
    );
    shared
        .storage
        .snapshot_manager()
        .save_snapshot(&manifest)
        .await?;
    shared
        .storage
        .snapshot_manager()
        .set_latest_snapshot(&manifest.snapshot_id)
        .await?;

    // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
    // wrote the manifest body and the `catalog/latest` pointer through the
    // object store, which does NOT fsync. WAL truncation (K, below) removes
    // the only other durable copy of this flush's data, so a crash after K
    // but before the OS flushed those writes would lose the snapshot —
    // recovery could not resolve `latest`. Make them durable now (local-fs
    // only; remote stores provide their own durability on `put`).
    crate::snapshot::manager::fsync_snapshot_pointer(
        shared.storage.local_fs_root().as_deref(),
        shared.fork_id.as_ref(),
        &manifest.snapshot_id,
    )
    .map_err(|e| {
        anyhow!(
            "fsync snapshot {} before WAL truncate: {}",
            manifest.snapshot_id,
            e
        )
    })?;

    // I. Cache manifest for next flush to avoid re-reading from object store.
    *shared.cached_manifest.lock() = Some(manifest.clone());

    // L. Invalidate the property cache BEFORE removing the flushed buffer
    // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
    // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
    // first closes the non-monotonic-read window: once the buffer leaves
    // the L0 chain at J a freshly-written value would otherwise miss the
    // chain and fall through to a stale cache entry. By the time finalize
    // runs the streamed rows are already durable in L1, so a post-clear
    // read falls through to fresh storage instead. The finalizer holds
    // `flush_lock` throughout, so reordering L ahead of J is safe.
    if let Some(ref pm) = shared.property_manager {
        pm.clear_cache().await;
    }

    // J. Complete flush: remove old L0 from pending_flush. MUST happen
    // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
    shared.l0_manager.complete_flush(&old_l0_arc);

    // Test-only seam (no-op without the `failpoints` feature): pause AFTER
    // complete_flush removed the buffer from the L0 chain (J) but BEFORE
    // WAL truncation (K). The property cache is already cleared (L moved
    // ahead of J above), so a read in this window falls through to fresh
    // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
    // read after flush finalize).
    // NOTE(review): the failpoint name still says "before-cache-clear"
    // although the cache clear now happens earlier; presumably kept so
    // existing tests keep matching it — confirm before renaming.
    fail::fail_point!("flush::after-complete-before-cache-clear");

    // K. Truncate WAL up to the safe LSN. The floor is the START watermark
    // of any OTHER pending flush (this flush's own buffer was removed from
    // pending by `complete_flush` in J, so it is excluded): a pending — e.g.
    // failed — flush's committed entries live above its start and are not yet
    // in L1, so truncating to its high watermark would delete its own data
    // (the lost-commit-on-graceful-close bug).
    // The WAL handle is read from the CURRENT L0 buffer; with no WAL
    // attached this step is skipped entirely.
    let wal_handle = shared.l0_manager.get_current().read().wal.clone();
    if let Some(w) = wal_handle {
        let safe_lsn = shared
            .l0_manager
            .min_pending_wal_lsn_start(&old_l0_arc)
            .map_or(wal_lsn, |floor| floor.min(wal_lsn));
        w.truncate_before(safe_lsn).await?;
    }

    // M. Reset last flush time for time-based auto-flush.
    *shared.last_flush_time.lock() = std::time::Instant::now();

    // Completion log plus duration/bytes/rows metrics, all based on the
    // values the caller captured (`start`, `initial_size`, `initial_count`).
    info!(
        snapshot_id,
        mutations_count = initial_count,
        size_bytes = initial_size,
        "L0 flush to L1 completed successfully"
    );
    metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
    metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
    metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

    // P. Increment flush generation counter for write throttling.
    // Scoped block so the `compaction_status` guard is released right after
    // the increment.
    {
        let mut status = uni_common::sync::acquire_mutex(
            &shared.storage.compaction_status,
            "compaction_status",
        )?;
        status.l1_runs += 1;
    }

    // Q. Trigger CSR compaction if enough frozen segments have accumulated.
    // `am` is an owned clone so the spawned task below can take it by move.
    let am = shared.adjacency_manager.clone();
    if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
        // At most one compaction task at a time: skip if the previously
        // stored handle has not finished yet.
        let previous_still_running = {
            let guard = shared.compaction_handle.read();
            guard.as_ref().is_some_and(|h| !h.is_finished())
        };
        if previous_still_running {
            info!("Skipping compaction: previous compaction still in progress");
        } else {
            let handle = tokio::spawn(async move {
                am.compact();
            });
            // Keep the handle so the next flush can run the check above.
            *shared.compaction_handle.write() = Some(handle);
        }
    }

    // R. Post-flush: check if any indexes need rebuilding based on thresholds.
    // Skipped when auto-rebuild is disabled or no rebuild manager has been set.
    if shared.auto_rebuild_enabled
        && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
    {
        Self::schedule_index_rebuilds_if_needed_static(
            &manifest,
            rebuild_mgr.clone(),
            shared.schema_manager.clone(),
            shared.index_rebuild_config.clone(),
        );
    }

    // S. Emit fork-fragment observability after a successful forked flush.
    // The callee returns immediately when `fork_id` is `None`.
    Self::tick_fork_fragment_observability_static(
        shared.fork_id,
        shared.fork_flush_count.clone(),
        shared.fork_fragment_warn_fired.clone(),
        shared.fork_fragment_warn_threshold,
    );

    Ok(snapshot_id)
}
5218
5219 /// Increment fork-flush bookkeeping and fire the fragment warn
5220 /// once if the threshold is crossed.
5221 ///
5222 /// Each flush typically appends ~1 fragment per touched dataset on
5223 /// the fork's branches; without compaction (deferred to Phase 5)
5224 /// long-lived heavy-write forks degrade. The flush count is a
5225 /// proxy for actual fragment growth — reading
5226 /// `Dataset::manifest().fragments.len()` per dataset would add a
5227 /// per-flush object-store roundtrip on the hot commit path, which
5228 /// is too costly for a purely observational guard rail.
5229 ///
5230 /// No-op for primary writers (`fork_id == None`).
5231 #[allow(dead_code)] // called by tests; production path uses _static
5232 pub(crate) fn tick_fork_fragment_observability(&self) {
5233 Self::tick_fork_fragment_observability_static(
5234 self.fork_id,
5235 self.fork_flush_count.clone(),
5236 self.fork_fragment_warn_fired.clone(),
5237 self.config.fork_fragment_warn_threshold,
5238 );
5239 }
5240
5241 /// Static variant of [`Writer::tick_fork_fragment_observability`].
5242 /// Used by the async-flush finalize path, where we hold a
5243 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
5244 pub(crate) fn tick_fork_fragment_observability_static(
5245 fork_id: Option<ForkId>,
5246 fork_flush_count: Arc<AtomicU64>,
5247 fork_fragment_warn_fired: Arc<AtomicBool>,
5248 warn_threshold: usize,
5249 ) {
5250 let Some(fork_id) = fork_id else { return };
5251 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
5252 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
5253 let fork_label = fork_id.to_string();
5254 metrics::gauge!(
5255 "uni_fork_l1_flushes",
5256 "fork" => fork_label.clone(),
5257 )
5258 .set(new_count as f64);
5259 let threshold = warn_threshold as u64;
5260 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
5261 && threshold > 0
5262 && new_count >= threshold
5263 {
5264 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
5265 tracing::warn!(
5266 fork = %fork_label,
5267 flush_count = new_count,
5268 threshold,
5269 "fork has exceeded the L1 flush-count threshold; \
5270 fork compaction is deferred to Phase 5 — consider \
5271 drop+recreate or promotion to bound fragment growth"
5272 );
5273 }
5274 }
5275
5276 /// Check rebuild thresholds and schedule background index rebuilds for
5277 /// labels that exceed growth or age limits. Marks affected indexes as
5278 /// `Stale` and spawns an async task to schedule the rebuild.
5279 #[allow(dead_code)] // production path uses _static; kept as the
5280 // documented instance entry point.
5281 fn schedule_index_rebuilds_if_needed(
5282 &self,
5283 manifest: &SnapshotManifest,
5284 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5285 ) {
5286 Self::schedule_index_rebuilds_if_needed_static(
5287 manifest,
5288 rebuild_mgr,
5289 self.schema_manager.clone(),
5290 self.config.index_rebuild.clone(),
5291 );
5292 }
5293
5294 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
5295 /// Used by the async-flush finalize path, where we hold the
5296 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
5297 pub(crate) fn schedule_index_rebuilds_if_needed_static(
5298 manifest: &SnapshotManifest,
5299 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5300 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
5301 index_rebuild_config: uni_common::config::IndexRebuildConfig,
5302 ) {
5303 let checker =
5304 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
5305 let schema = schema_manager.schema();
5306 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
5307
5308 if labels.is_empty() {
5309 return;
5310 }
5311
5312 // Mark affected indexes as Stale
5313 for label in &labels {
5314 for idx in &schema.indexes {
5315 if idx.label() == label {
5316 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
5317 m.status = uni_common::core::schema::IndexStatus::Stale;
5318 });
5319 }
5320 }
5321 }
5322
5323 tokio::spawn(async move {
5324 if let Err(e) = rebuild_mgr.schedule(labels).await {
5325 tracing::warn!("Failed to schedule index rebuild: {e}");
5326 }
5327 });
5328 }
5329}
5330
5331/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
5332/// its single-task finalizer loop. Unit struct on purpose: it must NOT
5333/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
5334/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
5335/// for finalize travels in via `SharedFlushCtx`.
5336pub(crate) struct WriterFinalizer;
5337
5338impl FinalizeFn for WriterFinalizer {
5339 fn finalize<'a>(
5340 &'a self,
5341 rotated: RotatedFlush,
5342 outcome: AsyncFlushOutcome,
5343 shared: SharedFlushCtx,
5344 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
5345 Box::pin(async move {
5346 // Read initial_size / initial_count from the rotated L0 so
5347 // we don't have to plumb them through the coordinator
5348 // submission. The buffer is still alive in pending_flush
5349 // until `complete_flush` (J) below pops it.
5350 let (initial_size, initial_count) = {
5351 let l0 = rotated.old_l0_arc.read();
5352 (l0.estimated_size, l0.mutation_count)
5353 };
5354 let result = Writer::flush_finalize_now(
5355 shared,
5356 rotated.old_l0_arc.clone(),
5357 rotated.wal_lsn,
5358 outcome.new_manifest,
5359 outcome.snapshot_id,
5360 initial_size,
5361 initial_count,
5362 std::time::Instant::now(),
5363 )
5364 .await;
5365 // `rotated` (permit + flush_in_progress_guard) drops here.
5366 drop(rotated.permit);
5367 result
5368 })
5369 }
5370
5371 fn finalize_failure<'a>(
5372 &'a self,
5373 rotated: RotatedFlush,
5374 err: anyhow::Error,
5375 _shared: SharedFlushCtx,
5376 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
5377 Box::pin(async move {
5378 tracing::warn!(
5379 error = %err,
5380 seq = rotated.seq,
5381 "async flush stream failed; old L0 remains in pending_flush, \
5382 WAL retains its data, recovery via WAL replay on restart"
5383 );
5384 metrics::counter!("uni_flush_failures_total").increment(1);
5385 // Permit + guard drop here so back-pressure releases even on
5386 // failure.
5387 drop(rotated.permit);
5388 err
5389 })
5390 }
5391}
5392
5393#[cfg(test)]
5394mod tests {
5395 use super::*;
5396 use tempfile::tempdir;
5397
5398 /// Test that commit_transaction writes mutations to WAL before merging to main L0.
5399 /// This verifies fix for issue #137 (transaction commit atomicity).
5400 #[tokio::test]
5401 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
5402 use crate::runtime::wal::WriteAheadLog;
5403 use crate::storage::manager::StorageManager;
5404 use object_store::local::LocalFileSystem;
5405 use object_store::path::Path as ObjectStorePath;
5406 use uni_common::core::schema::SchemaManager;
5407
5408 let dir = tempdir()?;
5409 let path = dir.path().to_str().unwrap();
5410 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5411 let schema_path = ObjectStorePath::from("schema.json");
5412
5413 let schema_manager =
5414 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5415 let _label_id = schema_manager.add_label("Test")?;
5416 schema_manager.save().await?;
5417
5418 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5419
5420 // Create WAL for main L0
5421 let wal_path = ObjectStorePath::from("wal");
5422 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5423
5424 let writer = Writer::new_with_config(
5425 storage.clone(),
5426 schema_manager.clone(),
5427 1,
5428 UniConfig::default(),
5429 Some(wal),
5430 None,
5431 )
5432 .await?;
5433
5434 // Begin transaction — create a transaction L0
5435 let tx_l0 = writer.create_transaction_l0();
5436
5437 // Insert data in transaction
5438 let vid_a = writer.next_vid().await?;
5439 let vid_b = writer.next_vid().await?;
5440
5441 let mut props = std::collections::HashMap::new();
5442 props.insert("test".to_string(), Value::String("data".to_string()));
5443
5444 writer
5445 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
5446 .await?;
5447 writer
5448 .insert_vertex_with_labels(
5449 vid_b,
5450 std::collections::HashMap::new(),
5451 &["Test".to_string()],
5452 Some(&tx_l0),
5453 )
5454 .await?;
5455
5456 let eid = writer.next_eid(1).await?;
5457 writer
5458 .insert_edge(
5459 vid_a,
5460 vid_b,
5461 1,
5462 eid,
5463 std::collections::HashMap::new(),
5464 None,
5465 Some(&tx_l0),
5466 )
5467 .await?;
5468
5469 // Get WAL before commit
5470 let l0 = writer.l0_manager.get_current();
5471 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
5472 let mutations_before = wal.replay().await?;
5473 let count_before = mutations_before.len();
5474
5475 // Commit transaction - this should write to WAL first
5476 let writer = Arc::new(writer);
5477 writer.commit_transaction_l0(tx_l0).await?;
5478
5479 // Verify WAL has the new mutations
5480 let mutations_after = wal.replay().await?;
5481 assert!(
5482 mutations_after.len() > count_before,
5483 "WAL should contain transaction mutations after commit"
5484 );
5485
5486 // Verify mutations are in correct order: vertices first, then edges
5487 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
5488
5489 let mut saw_vertex_a = false;
5490 let mut saw_vertex_b = false;
5491 let mut saw_edge = false;
5492
5493 for mutation in &new_mutations {
5494 match mutation {
5495 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
5496 if *vid == vid_a {
5497 saw_vertex_a = true;
5498 }
5499 if *vid == vid_b {
5500 saw_vertex_b = true;
5501 }
5502 // Vertices should come before edges
5503 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
5504 }
5505 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
5506 if *e == eid {
5507 saw_edge = true;
5508 }
5509 // Edges should come after vertices
5510 assert!(
5511 saw_vertex_a && saw_vertex_b,
5512 "Edge should be logged after both vertices"
5513 );
5514 }
5515 _ => {}
5516 }
5517 }
5518
5519 assert!(saw_vertex_a, "Vertex A should be in WAL");
5520 assert!(saw_vertex_b, "Vertex B should be in WAL");
5521 assert!(saw_edge, "Edge should be in WAL");
5522
5523 // Verify data is also in main L0
5524 let l0_read = l0.read();
5525 assert!(
5526 l0_read.vertex_properties.contains_key(&vid_a),
5527 "Vertex A should be in main L0"
5528 );
5529 assert!(
5530 l0_read.vertex_properties.contains_key(&vid_b),
5531 "Vertex B should be in main L0"
5532 );
5533 assert!(
5534 l0_read.edge_endpoints.contains_key(&eid),
5535 "Edge should be in main L0"
5536 );
5537
5538 Ok(())
5539 }
5540
5541 /// Test that failed WAL flush leaves transaction intact for retry or rollback.
5542 #[tokio::test]
5543 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
5544 use crate::runtime::wal::WriteAheadLog;
5545 use crate::storage::manager::StorageManager;
5546 use object_store::local::LocalFileSystem;
5547 use object_store::path::Path as ObjectStorePath;
5548 use uni_common::core::schema::SchemaManager;
5549
5550 let dir = tempdir()?;
5551 let path = dir.path().to_str().unwrap();
5552 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5553 let schema_path = ObjectStorePath::from("schema.json");
5554
5555 let schema_manager =
5556 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5557 let _label_id = schema_manager.add_label("Test")?;
5558 let _baseline_label_id = schema_manager.add_label("Baseline")?;
5559 let _txdata_label_id = schema_manager.add_label("TxData")?;
5560 schema_manager.save().await?;
5561
5562 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5563
5564 // Create WAL for main L0
5565 let wal_path = ObjectStorePath::from("wal");
5566 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5567
5568 let writer = Writer::new_with_config(
5569 storage.clone(),
5570 schema_manager.clone(),
5571 1,
5572 UniConfig::default(),
5573 Some(wal),
5574 None,
5575 )
5576 .await?;
5577
5578 // Insert baseline data (outside transaction)
5579 let baseline_vid = writer.next_vid().await?;
5580 writer
5581 .insert_vertex_with_labels(
5582 baseline_vid,
5583 [("baseline".to_string(), Value::Bool(true))]
5584 .into_iter()
5585 .collect(),
5586 &["Baseline".to_string()],
5587 None,
5588 )
5589 .await?;
5590
5591 // Begin transaction — create a transaction L0
5592 let tx_l0 = writer.create_transaction_l0();
5593
5594 // Insert data in transaction
5595 let tx_vid = writer.next_vid().await?;
5596 writer
5597 .insert_vertex_with_labels(
5598 tx_vid,
5599 [("tx_data".to_string(), Value::Bool(true))]
5600 .into_iter()
5601 .collect(),
5602 &["TxData".to_string()],
5603 Some(&tx_l0),
5604 )
5605 .await?;
5606
5607 // Capture main L0 state before rollback
5608 let l0 = writer.l0_manager.get_current();
5609 let vertex_count_before = l0.read().vertex_properties.len();
5610
5611 // Rollback transaction (simulating what would happen after WAL flush failure)
5612 drop(tx_l0);
5613
5614 // Verify main L0 is unchanged
5615 let vertex_count_after = l0.read().vertex_properties.len();
5616 assert_eq!(
5617 vertex_count_before, vertex_count_after,
5618 "Main L0 should not change after rollback"
5619 );
5620
5621 // Baseline should still be present
5622 assert!(
5623 l0.read().vertex_properties.contains_key(&baseline_vid),
5624 "Baseline data should remain"
5625 );
5626
5627 // Transaction data should NOT be in main L0
5628 assert!(
5629 !l0.read().vertex_properties.contains_key(&tx_vid),
5630 "Transaction data should not be in main L0 after rollback"
5631 );
5632
5633 Ok(())
5634 }
5635
5636 /// Test that batch insert with shared labels does not clone labels per vertex.
5637 /// This verifies fix for issue #161 (redundant label cloning).
5638 #[tokio::test]
5639 async fn test_batch_insert_shared_labels() -> Result<()> {
5640 use crate::storage::manager::StorageManager;
5641 use object_store::local::LocalFileSystem;
5642 use object_store::path::Path as ObjectStorePath;
5643 use uni_common::core::schema::SchemaManager;
5644
5645 let dir = tempdir()?;
5646 let path = dir.path().to_str().unwrap();
5647 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5648 let schema_path = ObjectStorePath::from("schema.json");
5649
5650 let schema_manager =
5651 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5652 let _label_id = schema_manager.add_label("Person")?;
5653 schema_manager.save().await?;
5654
5655 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5656
5657 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5658
5659 // Shared labels - should not be cloned per vertex
5660 let labels = &["Person".to_string()];
5661
5662 // Insert batch of vertices with same labels
5663 let mut vids = Vec::new();
5664 for i in 0..100 {
5665 let vid = writer.next_vid().await?;
5666 let mut props = std::collections::HashMap::new();
5667 props.insert("id".to_string(), Value::Int(i));
5668 writer
5669 .insert_vertex_with_labels(vid, props, labels, None)
5670 .await?;
5671 vids.push(vid);
5672 }
5673
5674 // Verify all vertices have the correct labels
5675 let l0 = writer.l0_manager.get_current();
5676 for vid in vids {
5677 let l0_guard = l0.read();
5678 let vertex_labels = l0_guard.vertex_labels.get(&vid);
5679 assert!(vertex_labels.is_some(), "Vertex should have labels");
5680 assert_eq!(
5681 vertex_labels.unwrap(),
5682 &vec!["Person".to_string()],
5683 "Labels should match"
5684 );
5685 }
5686
5687 Ok(())
5688 }
5689
5690 /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5691 /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5692 #[tokio::test]
5693 async fn test_estimated_size_tracks_mutations() -> Result<()> {
5694 use crate::storage::manager::StorageManager;
5695 use object_store::local::LocalFileSystem;
5696 use object_store::path::Path as ObjectStorePath;
5697 use uni_common::core::schema::SchemaManager;
5698
5699 let dir = tempdir()?;
5700 let path = dir.path().to_str().unwrap();
5701 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5702 let schema_path = ObjectStorePath::from("schema.json");
5703
5704 let schema_manager =
5705 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5706 let _label_id = schema_manager.add_label("Test")?;
5707 schema_manager.save().await?;
5708
5709 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5710
5711 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5712
5713 let l0 = writer.l0_manager.get_current();
5714
5715 // Initial state should be empty
5716 let initial_estimated = l0.read().estimated_size;
5717 let initial_actual = l0.read().size_bytes();
5718 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5719 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5720
5721 // Insert vertices with properties
5722 let mut vids = Vec::new();
5723 for i in 0..10 {
5724 let vid = writer.next_vid().await?;
5725 let mut props = std::collections::HashMap::new();
5726 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5727 props.insert("index".to_string(), Value::Int(i));
5728 writer
5729 .insert_vertex_with_labels(vid, props, &[], None)
5730 .await?;
5731 vids.push(vid);
5732 }
5733
5734 // Verify estimated_size grew
5735 let after_vertices_estimated = l0.read().estimated_size;
5736 let after_vertices_actual = l0.read().size_bytes();
5737 assert!(
5738 after_vertices_estimated > 0,
5739 "estimated_size should grow after insertions"
5740 );
5741
5742 // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5743 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5744 assert!(
5745 (0.5..=2.0).contains(&ratio),
5746 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5747 after_vertices_estimated,
5748 after_vertices_actual,
5749 ratio
5750 );
5751
5752 // Insert edges with a simple edge type
5753 let edge_type = 1u32;
5754 for i in 0..9 {
5755 let eid = writer.next_eid(edge_type).await?;
5756 writer
5757 .insert_edge(
5758 vids[i],
5759 vids[i + 1],
5760 edge_type,
5761 eid,
5762 std::collections::HashMap::new(),
5763 Some("NEXT".to_string()),
5764 None,
5765 )
5766 .await?;
5767 }
5768
5769 // Verify estimated_size grew further
5770 let after_edges_estimated = l0.read().estimated_size;
5771 let after_edges_actual = l0.read().size_bytes();
5772 assert!(
5773 after_edges_estimated > after_vertices_estimated,
5774 "estimated_size should grow after edge insertions"
5775 );
5776
5777 // Verify still within reasonable bounds
5778 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5779 assert!(
5780 (0.5..=2.0).contains(&ratio),
5781 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5782 after_edges_estimated,
5783 after_edges_actual,
5784 ratio
5785 );
5786
5787 Ok(())
5788 }
5789
5790 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5791 #[tokio::test]
5792 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5793 use crate::runtime::wal::WriteAheadLog;
5794 use crate::storage::manager::StorageManager;
5795 use object_store::local::LocalFileSystem;
5796 use object_store::path::Path as ObjectStorePath;
5797 use uni_common::core::schema::SchemaManager;
5798
5799 let dir = tempdir()?;
5800 let path = dir.path().to_str().unwrap();
5801 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5802 let schema_path = ObjectStorePath::from("schema.json");
5803
5804 let schema_manager =
5805 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5806 schema_manager.save().await?;
5807
5808 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5809
5810 let wal_path = ObjectStorePath::from("wal");
5811 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5812
5813 let writer = Writer::new_with_config(
5814 storage.clone(),
5815 schema_manager.clone(),
5816 1,
5817 UniConfig::default(),
5818 Some(wal.clone()),
5819 None,
5820 )
5821 .await?;
5822
5823 // Flush with no mutations — should succeed cleanly
5824 let lsn = writer.flush_wal().await?;
5825 // LSN should be 0 or 1 (no real mutations flushed)
5826 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5827
5828 Ok(())
5829 }
5830
5831 /// Test that transaction data does not leak into main L0 without commit.
5832 #[tokio::test]
5833 async fn test_transaction_isolation_without_commit() -> Result<()> {
5834 use crate::runtime::wal::WriteAheadLog;
5835 use crate::storage::manager::StorageManager;
5836 use object_store::local::LocalFileSystem;
5837 use object_store::path::Path as ObjectStorePath;
5838 use uni_common::core::schema::SchemaManager;
5839
5840 let dir = tempdir()?;
5841 let path = dir.path().to_str().unwrap();
5842 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5843 let schema_path = ObjectStorePath::from("schema.json");
5844
5845 let schema_manager =
5846 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5847 let _label_id = schema_manager.add_label("Person")?;
5848 schema_manager.save().await?;
5849
5850 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5851
5852 let wal_path = ObjectStorePath::from("wal");
5853 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5854
5855 let writer = Writer::new_with_config(
5856 storage.clone(),
5857 schema_manager.clone(),
5858 1,
5859 UniConfig::default(),
5860 Some(wal),
5861 None,
5862 )
5863 .await?;
5864
5865 // Create transaction L0
5866 let tx_l0 = writer.create_transaction_l0();
5867
5868 // Insert vertex into transaction L0
5869 let vid = writer.next_vid().await?;
5870 writer
5871 .insert_vertex_with_labels(
5872 vid,
5873 [("name".to_string(), Value::String("Ghost".to_string()))]
5874 .into_iter()
5875 .collect(),
5876 &["Person".to_string()],
5877 Some(&tx_l0),
5878 )
5879 .await?;
5880
5881 // Verify data is in transaction L0
5882 assert!(
5883 tx_l0.read().vertex_properties.contains_key(&vid),
5884 "Transaction L0 should contain the vertex"
5885 );
5886
5887 // Verify data is NOT in main L0
5888 let main_l0 = writer.l0_manager.get_current();
5889 assert!(
5890 !main_l0.read().vertex_properties.contains_key(&vid),
5891 "Main L0 should NOT contain uncommitted transaction data"
5892 );
5893
5894 // Drop transaction without committing — data should be lost
5895 drop(tx_l0);
5896
5897 // Main L0 still should not have it
5898 assert!(
5899 !main_l0.read().vertex_properties.contains_key(&vid),
5900 "Main L0 should remain clean after dropped transaction"
5901 );
5902
5903 Ok(())
5904 }
5905
5906 /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5907 /// the flush count crosses the configured threshold and stays
5908 /// silent on subsequent flushes for the lifetime of the writer.
5909 /// Primary writers (`fork_id == None`) never fire it.
5910 ///
5911 /// Tested directly against `tick_fork_fragment_observability` so
5912 /// the contract is locked in independently of the broader
5913 /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5914 /// on Day 10's on-the-fly schema overlay growth).
5915 #[tokio::test]
5916 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5917 use crate::storage::manager::StorageManager;
5918 use object_store::local::LocalFileSystem;
5919 use object_store::path::Path as ObjectStorePath;
5920 use uni_common::core::fork::ForkId;
5921 use uni_common::core::schema::SchemaManager;
5922
5923 let dir = tempdir()?;
5924 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5925 let schema_path = ObjectStorePath::from("schema.json");
5926 let schema_manager =
5927 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5928 let storage = Arc::new(
5929 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5930 );
5931
5932 let config = UniConfig {
5933 fork_fragment_warn_threshold: 3,
5934 ..Default::default()
5935 };
5936 let mut writer =
5937 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5938
5939 // Primary path: never fires.
5940 for _ in 0..10 {
5941 writer.tick_fork_fragment_observability();
5942 }
5943 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5944 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5945
5946 // Fork path: tag and tick. Below threshold → no fire.
5947 writer.fork_id = Some(ForkId::new());
5948 writer.tick_fork_fragment_observability();
5949 writer.tick_fork_fragment_observability();
5950 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5951 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5952
5953 // Crossing threshold → fires once.
5954 writer.tick_fork_fragment_observability();
5955 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5956 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5957
5958 // Subsequent ticks bump the gauge but do not re-fire.
5959 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5960 for _ in 0..5 {
5961 writer.tick_fork_fragment_observability();
5962 }
5963 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5964 assert_eq!(
5965 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5966 fired_after
5967 );
5968
5969 Ok(())
5970 }
5971
5972 /// The hot-path mutators must not write to any `Writer` struct field.
5973 /// Phase 2 of the refactor
5974 /// gave them `&self` receivers, which the compiler enforces against
5975 /// direct `self.x = y` assignment — but interior-mutable writes
5976 /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5977 /// every potentially-writable field, calls each hot-path mutator, and
5978 /// asserts no field changed.
5979 ///
5980 /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5981 /// `tick_fork_fragment_observability`) DO mutate fields by design and
5982 /// are intentionally out of scope here.
5983 #[tokio::test]
5984 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5985 use crate::storage::manager::StorageManager;
5986 use object_store::local::LocalFileSystem;
5987 use object_store::path::Path as ObjectStorePath;
5988 use uni_common::core::schema::SchemaManager;
5989
5990 let dir = tempdir()?;
5991 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5992 let schema_path = ObjectStorePath::from("schema.json");
5993 let schema_manager =
5994 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5995 schema_manager.add_label("Person")?;
5996 schema_manager.save().await?;
5997 let storage = Arc::new(
5998 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5999 );
6000
6001 let writer =
6002 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
6003 .await?;
6004
6005 /// Captures every `Writer` field that *could* be written by a
6006 /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
6007 /// construction field). Arc'd substructures (`l0_manager`,
6008 /// `storage`, etc.) are intentionally not checked — they are
6009 /// re-pointed only at construction.
6010 #[derive(Debug, PartialEq)]
6011 struct Snapshot {
6012 last_flush_time: std::time::Instant,
6013 cached_manifest_some: bool,
6014 fork_flush_count: u64,
6015 fork_fragment_warn_fired: bool,
6016 xervo_runtime_some: bool,
6017 index_rebuild_manager_some: bool,
6018 fork_id: Option<ForkId>,
6019 }
6020
6021 fn snap(w: &Writer) -> Snapshot {
6022 Snapshot {
6023 last_flush_time: *w.last_flush_time.lock(),
6024 cached_manifest_some: w.cached_manifest.lock().is_some(),
6025 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
6026 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
6027 xervo_runtime_some: w.xervo_runtime.get().is_some(),
6028 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
6029 fork_id: w.fork_id,
6030 }
6031 }
6032
6033 // 1. insert_vertex_with_labels
6034 let before = snap(&writer);
6035 let vid = writer.next_vid().await?;
6036 writer
6037 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
6038 .await?;
6039 assert_eq!(
6040 snap(&writer),
6041 before,
6042 "insert_vertex_with_labels mutated a Writer field"
6043 );
6044
6045 // 2. insert_vertices_batch
6046 let before = snap(&writer);
6047 let vids = writer.allocate_vids(2).await?;
6048 writer
6049 .insert_vertices_batch(
6050 vids,
6051 vec![Properties::new(), Properties::new()],
6052 vec!["Person".into()],
6053 None,
6054 )
6055 .await?;
6056 assert_eq!(
6057 snap(&writer),
6058 before,
6059 "insert_vertices_batch mutated a Writer field"
6060 );
6061
6062 // 3. delete_vertex
6063 let before = snap(&writer);
6064 writer.delete_vertex(vid, None, None).await?;
6065 assert_eq!(
6066 snap(&writer),
6067 before,
6068 "delete_vertex mutated a Writer field"
6069 );
6070
6071 // (insert_edge / delete_edge are skipped here: their fixture cost is
6072 // disproportionate to the audit's marginal value, and the same
6073 // structural argument plus the compiler-enforced `&self` covers them.)
6074
6075 Ok(())
6076 }
6077}