uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
36/// Whether `property` on `label` is a multi-vector (`List<Vector>`) column — the
37/// late-interaction (ColBERT) shape that auto-embeds per-token via the multi-vector model
38/// (issue #104). A plain `Vector` column uses the dense single-vector model.
39fn is_multivector_property(
40 schema: &uni_common::core::schema::Schema,
41 label: &str,
42 property: &str,
43) -> bool {
44 schema
45 .properties
46 .get(label)
47 .and_then(|p| p.get(property))
48 .is_some_and(|m| {
49 matches!(&m.r#type, uni_common::DataType::List(inner)
50 if matches!(**inner, uni_common::DataType::Vector { .. }))
51 })
52}
53
54/// Convert per-token embedding vectors into the stored `List<Vector>` representation:
55/// a `Value::List` whose elements are each a `Value::List<Float>` (one token vector).
56fn multivec_to_value(tokens: &[Vec<f32>]) -> Value {
57 Value::List(
58 tokens
59 .iter()
60 .map(|tok| Value::List(tok.iter().map(|f| Value::Float(*f as f64)).collect()))
61 .collect(),
62 )
63}
64
65/// One coalesced auto-embed group: all targets that share an `alias` + `source_properties`,
66/// split into dense (`Vector`) vs multi-vector (`List<Vector>`) target columns. When a group
67/// has BOTH kinds, a single hybrid inference fills both (single forward pass).
68struct EmbedGroupSpec {
69 source_properties: Vec<String>,
70 document_prefix: Option<String>,
71 dense: Vec<String>,
72 multi: Vec<String>,
73}
74
75/// Run ONE embedding inference for a coalesced group of auto-embed targets sharing an alias +
76/// source text, returning `(dense_per_text, multi_vector_per_text)` (each `Some` iff that head
77/// was requested). When both heads are needed this uses the hybrid model
78/// (`hybrid_embedder`), so a multi-functional model (e.g. BGE-M3) produces the dense + ColBERT
79/// heads from a SINGLE forward pass; otherwise it uses the single-head dense / multi-vector
80/// embedder (today's behavior for non-mixed groups).
81async fn embed_group(
82 runtime: &ModelRuntime,
83 alias: &str,
84 texts: &[&str],
85 want_dense: bool,
86 want_multi: bool,
87) -> Result<(Option<Vec<Vec<f32>>>, Option<Vec<Vec<Vec<f32>>>>)> {
88 use uni_xervo::traits::hybrid::HeadSet;
89 if want_dense && want_multi {
90 // Single-pass hybrid: one model, one inference, both heads.
91 let embedder = runtime.hybrid_embedder(alias).await?;
92 let res = embedder
93 .embed(texts, HeadSet::DENSE | HeadSet::MULTI_VECTOR)
94 .await?;
95 let dense = res.dense.ok_or_else(|| {
96 anyhow!("hybrid model '{alias}' returned no dense head for a Vector target")
97 })?;
98 let multi = res.multi_vector.ok_or_else(|| {
99 anyhow!(
100 "hybrid model '{alias}' returned no multi-vector head for a List<Vector> target"
101 )
102 })?;
103 Ok((Some(dense), Some(multi)))
104 } else if want_multi {
105 let embedder = runtime.multi_vector_embedder(alias).await?;
106 Ok((None, Some(embedder.embed(texts).await?.vectors)))
107 } else {
108 let embedder = runtime.embedding(alias).await?;
109 Ok((Some(embedder.embed(texts).await?.vectors), None))
110 }
111}
112
113/// Group a label's auto-embed configs by `(alias, source_properties)`, classifying each target
114/// column as dense vs multi-vector. A group with both kinds is a single-pass hybrid source.
115fn collect_embed_groups(
116 schema: &uni_common::core::schema::Schema,
117 label: &str,
118) -> std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec> {
119 use std::collections::BTreeMap;
120 let mut groups: BTreeMap<(String, Vec<String>), EmbedGroupSpec> = BTreeMap::new();
121 for idx in &schema.indexes {
122 if let IndexDefinition::Vector(v_config) = idx
123 && v_config.label == label
124 && let Some(emb) = &v_config.embedding_config
125 {
126 let g = groups
127 .entry((emb.alias.clone(), emb.source_properties.clone()))
128 .or_insert_with(|| EmbedGroupSpec {
129 source_properties: emb.source_properties.clone(),
130 document_prefix: emb.document_prefix.clone(),
131 dense: Vec::new(),
132 multi: Vec::new(),
133 });
134 if is_multivector_property(schema, label, &v_config.property) {
135 g.multi.push(v_config.property.clone());
136 } else {
137 g.dense.push(v_config.property.clone());
138 }
139 }
140 }
141 groups
142}
143
144#[derive(Clone, Debug)]
145pub struct WriterConfig {
146 pub max_mutations: usize,
147 /// Enable the partial-column MergeInsert path for SET-only flushes.
148 ///
149 /// When `true`, `Writer::insert_vertex_partial` records the touched
150 /// property keys into `L0Buffer::vertex_partial_keys` and the flush
151 /// routes those VIDs through Lance `MergeInsertBuilder` with a
152 /// subset-of-schema source, skipping the read of (and write of)
153 /// the unchanged columns — including wide ones like embeddings.
154 ///
155 /// When `false`, `insert_vertex_partial` falls back to the
156 /// read-modify-write `insert_vertex_with_labels` path (preserving
157 /// bit-for-bit equivalence with prior releases). Default `false`
158 /// for the first release; flip to `true` after telemetry on the
159 /// issue #72 ingest workload confirms the win.
160 ///
161 /// See the soundness probe at
162 /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
163 pub partial_lance_writes: bool,
164}
165
166impl Default for WriterConfig {
167 fn default() -> Self {
168 Self {
169 max_mutations: 10_000,
170 partial_lance_writes: false,
171 }
172 }
173}
174
175/// Parent state captured atomically at a fork point under `flush_lock`.
176///
177/// Holds the allocator high-water marks and every existing dataset's
178/// Lance main-branch version at the instant the parent's L0 was
179/// flushed. Because [`Writer::flush_and_capture_fork_point`] reads these
180/// while still holding `flush_lock`, no concurrent commit or flush can
181/// advance the allocator or any dataset tip between the flush and the
182/// reads. A fork built from these values therefore cannot collide VIDs
183/// with the parent nor inherit rows committed after the fork point.
184#[derive(Clone, Debug, Default)]
185pub struct ForkPoint {
186 /// Next vertex id the parent would allocate at the fork point.
187 pub vid_hwm: u64,
188 /// Next edge id the parent would allocate at the fork point.
189 pub eid_hwm: u64,
190 /// `dataset_name` → Lance main-branch version at the fork point.
191 ///
192 /// Keys use the same dataset naming as the fork branch loop
193 /// (`vertices`, `edges`, `vertices_{label}`, `deltas_{type}_{dir}`,
194 /// `adjacency_{type}_{dir}`). A dataset with no `.lance` directory
195 /// on disk at the fork point has no entry.
196 pub dataset_versions: BTreeMap<String, u64>,
197 /// Parent's MVCC version high-water-mark at the fork point: the
198 /// largest `_version` any inherited row can carry. A fork bootstraps
199 /// its own version counter to this floor so a fork transaction's
200 /// `_version <= pin` read still sees inherited (base_paths) rows,
201 /// while the fork's own writes get versions above it.
202 pub version_hwm: u64,
203}
204
205/// RAII latch on [`StorageManager::flush_in_progress`].
206///
207/// Sets the flag to `true` on construction (via CAS) and back to `false` on
208/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
209/// stuck. Returns `None` if a flush is already in progress, providing
210/// forward-compatible exclusion once the outer writer-RwLock is removed in
211/// Phase 4 of the concurrent-writer refactor.
212// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
213// can hold it on RotatedFlush without a writer.rs back-import cycle.
214pub use crate::storage::manager::FlushInProgressGuard;
215
216/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
217/// captured WAL LSN, current_version, and the in-progress guard whose
218/// lifetime spans the full flush (including the future async stream
219/// phase that runs on a spawned task).
220struct RotateOutput {
221 old_l0_arc: Arc<RwLock<L0Buffer>>,
222 wal_lsn: u64,
223 current_version: u64,
224 flush_in_progress_guard: FlushInProgressGuard,
225}
226
227/// Project a property map to a subset selected by `keys`. Used to
228/// run `touched_needs_full_read` against just the SET-touched keys
229/// when the caller passes a fully-merged `props` map.
230fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
231 let mut out = Properties::new();
232 for k in keys {
233 if let Some(v) = props.get(k) {
234 out.insert(k.clone(), v.clone());
235 }
236 }
237 out
238}
239
240/// Join a storage base URI and a dataset name into a `.lance` URI.
241///
242/// Mirrors the fork branch loop's `join_uri` so the versions captured
243/// by [`Writer::flush_and_capture_fork_point`] key the exact same
244/// datasets the fork later branches.
245fn join_lance_uri(base: &str, dataset: &str) -> String {
246 if base.ends_with('/') {
247 format!("{base}{dataset}.lance")
248 } else {
249 format!("{base}/{dataset}.lance")
250 }
251}
252
253/// Cheap on-disk existence check for a dataset `.lance` directory.
254///
255/// Local-fs heuristic: a URI with a `://` scheme is assumed remote and
256/// reported present, deferring the real check to `current_version`.
257/// Mirrors the fork branch loop's `path_exists`.
258fn lance_path_exists(uri: &str) -> bool {
259 if uri.contains("://") {
260 return true;
261 }
262 std::path::Path::new(uri).exists()
263}
264
265/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
266/// published) snapshot manifest and its id. Finalize is responsible
267/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
268/// update.
269struct FlushOutcome {
270 manifest: SnapshotManifest,
271 snapshot_id: String,
272}
273
274pub struct Writer {
275 pub l0_manager: Arc<L0Manager>,
276 pub storage: Arc<StorageManager>,
277 pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
278 pub allocator: Arc<IdAllocator>,
279 pub config: UniConfig,
280 /// Optional embedding runtime. `OnceLock` so the initializer can run
281 /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
282 /// (Phase 4 of concurrent_writer.md). Read through
283 /// [`Writer::xervo_runtime`] — the field itself is private to keep
284 /// callers oblivious to the OnceLock representation.
285 xervo_runtime: OnceLock<Arc<ModelRuntime>>,
286 /// Property manager for cache invalidation after flush
287 pub property_manager: Option<Arc<PropertyManager>>,
288 /// Adjacency manager for dual-write (edges survive flush).
289 adjacency_manager: Arc<AdjacencyManager>,
290 /// Timestamp of last flush or creation. Interior-mutable so that
291 /// `&self` callers can update it; uncontended in practice because all
292 /// writes happen inside the single-flusher critical section.
293 /// Arc-wrapped so it can travel into the SharedFlushCtx that the
294 /// async-flush coordinator passes to spawned stream/finalize tasks.
295 last_flush_time: Arc<PlMutex<std::time::Instant>>,
296 /// Background compaction task handle (prevents concurrent compaction races)
297 compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
298 /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
299 /// `OnceLock` for the same reason as `xervo_runtime`.
300 /// Wrapped in `Arc` so the async-flush finalize path can read it
301 /// from a spawned task via `SharedFlushCtx`.
302 index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
303 /// Cached snapshot manifest from the last flush. Avoids re-reading from
304 /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
305 /// `&self` access; uncontended because all access is inside the
306 /// single-flusher critical section.
307 cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
308 /// Identifier of the fork this writer serves, if any. `None` for
309 /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
310 /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
311 /// the fragment-count guard rail (Phase 2 Day 12).
312 pub fork_id: Option<ForkId>,
313 /// Number of `flush_to_l1` calls since this writer was constructed.
314 /// Used as a proxy for L1 fragment growth on the fork's branches:
315 /// each flush typically appends ~1 fragment per touched dataset, so
316 /// the count tracks the order of magnitude of fragment accumulation.
317 /// Reading the actual `Dataset::manifest().fragments.len()` per
318 /// flush would add a per-dataset object-store roundtrip on the hot
319 /// commit path; the proxy keeps the guard rail purely observational
320 /// (Phase 5 introduces fork compaction proper). Only meaningful when
321 /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
322 fork_flush_count: Arc<AtomicU64>,
323 /// Whether the fork-fragment warning has already fired at the
324 /// configured threshold. One-shot per writer lifetime. `Relaxed` is
325 /// sufficient — observational only.
326 fork_fragment_warn_fired: Arc<AtomicBool>,
327 /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
328 /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
329 /// across its WAL-append + L0-merge window. Replaces the outer
330 /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
331 /// Arc-wrapped so async-flush coordinator's finalize path can
332 /// re-acquire it from a spawned task via SharedFlushCtx.
333 flush_lock: Arc<tokio::sync::Mutex<()>>,
334 /// Coordinator for async-flush pipeline. Owns the back-pressure
335 /// semaphore, rotate-order sequence, single-finalizer task, and
336 /// pending-flush counter. Always present even when async flush is
337 /// disabled — the sync `flush_to_l1` path uses it for the future
338 /// `FlushInProgressGuard`/permit ownership model.
339 /// Coordinator is `None` when `async_flush_enabled = false`. The
340 /// coordinator's finalizer task captures `SharedFlushCtx` which
341 /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
342 /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
343 /// the holder count never drops. Constructing it only when the
344 /// feature is actually on avoids that side-effect for all
345 /// existing sync-flush paths. When async-flush graduates from
346 /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
347 /// the drain explicitly.
348 #[allow(dead_code)] // first production use lands in Commit 6/7
349 pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
350 /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
351 /// per successful commit under `flush_lock`; a transaction captures the
352 /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
353 ///
354 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
355 ///
356 /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
357 /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
358 commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
359 /// Bounded log of recently-committed write-sets for OCC conflict detection.
360 /// Read and updated only under `flush_lock`.
361 ///
362 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
363 committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
364 /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
365 /// canonical (label, key-props) bytes. A transaction holds the lock from
366 /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
367 /// on the same key (avoiding optimistic abort-retry on hot keys).
368 ///
369 /// Always allocated; populated only when `config.ssi_enabled` is `true`.
370 for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
371}
372
373/// Number of recent commits retained for OCC conflict detection. Large enough
374/// that under-run — and the resulting conservative abort — is rare in practice;
375/// each entry is a small set of touched ids.
376const OCC_REGISTRY_CAPACITY: usize = 4096;
377
378impl Writer {
379 pub async fn new(
380 storage: Arc<StorageManager>,
381 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
382 start_version: u64,
383 ) -> Result<Self> {
384 Self::new_with_config(
385 storage,
386 schema_manager,
387 start_version,
388 UniConfig::default(),
389 None,
390 None,
391 )
392 .await
393 }
394
395 pub async fn new_with_config(
396 storage: Arc<StorageManager>,
397 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
398 start_version: u64,
399 config: UniConfig,
400 wal: Option<Arc<WriteAheadLog>>,
401 allocator: Option<Arc<IdAllocator>>,
402 ) -> Result<Self> {
403 let allocator = if let Some(a) = allocator {
404 a
405 } else {
406 let store = storage.store();
407 let path = object_store::path::Path::from("id_allocator.json");
408 Arc::new(IdAllocator::new(store, path, 1000).await?)
409 };
410
411 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
412
413 let property_manager = Some(Arc::new(PropertyManager::new(
414 storage.clone(),
415 schema_manager.clone(),
416 1000,
417 )));
418
419 let adjacency_manager = storage.adjacency_manager();
420
421 // Hoist the Arc'd fields so we can both stash them on Writer and
422 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
423 // captures. Single-source-of-truth for each piece of mutable
424 // shared state.
425 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
426 let cached_manifest = Arc::new(PlMutex::new(None));
427 let fork_flush_count = Arc::new(AtomicU64::new(0));
428 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
429 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
430 let compaction_handle = Arc::new(RwLock::new(None));
431 let index_rebuild_manager: Arc<
432 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
433 > = Arc::new(OnceLock::new());
434
435 let flush_coordinator = if config.async_flush_enabled {
436 let shared = SharedFlushCtx {
437 storage: storage.clone(),
438 l0_manager: l0_manager.clone(),
439 adjacency_manager: adjacency_manager.clone(),
440 property_manager: property_manager.clone(),
441 schema_manager: schema_manager.clone(),
442 cached_manifest: cached_manifest.clone(),
443 last_flush_time: last_flush_time.clone(),
444 fork_id: None,
445 fork_flush_count: fork_flush_count.clone(),
446 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
447 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
448 flush_lock: flush_lock.clone(),
449 index_rebuild_manager: index_rebuild_manager.clone(),
450 compaction_handle: compaction_handle.clone(),
451 compaction_config: config.compaction.clone(),
452 index_rebuild_config: config.index_rebuild.clone(),
453 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
454 };
455 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
456 Some(Arc::new(FlushCoordinator::new(
457 config.max_pending_flushes,
458 shared,
459 finalize_fn,
460 )))
461 } else {
462 None
463 };
464
465 let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
466 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
467 OCC_REGISTRY_CAPACITY,
468 )));
469 let for_update_locks = Arc::new(dashmap::DashMap::new());
470
471 Ok(Self {
472 l0_manager,
473 storage,
474 schema_manager,
475 allocator,
476 config,
477 xervo_runtime: OnceLock::new(),
478 property_manager,
479 adjacency_manager,
480 last_flush_time,
481 compaction_handle,
482 index_rebuild_manager,
483 cached_manifest,
484 fork_id: None,
485 fork_flush_count,
486 fork_fragment_warn_fired,
487 flush_lock,
488 flush_coordinator,
489 commit_sequence,
490 committed_writes,
491 for_update_locks,
492 })
493 }
494
495 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
496 /// creating it on first use. The caller `.lock_owned().await`s the returned
497 /// mutex and holds the guard for the transaction's lifetime.
498 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
499 self.for_update_locks
500 .entry(key.to_vec())
501 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
502 .clone()
503 }
504
505 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
506 /// holds anymore, so the map does not grow without bound across the keyspace.
507 ///
508 /// Called when a transaction ends, **after** its guards have been dropped.
509 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
510 /// the same lock `row_lock_handle` takes to clone an entry — so the check
511 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
512 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
513 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
514 /// after we remove). Either way no two transactions ever lock different
515 /// `Mutex` instances for the same key.
516 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
517 for key in keys {
518 self.for_update_locks
519 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
520 }
521 }
522
523 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
524 /// tests that the map does not leak entries across transactions (G5).
525 pub fn for_update_lock_count(&self) -> usize {
526 self.for_update_locks.len()
527 }
528
529 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
530 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
531 /// baseline advances to lock-acquisition time (read-latest under the lock).
532 pub fn current_commit_sequence(&self) -> u64 {
533 self.commit_sequence
534 .load(crate::runtime::sync::Ordering::Relaxed)
535 }
536
537 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
538 /// Used by the async-flush stream/finalize paths to pass into spawned
539 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
540 /// with `flush_coordinator -> FinalizeFn -> Writer`).
541 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
542 SharedFlushCtx {
543 storage: self.storage.clone(),
544 l0_manager: self.l0_manager.clone(),
545 adjacency_manager: self.adjacency_manager.clone(),
546 property_manager: self.property_manager.clone(),
547 schema_manager: self.schema_manager.clone(),
548 cached_manifest: self.cached_manifest.clone(),
549 last_flush_time: self.last_flush_time.clone(),
550 fork_id: self.fork_id,
551 fork_flush_count: self.fork_flush_count.clone(),
552 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
553 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
554 flush_lock: self.flush_lock.clone(),
555 index_rebuild_manager: self.index_rebuild_manager.clone(),
556 compaction_handle: self.compaction_handle.clone(),
557 compaction_config: self.config.compaction.clone(),
558 index_rebuild_config: self.config.index_rebuild.clone(),
559 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
560 }
561 }
562
563 /// Borrow the flush coordinator if async flush is enabled.
564 /// Returns `None` when `config.async_flush_enabled = false`.
565 /// External callers (`drop_fork`) use this to drain pending streams.
566 pub fn flush_coordinator(
567 &self,
568 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
569 self.flush_coordinator.as_ref()
570 }
571
572 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
573 ///
574 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
575 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
576 pub fn set_index_rebuild_manager(
577 &self,
578 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
579 ) -> Result<()> {
580 self.index_rebuild_manager
581 .set(manager)
582 .map_err(|_| anyhow!("index_rebuild_manager already set"))
583 }
584
585 /// Replay WAL mutations into the current L0 buffer.
586 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
587 let l0 = self.l0_manager.get_current();
588 let wal = l0.read().wal.clone();
589
590 if let Some(wal) = wal {
591 wal.initialize().await?;
592 let mutations = wal.replay_since(wal_high_water_mark).await?;
593 let count = mutations.len();
594
595 if count > 0 {
596 log::info!(
597 "Replaying {} mutations from WAL (LSN > {})",
598 count,
599 wal_high_water_mark
600 );
601 let mut l0_guard = l0.write();
602 l0_guard.replay_mutations(mutations)?;
603 // Rebuild the UNIQUE constraint index over the recovered rows
604 // (Bug #9 Mechanism B). `replay_mutations` restores
605 // vertices/properties/labels but never repopulates
606 // `constraint_index` (its only other caller is the live insert
607 // path). Without this, a unique key that lives only in the WAL
608 // (committed but not yet flushed to Lance) is invisible to
609 // `check_unique_constraint_multi` after recovery and a
610 // duplicate of it could be created.
611 self.rebuild_constraint_index(&mut l0_guard);
612 }
613
614 Ok(count)
615 } else {
616 Ok(0)
617 }
618 }
619
620 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
621 ///
622 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
623 /// constraint whose target label the vertex carries and whose member
624 /// properties are all present, inserts the same constraint key the live
625 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
626 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
627 /// write lock; the schema is already loaded on the `Writer`.
628 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
629 let schema = self.schema_manager.schema();
630 // Collect entries first to avoid borrowing `vertex_properties`
631 // immutably while mutating `constraint_index` through the same guard.
632 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
633 for (&vid, props) in &l0_guard.vertex_properties {
634 if l0_guard.vertex_tombstones.contains(&vid) {
635 continue;
636 }
637 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
638 continue;
639 };
640 for label in labels {
641 for constraint in &schema.constraints {
642 if !constraint.enabled {
643 continue;
644 }
645 let ConstraintTarget::Label(l) = &constraint.target else {
646 continue;
647 };
648 if l != label {
649 continue;
650 }
651 let ConstraintType::Unique {
652 properties: unique_props,
653 } = &constraint.constraint_type
654 else {
655 continue;
656 };
657 let mut key_values = Vec::new();
658 let mut all_present = true;
659 for prop in unique_props {
660 if let Some(val) = props.get(prop) {
661 key_values.push((prop.clone(), val.clone()));
662 } else {
663 all_present = false;
664 break;
665 }
666 }
667 if all_present {
668 keys.push((serialize_constraint_key(label, &key_values), vid));
669 }
670 }
671 }
672 }
673 for (key, vid) in keys {
674 l0_guard.insert_constraint_key(key, vid);
675 }
676 }
677
678 /// Allocates the next VID (pure auto-increment).
679 pub async fn next_vid(&self) -> Result<Vid> {
680 self.allocator.allocate_vid().await
681 }
682
683 /// Allocates multiple VIDs at once for bulk operations.
684 /// This is more efficient than calling next_vid() in a loop.
685 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
686 self.allocator.allocate_vids(count).await
687 }
688
689 /// Allocates the next EID (pure auto-increment).
690 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
691 self.allocator.allocate_eid().await
692 }
693
694 /// Allocates multiple EIDs at once for bulk operations.
695 /// This is more efficient than calling next_eid() in a loop.
696 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
697 self.allocator.allocate_eids(count).await
698 }
699
700 /// Install the embedding runtime exactly once. Receiver is `&self` so it
701 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
702 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
703 self.xervo_runtime
704 .set(runtime)
705 .map_err(|_| anyhow!("xervo_runtime already set"))
706 }
707
708 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
709 self.xervo_runtime.get().cloned()
710 }
711
712 /// Create a new empty L0 buffer for transaction-scoped mutations.
713 ///
714 /// Only reads the current version — no exclusive lock required on Writer.
715 /// The returned buffer has no WAL reference; mutations are logged at
716 /// commit time via [`Self::commit_transaction_l0`].
717 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
718 let current_version = self.l0_manager.get_current().read().current_version;
719 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
720 let buf = L0Buffer::new(current_version, None);
721 // SSI: stamp the OCC read sequence at begin so commit can detect any
722 // transaction that committed since. Gated on the runtime `ssi_enabled`
723 // toggle — when off, `occ_read_set` stays `None` and every downstream
724 // read-set recording / commit validation self-gates to a no-op.
725 let buf = if self.config.ssi_enabled {
726 let mut buf = buf;
727 buf.occ_read_seq = self
728 .commit_sequence
729 .load(crate::runtime::sync::Ordering::Relaxed);
730 // The read path records observed ids here for SSI antidependency
731 // detection; commit consults it.
732 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
733 crate::runtime::l0::OccReadSet::default(),
734 )));
735 buf
736 } else {
737 buf
738 };
739 Arc::new(RwLock::new(buf))
740 }
741
742 /// Resolve the target L0 buffer for a mutation.
743 ///
744 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
745 /// When `None`, it targets the global L0 from the manager.
746 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
747 tx_l0
748 .cloned()
749 .unwrap_or_else(|| self.l0_manager.get_current())
750 }
751
752 fn update_metrics(&self) {
753 let l0 = self.l0_manager.get_current();
754 let size = l0.read().estimated_size;
755 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
756 }
757
758 /// Overlay-aware issue-#77 edge-endpoint validation.
759 ///
760 /// The current buffer alone does not hold all committed-but-unflushed
761 /// tombstones — a flush rotation moves them onto `pending_flush` until
762 /// the Lance write completes. A vertex is effectively deleted iff,
763 /// walking newest-first (tx → current → pending newest→oldest), the
764 /// first buffer that knows the vid says "tombstoned" (an insert clears
765 /// the tombstone within a buffer, so props/tombstone are mutually
766 /// exclusive per buffer).
767 ///
768 /// Must run under `flush_lock` so the overlay cannot change before the
769 /// merge, and BEFORE the durable WAL flush (see the call site).
770 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
771 let chain = self.l0_manager.get_pending_flush();
772 let current = self.l0_manager.get_current();
773 let effectively_deleted = |vid: &Vid| -> bool {
774 if tx_l0.vertex_properties.contains_key(vid) {
775 return false;
776 }
777 if tx_l0.vertex_tombstones.contains(vid) {
778 return true;
779 }
780 {
781 let cur = current.read();
782 if cur.vertex_properties.contains_key(vid) {
783 return false;
784 }
785 if cur.vertex_tombstones.contains(vid) {
786 return true;
787 }
788 }
789 for frozen in chain.iter().rev() {
790 let g = frozen.read();
791 if g.vertex_properties.contains_key(vid) {
792 return false;
793 }
794 if g.vertex_tombstones.contains(vid) {
795 return true;
796 }
797 }
798 false
799 };
800 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
801 if tx_l0.tombstones.contains_key(eid) {
802 continue; // a deletion, not an insertion — never resurrects a vertex
803 }
804 if effectively_deleted(src_vid) {
805 anyhow::bail!(
806 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
807 eid,
808 src_vid
809 );
810 }
811 if effectively_deleted(dst_vid) {
812 anyhow::bail!(
813 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
814 eid,
815 dst_vid
816 );
817 }
818 }
819 Ok(())
820 }
821
822 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
823 /// value for each CRDT property the transaction writes, so the commit
824 /// merge MERGES against the committed CRDT state instead of shadowing it
825 /// (the carve-out lets concurrent CRDT writers commit on the assumption
826 /// that the merge sees the committed value — true only while that value
827 /// lives in current, not mid-flush on `pending_flush`). No-op when
828 /// nothing is pending or the property already exists in current. Vertex
829 /// properties only, mirroring the carve-out itself.
830 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
831 let chain = self.l0_manager.get_pending_flush();
832 if chain.is_empty() {
833 return;
834 }
835 for (vid, props) in &tx_l0.vertex_properties {
836 Self::seed_crdt_props(&chain, *vid, props, main_l0);
837 }
838 }
839
840 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
841 /// Also used by the non-transactional vertex write path, which CRDT-merges
842 /// into the current buffer directly and has the same shadowing hazard
843 /// during a flush window.
844 fn seed_crdt_props(
845 chain: &[Arc<RwLock<L0Buffer>>],
846 vid: Vid,
847 props: &Properties,
848 target: &mut L0Buffer,
849 ) {
850 for (key, value) in props {
851 if crate::runtime::l0::try_as_crdt(value).is_none() {
852 continue;
853 }
854 if target
855 .vertex_properties
856 .get(&vid)
857 .is_some_and(|p| p.contains_key(key))
858 {
859 continue;
860 }
861 // Newest generation first: the first hit is the live state.
862 for frozen in chain.iter().rev() {
863 let g = frozen.read();
864 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
865 if crate::runtime::l0::try_as_crdt(v).is_some() {
866 target
867 .vertex_properties
868 .entry(vid)
869 .or_default()
870 .insert(key.clone(), v.clone());
871 }
872 break;
873 }
874 }
875 }
876 }
877
878 /// Commit an externally-owned transaction L0 buffer.
879 ///
880 /// Writes mutations to WAL, flushes, merges into main L0, and replays
881 /// edges into the AdjacencyManager. Returns the WAL LSN of the commit
882 /// (0 when no WAL is configured).
883 /// Commit a transaction's private L0 buffer into main L0.
884 ///
885 /// Returns `(wal_lsn, flush_pending)`. When `flush_pending == true`, the
886 /// post-commit `should_flush()` predicate fired but no flush ran — the
887 /// caller is expected to spawn a background `flush_to_l1`. This is the
888 /// shape used when `UniConfig::async_flush_enabled` is set, so commits
889 /// don't block on L1-streaming I/O.
890 pub async fn commit_transaction_l0(
891 self: &Arc<Self>,
892 tx_l0_arc: Arc<RwLock<L0Buffer>>,
893 ) -> Result<(u64, bool)> {
894 // Hold `flush_lock` across WAL append + flush + main-L0 merge.
895 // Two concurrent commits serialize here; in Phase 3 the outer
896 // `Arc<RwLock<Writer>>` already provides this exclusion, so the
897 // acquisition is uncontended. Phase 4 drops the outer lock and
898 // this becomes the load-bearing serialization point.
899 let _flush_lock_guard = self.flush_lock.lock().await;
900
901 // Crash-recovery seam: simulate process death immediately after winning
902 // the commit serialization point but before any durable work. No-op
903 // unless built with `--features failpoints`. (See ssi_resilience tests.)
904 fail::fail_point!("commit::after-flush-lock");
905
906 // SSI: optimistic conflict detection. This MUST run before any WAL
907 // write — `flush_wal()` below is the durable commit point and the WAL
908 // has no abort marker, so aborting after it would resurrect this
909 // transaction on crash recovery. The write-set is reused for
910 // registration after a successful merge.
911 // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
912 // and `occ_write_set` is `None`, so the post-merge registration below
913 // is skipped — reproducing last-writer-wins exactly.
914 let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
915 let tx_l0 = tx_l0_arc.read();
916 let read_seq = tx_l0.occ_read_seq;
917 let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
918 if !write_set.is_empty() {
919 // Telemetry: one validation per non-empty (writing) commit. The
920 // ratio of conflicts to validations is the headline abort rate.
921 metrics::counter!("uni_ssi_commit_validations_total").increment(1);
922 // Read-set is consulted only for writing transactions, so a
923 // read-only commit (empty write-set) runs at snapshot isolation.
924 let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
925 if let Some(conflict) =
926 self.committed_writes
927 .lock()
928 .check(read_seq, &write_set, read_guard.as_deref())
929 {
930 use crate::runtime::occ::Conflict;
931 match &conflict {
932 Conflict::WriteWrite { .. } => metrics::counter!(
933 "uni_ssi_serialization_conflicts_total",
934 "kind" => "write_write",
935 )
936 .increment(1),
937 Conflict::ReadWrite { .. } => metrics::counter!(
938 "uni_ssi_serialization_conflicts_total",
939 "kind" => "read_write",
940 )
941 .increment(1),
942 Conflict::HistoryTruncated { .. } => {
943 metrics::counter!("uni_ssi_history_truncated_total").increment(1)
944 }
945 }
946 return Err(anyhow::Error::new(
947 uni_common::UniError::SerializationConflict {
948 message: conflict.to_string(),
949 },
950 ));
951 }
952 }
953
954 // Validate against the committed-but-unflushed overlay under
955 // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
956 // soundness. The current buffer alone does not hold all committed
957 // state — a flush rotation moves it onto `pending_flush` until the
958 // Lance write completes (the Bug #9A window, here at the
959 // commit-time layer) — so every check walks [current, pending…].
960 {
961 let pending = self.l0_manager.get_pending_flush();
962 let main_l0 = self.l0_manager.get_current();
963 let overlay: Vec<Arc<RwLock<L0Buffer>>> =
964 std::iter::once(main_l0).chain(pending).collect();
965
966 // SSI / serializable MERGE: abort if a concurrent transaction has
967 // already committed a row with one of this transaction's unique
968 // keys. Commits serialize here, so this closes the race window
969 // left by the per-insert check. (Empty index → no iterations.)
970 for (key, vid) in &tx_l0.constraint_index {
971 if overlay
972 .iter()
973 .any(|b| b.read().has_constraint_key(key, *vid))
974 {
975 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
976 return Err(anyhow::Error::new(
977 uni_common::UniError::ConstraintConflict {
978 message: "unique key already committed by a concurrent \
979 transaction"
980 .to_string(),
981 },
982 ));
983 }
984 }
985
986 // Implicit MERGE phantom guard: a `MERGE` that *created* a node
987 // registered its (label, key-props) here even with no declared
988 // UNIQUE constraint. If a concurrent transaction already committed
989 // the same MERGE key, abort retriably so the two converge to one
990 // node on retry (the loser's MATCH then finds the committed row).
991 // Only MERGE-creates register keys, so a plain CREATE of the same
992 // properties never lands here. (Empty index → no iterations.)
993 for (key, vid) in &tx_l0.merge_guard_index {
994 if overlay
995 .iter()
996 .any(|b| b.read().has_merge_guard_key(key, *vid))
997 {
998 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
999 return Err(anyhow::Error::new(
1000 uni_common::UniError::ConstraintConflict {
1001 message: "MERGE key already committed by a concurrent \
1002 transaction"
1003 .to_string(),
1004 },
1005 ));
1006 }
1007 }
1008
1009 // Same race window for global ext_id uniqueness: the per-insert
1010 // check ran against an older main L0; re-probe the committed
1011 // index here, where commits serialize.
1012 for (ext_id, vid) in &tx_l0.extid_index {
1013 let taken = overlay.iter().any(|b| {
1014 matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
1015 });
1016 if taken {
1017 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
1018 return Err(anyhow::Error::new(
1019 uni_common::UniError::ConstraintConflict {
1020 message: format!(
1021 "ext_id '{ext_id}' already committed by a concurrent \
1022 transaction"
1023 ),
1024 },
1025 ));
1026 }
1027 }
1028
1029 // CRDT carve-out soundness: a pure-CRDT write was dropped from the
1030 // write-set assuming its merge commutes. If the overlay holds a
1031 // *different* CRDT variant for the same property, the merge would
1032 // silently overwrite it — abort instead of losing the update.
1033 // (Checked against every overlay buffer: conservative if an old
1034 // generation held a different variant that a newer commit already
1035 // replaced, but an abort+retry is always sound.)
1036 for buf in &overlay {
1037 if let Some(conflict) =
1038 crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
1039 {
1040 metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
1041 return Err(anyhow::Error::new(
1042 uni_common::UniError::SerializationConflict {
1043 message: conflict.to_string(),
1044 },
1045 ));
1046 }
1047 }
1048 }
1049 Some(write_set)
1050 } else {
1051 None
1052 };
1053
1054 // Issue #77: an edge whose endpoint is effectively deleted makes the
1055 // merge below bail. That bail MUST happen before the durable WAL flush —
1056 // after it the transaction is committed-but-unmerged (a ghost commit),
1057 // and WAL replay re-hits the same bail, making the database unopenable.
1058 // SSI validation was deliberately placed before the flush for exactly
1059 // this reason; the endpoint check belongs here too. Runs unconditionally
1060 // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
1061 // tombstone state cannot change between here and the merge. A
1062 // tombstone may live in a flush-rotated pending buffer rather than
1063 // the current buffer, so the check walks the overlay newest-first.
1064 {
1065 let tx_l0 = tx_l0_arc.read();
1066 self.validate_edge_endpoints_overlay(&tx_l0)?;
1067 }
1068
1069 // Crash-recovery seam: SSI validation has passed; the transaction is
1070 // about to become durable. A crash here must leave NO trace (validation
1071 // happens before the WAL is touched). No-op unless `failpoints`.
1072 fail::fail_point!("commit::after-validate");
1073
1074 // 1. Write transaction mutations to WAL BEFORE merging into main L0
1075 // This ensures durability before visibility.
1076 {
1077 let tx_l0 = tx_l0_arc.read();
1078 let main_l0_arc = self.l0_manager.get_current();
1079 let main_l0 = main_l0_arc.read();
1080
1081 // If WAL exists, write mutations to it for durability
1082 if let Some(wal) = main_l0.wal.as_ref() {
1083 // Order: vertices first, then edges (to ensure src/dst exist on replay)
1084
1085 // Vertex insertions
1086 for (vid, properties) in &tx_l0.vertex_properties {
1087 if !tx_l0.vertex_tombstones.contains(vid) {
1088 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1089 wal.append(crate::runtime::wal::Mutation::InsertVertex {
1090 vid: *vid,
1091 properties: properties.clone(),
1092 labels,
1093 })?;
1094 }
1095 }
1096
1097 // Vertex deletions
1098 for vid in &tx_l0.vertex_tombstones {
1099 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1100 wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
1101 }
1102
1103 // Label-only mutations (SET n:Label / REMOVE n:Label). After
1104 // vertex inserts (so the vertex exists on replay), before edges,
1105 // and skipping vertices deleted in this same commit.
1106 for vid in &tx_l0.vertex_label_overwrites {
1107 if tx_l0.vertex_tombstones.contains(vid) {
1108 continue;
1109 }
1110 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1111 wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
1112 vid: *vid,
1113 labels,
1114 })?;
1115 }
1116
1117 // Crash-recovery seam: vertices appended, edges not yet. Tests
1118 // assert that a crash here (before `flush_wal`) recovers NOTHING
1119 // — the durable commit point is the flush below, not append.
1120 fail::fail_point!("commit::mid-wal");
1121
1122 // Edge insertions and deletions from edge_endpoints
1123 for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
1124 if tx_l0.tombstones.contains_key(eid) {
1125 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1126 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1127 eid: *eid,
1128 src_vid: *src_vid,
1129 dst_vid: *dst_vid,
1130 edge_type: *edge_type,
1131 version,
1132 })?;
1133 } else {
1134 let properties =
1135 tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
1136 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1137 let edge_type_name = tx_l0.edge_types.get(eid).cloned();
1138 wal.append(crate::runtime::wal::Mutation::InsertEdge {
1139 src_vid: *src_vid,
1140 dst_vid: *dst_vid,
1141 edge_type: *edge_type,
1142 eid: *eid,
1143 version,
1144 properties,
1145 edge_type_name,
1146 })?;
1147 }
1148 }
1149
1150 // Tombstones for edges that only exist in the global L0 (not in
1151 // this transaction's edge_endpoints). Without this, deletes of
1152 // pre-existing edges would be silently lost.
1153 for (eid, tombstone) in &tx_l0.tombstones {
1154 if !tx_l0.edge_endpoints.contains_key(eid) {
1155 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1156 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1157 eid: *eid,
1158 src_vid: tombstone.src_vid,
1159 dst_vid: tombstone.dst_vid,
1160 edge_type: tombstone.edge_type,
1161 version,
1162 })?;
1163 }
1164 }
1165 }
1166 }
1167
1168 // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1169 let wal_lsn = self.flush_wal().await?;
1170
1171 // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1172 // A crash here must RECOVER the transaction on replay (it is committed),
1173 // even though it was never made visible in-process. No-op unless `failpoints`.
1174 fail::fail_point!("commit::after-wal-flush");
1175
1176 // Component C1: if an outstanding snapshot pins the current generation,
1177 // clone it aside (lazy copy-on-write) before merging, so the pinning
1178 // transaction's reads stay isolated from this commit. No-op — and zero
1179 // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1180 // so this cannot race a flush rotate or another commit's merge; the merge
1181 // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1182 // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1183 // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1184 // always false when SSI is off and this is a zero-cost no-op.
1185 if self.l0_manager.is_current_pinned() {
1186 self.l0_manager.freeze_current_for_snapshot();
1187 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1188 }
1189
1190 // 3. Merge into main L0 and make visible
1191 {
1192 // Write-lock the tx buffer: `merge_take` moves its property maps
1193 // into main L0 instead of cloning them. The commit consumes the
1194 // transaction, so the drained maps are never observed afterwards;
1195 // everything read below (endpoints, versions, tombstones) is left
1196 // intact.
1197 let mut tx_l0 = tx_l0_arc.write();
1198 let main_l0_arc = self.l0_manager.get_current();
1199 let mut main_l0 = main_l0_arc.write();
1200 // A CRDT property's committed state may live only in a
1201 // flush-rotated pending buffer (the post-rotation current is
1202 // empty until the Lance write completes — the Bug #9A window).
1203 // `merge_crdt_properties` merges against the CURRENT buffer's
1204 // value — without seeding, the tx's CRDT state would SHADOW the
1205 // pending buffer's at read time (newest buffer wins per property)
1206 // and concurrent increments would be lost. Seed the newest
1207 // overlay value for each CRDT property the tx writes that current
1208 // lacks, so the merge below merges instead of replaces.
1209 self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1210 main_l0.merge_take(&mut tx_l0)?;
1211
1212 // Replay transaction edges into the AdjacencyManager overlay
1213 for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1214 let edge_version = tx_l0
1215 .edge_versions
1216 .get(eid)
1217 .copied()
1218 .unwrap_or(main_l0.current_version);
1219 if tx_l0.tombstones.contains_key(eid) {
1220 self.adjacency_manager
1221 .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1222 } else {
1223 self.adjacency_manager
1224 .insert_edge(*src, *dst, *eid, *etype, edge_version);
1225 }
1226 }
1227
1228 // Replay tombstones for edges that only exist in the global L0
1229 // (not in this transaction's edge_endpoints).
1230 for (eid, tombstone) in &tx_l0.tombstones {
1231 if !tx_l0.edge_endpoints.contains_key(eid) {
1232 let edge_version = tx_l0
1233 .edge_versions
1234 .get(eid)
1235 .copied()
1236 .unwrap_or(main_l0.current_version);
1237 self.adjacency_manager.add_tombstone(
1238 *eid,
1239 tombstone.src_vid,
1240 tombstone.dst_vid,
1241 tombstone.edge_type,
1242 edge_version,
1243 );
1244 }
1245 }
1246 }
1247
1248 // Crash-recovery seam: durable AND merged, but the in-memory commit
1249 // registry has not recorded this write-set yet. A crash here is
1250 // indistinguishable from one at `after-wal-flush` on reopen (the
1251 // registry is in-memory and rebuilt empty); the tx still recovers.
1252 fail::fail_point!("commit::after-merge");
1253
1254 // SSI: register this commit's write-set under a fresh commit sequence so
1255 // later transactions detect conflicts against it. Still under
1256 // `flush_lock`, before the async-flush branch can drop the guard.
1257 // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1258 if let Some(write_set) = occ_write_set
1259 && !write_set.is_empty()
1260 {
1261 // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
1262 // so production and the loom/shuttle models exercise identical logic.
1263 self.committed_writes
1264 .lock()
1265 .commit(&self.commit_sequence, write_set);
1266 }
1267
1268 self.update_metrics();
1269
1270 // 4. Best-effort post-commit auto-flush.
1271 //
1272 // Two paths:
1273 // - async_flush_enabled = false (default): inline under our
1274 // existing flush_lock guard via flush_inline_under_lock.
1275 // - async_flush_enabled = true: rotate inline, drop flush_lock,
1276 // then submit the stream phase to the coordinator. Gated on
1277 // `pending_flush_count() < max_pending_flushes` so we don't
1278 // stack up rotations beyond the configured pipeline depth.
1279 // `try_acquire_permit` is non-blocking: if we lose the race
1280 // for the last permit, we just skip this trigger (the next
1281 // commit retries).
1282 let mut flush_pending = false;
1283 if self.should_flush() {
1284 if self.config.async_flush_enabled
1285 && let Some(coord) = self.flush_coordinator.as_ref()
1286 && coord.pending_flush_count() < self.config.max_pending_flushes
1287 {
1288 match coord.try_acquire_permit() {
1289 Some(permit) => {
1290 match self.flush_l0_rotate().await {
1291 Ok(rotate_out) => {
1292 // Allocate the rotate seq and bump pending ONLY
1293 // after the rotate succeeds (Bug #3). A failed
1294 // rotate must consume neither: the finalizer
1295 // advances strictly in consecutive seq order and
1296 // only decrements pending on finalize, so a
1297 // leaked seq/pending from a failed rotate would
1298 // wedge the finalizer forever and climb pending
1299 // toward `max_pending_flushes`. The seq is still
1300 // allocated under `flush_lock` (immediately after
1301 // the rotate, before the guard drops below), so
1302 // concurrent rotates keep seq order == rotation
1303 // order, and the seq is not used until submit.
1304 let seq = coord.next_rotate_seq();
1305 coord.note_pending();
1306 // Release flush_lock BEFORE the spawn so concurrent
1307 // commits can proceed while the stream runs.
1308 drop(_flush_lock_guard);
1309 let parent_manifest = self.cached_manifest.lock().clone();
1310 let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1311 seq,
1312 old_l0_arc: rotate_out.old_l0_arc.clone(),
1313 wal_lsn: rotate_out.wal_lsn,
1314 current_version: rotate_out.current_version,
1315 name: None,
1316 parent_manifest,
1317 permit,
1318 flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1319 };
1320 let writer = self.clone();
1321 let _ticket = coord.submit_for_stream(
1322 rotated,
1323 move |old_l0, wal, ver, n| async move {
1324 let outcome =
1325 writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1326 Ok(crate::runtime::flush_coordinator::FlushOutcome {
1327 new_manifest: outcome.manifest,
1328 snapshot_id: outcome.snapshot_id,
1329 })
1330 },
1331 );
1332 flush_pending = true;
1333 // Early return — flush_lock already dropped.
1334 return Ok((wal_lsn, flush_pending));
1335 }
1336 Err(e) => {
1337 tracing::warn!("Async rotate failed (non-critical): {}", e);
1338 // No seq was allocated and pending was not
1339 // bumped (both moved into the Ok arm for Bug
1340 // #3), so the finalizer is not wedged. The
1341 // permit drops here, freeing the slot.
1342 }
1343 }
1344 }
1345 None => {
1346 // Race: someone else grabbed the last permit. Skip;
1347 // next commit will retry should_flush().
1348 metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1349 }
1350 }
1351 } else if let Err(e) = self.flush_inline_under_lock(None).await {
1352 tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1353 }
1354 }
1355
1356 Ok((wal_lsn, flush_pending))
1357 }
1358
1359 /// Flush the WAL buffer to durable storage.
1360 ///
1361 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1362 pub async fn flush_wal(&self) -> Result<u64> {
1363 let l0 = self.l0_manager.get_current();
1364 let wal = l0.read().wal.clone();
1365
1366 match wal {
1367 Some(wal) => Ok(wal.flush().await?),
1368 None => Ok(0),
1369 }
1370 }
1371
1372 /// Record property removals in the active L0 mutation stats.
1373 ///
1374 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1375 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1376 if count == 0 {
1377 return;
1378 }
1379 let l0 = self.resolve_l0(tx_l0);
1380 l0.write().mutation_stats.properties_removed += count;
1381 }
1382
1383 /// Validates vertex constraints for the given properties.
1384 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1385 async fn validate_vertex_constraints_for_label(
1386 &self,
1387 vid: Vid,
1388 properties: &Properties,
1389 label: &str,
1390 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1391 ) -> Result<()> {
1392 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1393 .await
1394 }
1395
1396 /// Partial-update sibling: validates only constraints touching keys
1397 /// present in `properties` (the touched set). NOT NULL is checked
1398 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1399 /// skipped when any referenced key is absent (the caller is
1400 /// expected to have routed to the full-row path in that case via
1401 /// `touched_needs_full_read`).
1402 async fn validate_vertex_constraints_for_label_partial(
1403 &self,
1404 vid: Vid,
1405 properties: &Properties,
1406 label: &str,
1407 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1408 ) -> Result<()> {
1409 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1410 .await
1411 }
1412
1413 async fn validate_vertex_constraints_for_label_impl(
1414 &self,
1415 vid: Vid,
1416 properties: &Properties,
1417 label: &str,
1418 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1419 partial: bool,
1420 ) -> Result<()> {
1421 let schema = self.schema_manager.schema();
1422
1423 {
1424 // 1. Check NOT NULL constraints (from Property definitions).
1425 // Under partial-update mode, skip properties NOT in
1426 // `properties` — they retain their previous (already-
1427 // validated) value.
1428 if let Some(props_meta) = schema.properties.get(label) {
1429 for (prop_name, meta) in props_meta {
1430 if !meta.nullable {
1431 let present = properties.get(prop_name);
1432 if partial && present.is_none() {
1433 continue;
1434 }
1435 if present.is_none_or(|v| v.is_null()) {
1436 log::warn!(
1437 "Constraint violation: Property '{}' cannot be null for label '{}'",
1438 prop_name,
1439 label
1440 );
1441 return Err(anyhow!(
1442 "Constraint violation: Property '{}' cannot be null",
1443 prop_name
1444 ));
1445 }
1446 }
1447 }
1448 }
1449
1450 // 2. Check Explicit Constraints (Unique, Check, etc.)
1451 for constraint in &schema.constraints {
1452 if !constraint.enabled {
1453 continue;
1454 }
1455 match &constraint.target {
1456 ConstraintTarget::Label(l) if l == label => {}
1457 _ => continue,
1458 }
1459
1460 match &constraint.constraint_type {
1461 ConstraintType::Unique {
1462 properties: unique_props,
1463 } => {
1464 // Support single and multi-property unique constraints
1465 if !unique_props.is_empty() {
1466 let mut key_values = Vec::new();
1467 let mut missing = false;
1468 for prop in unique_props {
1469 if let Some(val) = properties.get(prop) {
1470 key_values.push((prop.clone(), val.clone()));
1471 } else {
1472 missing = true; // Can't enforce if property missing (partial update?)
1473 // For INSERT, missing means null?
1474 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1475 // For now, only check if ALL keys are present
1476 }
1477 }
1478
1479 if !missing {
1480 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1481 .await?;
1482 }
1483 }
1484 }
1485 ConstraintType::Exists { property } => {
1486 if properties.get(property).is_none_or(|v| v.is_null()) {
1487 log::warn!(
1488 "Constraint violation: Property '{}' must exist for label '{}'",
1489 property,
1490 label
1491 );
1492 return Err(anyhow!(
1493 "Constraint violation: Property '{}' must exist",
1494 property
1495 ));
1496 }
1497 }
1498 ConstraintType::Check { expression } => {
1499 if !self.evaluate_check_constraint(expression, properties)? {
1500 return Err(anyhow!(
1501 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1502 constraint.name,
1503 expression
1504 ));
1505 }
1506 }
1507 _ => {
1508 return Err(anyhow!("Unsupported constraint type"));
1509 }
1510 }
1511 }
1512 }
1513 Ok(())
1514 }
1515
1516 /// Validates vertex constraints for a vertex with the given labels.
1517 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1518 /// Unknown labels (not in schema) are skipped.
1519 async fn validate_vertex_constraints(
1520 &self,
1521 vid: Vid,
1522 properties: &Properties,
1523 labels: &[String],
1524 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1525 ) -> Result<()> {
1526 let schema = self.schema_manager.schema();
1527
1528 // Validate constraints only for known labels
1529 for label in labels {
1530 // Skip unknown labels (schemaless support)
1531 if schema.get_label_case_insensitive(label).is_none() {
1532 continue;
1533 }
1534 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1535 .await?;
1536 }
1537
1538 // Check global ext_id uniqueness if ext_id is provided
1539 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1540 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1541 }
1542
1543 Ok(())
1544 }
1545
1546 /// Partial sibling of `validate_vertex_constraints` — validates only
1547 /// constraints touching keys present in `properties`. Used by
1548 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1549 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1550 async fn validate_vertex_constraints_partial(
1551 &self,
1552 vid: Vid,
1553 touched: &Properties,
1554 labels: &[String],
1555 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1556 ) -> Result<()> {
1557 let schema = self.schema_manager.schema();
1558 for label in labels {
1559 if schema.get_label_case_insensitive(label).is_none() {
1560 continue;
1561 }
1562 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1563 .await?;
1564 }
1565 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1566 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1567 }
1568 Ok(())
1569 }
1570
1571 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1572 ///
1573 /// Used to build a constraint key index from L0 buffers for batch validation.
1574 fn collect_constraint_keys_from_properties<'a>(
1575 properties_iter: impl Iterator<Item = &'a Properties>,
1576 label: &str,
1577 constraints: &[uni_common::core::schema::Constraint],
1578 existing_keys: &mut HashMap<String, HashSet<String>>,
1579 existing_extids: &mut HashSet<String>,
1580 ) {
1581 for props in properties_iter {
1582 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1583 existing_extids.insert(ext_id.to_string());
1584 }
1585
1586 for constraint in constraints {
1587 if !constraint.enabled {
1588 continue;
1589 }
1590 if let ConstraintTarget::Label(l) = &constraint.target {
1591 if l != label {
1592 continue;
1593 }
1594 } else {
1595 continue;
1596 }
1597
1598 if let ConstraintType::Unique {
1599 properties: unique_props,
1600 } = &constraint.constraint_type
1601 {
1602 let mut key_parts = Vec::new();
1603 let mut all_present = true;
1604 for prop in unique_props {
1605 if let Some(val) = props.get(prop) {
1606 key_parts.push(format!("{}:{}", prop, val));
1607 } else {
1608 all_present = false;
1609 break;
1610 }
1611 }
1612 if all_present {
1613 let key = key_parts.join("|");
1614 existing_keys
1615 .entry(constraint.name.clone())
1616 .or_default()
1617 .insert(key);
1618 }
1619 }
1620 }
1621 }
1622 }
1623
1624 /// Validates constraints for a batch of vertices efficiently.
1625 ///
1626 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1627 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1628 ///
1629 /// # Arguments
1630 /// * `vids` - VIDs of vertices being inserted
1631 /// * `properties_batch` - Properties for each vertex
1632 /// * `label` - Label for all vertices (assumes single label for now)
1633 ///
1634 /// # Performance
1635 /// For N vertices with unique constraints:
1636 /// - Old approach: O(N²) - scan L0 buffer N times
1637 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1638 async fn validate_vertex_batch_constraints(
1639 &self,
1640 vids: &[Vid],
1641 properties_batch: &[Properties],
1642 label: &str,
1643 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1644 ) -> Result<()> {
1645 if vids.len() != properties_batch.len() {
1646 return Err(anyhow!("VID/properties length mismatch"));
1647 }
1648
1649 let schema = self.schema_manager.schema();
1650
1651 // 1. Validate NOT NULL constraints for each vertex
1652 if let Some(props_meta) = schema.properties.get(label) {
1653 for (idx, properties) in properties_batch.iter().enumerate() {
1654 for (prop_name, meta) in props_meta {
1655 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1656 return Err(anyhow!(
1657 "Constraint violation at index {}: Property '{}' cannot be null",
1658 idx,
1659 prop_name
1660 ));
1661 }
1662 }
1663 }
1664 }
1665
1666 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1667 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1668 let mut existing_extids: HashSet<String> = HashSet::new();
1669
1670 // Scan current L0 buffer
1671 {
1672 let l0 = self.l0_manager.get_current();
1673 let l0_guard = l0.read();
1674 Self::collect_constraint_keys_from_properties(
1675 l0_guard.vertex_properties.values(),
1676 label,
1677 &schema.constraints,
1678 &mut existing_keys,
1679 &mut existing_extids,
1680 );
1681 }
1682
1683 // Scan transaction L0 if present
1684 if let Some(tx_l0) = tx_l0 {
1685 let tx_l0_guard = tx_l0.read();
1686 Self::collect_constraint_keys_from_properties(
1687 tx_l0_guard.vertex_properties.values(),
1688 label,
1689 &schema.constraints,
1690 &mut existing_keys,
1691 &mut existing_extids,
1692 );
1693 }
1694
1695 // 3. Check batch vertices against index AND check for duplicates within batch
1696 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1697 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1698
1699 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1700 // Check ext_id uniqueness
1701 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1702 if existing_extids.contains(ext_id) {
1703 return Err(anyhow!(
1704 "Constraint violation at index {}: ext_id '{}' already exists",
1705 idx,
1706 ext_id
1707 ));
1708 }
1709 if let Some(first_idx) = batch_extids.get(ext_id) {
1710 return Err(anyhow!(
1711 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1712 ext_id,
1713 first_idx,
1714 idx
1715 ));
1716 }
1717 // Also check the main vertices table — the L0 scans above
1718 // miss vertices already flushed to L1, so without this a
1719 // batch insert (e.g. a fork promote onto primary) silently
1720 // twins a duplicate ext_id instead of erroring. Mirrors the
1721 // single-vertex `check_extid_globally_unique`.
1722 if let Ok(Some(found_vid)) =
1723 MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1724 {
1725 return Err(anyhow!(
1726 "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1727 idx,
1728 ext_id,
1729 found_vid
1730 ));
1731 }
1732 batch_extids.insert(ext_id.to_string(), idx);
1733 }
1734
1735 // Check unique constraints
1736 for constraint in &schema.constraints {
1737 if !constraint.enabled {
1738 continue;
1739 }
1740 if let ConstraintTarget::Label(l) = &constraint.target {
1741 if l != label {
1742 continue;
1743 }
1744 } else {
1745 continue;
1746 }
1747
1748 match &constraint.constraint_type {
1749 ConstraintType::Unique {
1750 properties: unique_props,
1751 } => {
1752 let mut key_parts = Vec::new();
1753 let mut all_present = true;
1754 for prop in unique_props {
1755 if let Some(val) = properties.get(prop) {
1756 key_parts.push(format!("{}:{}", prop, val));
1757 } else {
1758 all_present = false;
1759 break;
1760 }
1761 }
1762
1763 if all_present {
1764 let key = key_parts.join("|");
1765
1766 // Check against existing L0 keys
1767 if let Some(keys) = existing_keys.get(&constraint.name)
1768 && keys.contains(&key)
1769 {
1770 return Err(anyhow!(
1771 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1772 idx,
1773 label,
1774 constraint.name
1775 ));
1776 }
1777
1778 // Check for duplicates within batch
1779 let batch_constraint_keys =
1780 batch_keys.entry(constraint.name.clone()).or_default();
1781 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1782 return Err(anyhow!(
1783 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1784 key,
1785 first_idx,
1786 idx
1787 ));
1788 }
1789 batch_constraint_keys.insert(key, idx);
1790 }
1791 }
1792 ConstraintType::Exists { property }
1793 if properties.get(property).is_none_or(|v| v.is_null()) =>
1794 {
1795 return Err(anyhow!(
1796 "Constraint violation at index {}: Property '{}' must exist",
1797 idx,
1798 property
1799 ));
1800 }
1801 ConstraintType::Check { expression }
1802 if !self.evaluate_check_constraint(expression, properties)? =>
1803 {
1804 return Err(anyhow!(
1805 "Constraint violation at index {}: CHECK constraint '{}' violated",
1806 idx,
1807 constraint.name
1808 ));
1809 }
1810 _ => {}
1811 }
1812 }
1813 }
1814
1815 // 4. Check storage for unique constraints (can batch this into a single query)
1816 for constraint in &schema.constraints {
1817 if !constraint.enabled {
1818 continue;
1819 }
1820 if let ConstraintTarget::Label(l) = &constraint.target {
1821 if l != label {
1822 continue;
1823 }
1824 } else {
1825 continue;
1826 }
1827
1828 if let ConstraintType::Unique {
1829 properties: unique_props,
1830 } = &constraint.constraint_type
1831 {
1832 // Build compound OR filter for all batch vertices
1833 let mut or_filters = Vec::new();
1834 for properties in properties_batch.iter() {
1835 let mut and_parts = Vec::new();
1836 let mut all_present = true;
1837 for prop in unique_props {
1838 if let Some(val) = properties.get(prop) {
1839 let val_str = match val {
1840 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1841 Value::Int(n) => n.to_string(),
1842 Value::Float(f) => f.to_string(),
1843 Value::Bool(b) => b.to_string(),
1844 _ => {
1845 all_present = false;
1846 break;
1847 }
1848 };
1849 and_parts.push(format!("{} = {}", prop, val_str));
1850 } else {
1851 all_present = false;
1852 break;
1853 }
1854 }
1855 if all_present {
1856 or_filters.push(format!("({})", and_parts.join(" AND ")));
1857 }
1858 }
1859
1860 #[cfg(feature = "lance-backend")]
1861 if !or_filters.is_empty() {
1862 let vid_list: Vec<String> =
1863 vids.iter().map(|v| v.as_u64().to_string()).collect();
1864 let filter = format!(
1865 "({}) AND _deleted = false AND _vid NOT IN ({})",
1866 or_filters.join(" OR "),
1867 vid_list.join(", ")
1868 );
1869
1870 if let Ok(ds) = self.storage.vertex_dataset(label)
1871 && let Ok(lance_ds) = ds.open_raw().await
1872 {
1873 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1874 if count > 0 {
1875 return Err(anyhow!(
1876 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1877 label,
1878 constraint.name
1879 ));
1880 }
1881 }
1882 }
1883 }
1884 }
1885
1886 Ok(())
1887 }
1888
1889 /// Checks that ext_id is globally unique across all vertices.
1890 ///
1891 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1892 /// to ensure no other vertex uses this ext_id.
1893 ///
1894 /// # Errors
1895 ///
1896 /// Returns error if another vertex with the same ext_id exists.
1897 async fn check_extid_globally_unique(
1898 &self,
1899 ext_id: &str,
1900 current_vid: Vid,
1901 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1902 ) -> Result<()> {
1903 // Check L0 buffers: current, transaction, and pending flush
1904 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1905 let mut buffers = vec![self.l0_manager.get_current()];
1906 if let Some(tx_l0) = tx_l0 {
1907 buffers.push(tx_l0.clone());
1908 }
1909 buffers.extend(self.l0_manager.get_pending_flush());
1910 buffers
1911 };
1912
1913 for l0 in &l0_buffers_to_check {
1914 // O(1) per buffer via the maintained `extid_index` (the previous
1915 // full `vertex_properties` scan made constrained ingest O(n²)).
1916 if let Some(&vid) = l0.read().extid_index.get(ext_id)
1917 && vid != current_vid
1918 {
1919 return Err(anyhow!(
1920 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1921 ext_id,
1922 vid
1923 ));
1924 }
1925 }
1926
1927 // Check main vertices table (if it exists)
1928 // Pass None for global uniqueness check (not snapshot-isolated)
1929 let backend = self.storage.backend();
1930 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1931 && found_vid != current_vid
1932 {
1933 return Err(anyhow!(
1934 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1935 ext_id,
1936 found_vid
1937 ));
1938 }
1939
1940 Ok(())
1941 }
1942
1943 /// Helper to get vertex labels from L0 buffer.
1944 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1945 let l0 = self.l0_manager.get_current();
1946 let l0_guard = l0.read();
1947 // Check if vertex is tombstoned (deleted) - if so, return None
1948 if l0_guard.vertex_tombstones.contains(&vid) {
1949 return None;
1950 }
1951 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1952 }
1953
1954 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1955 /// This is the proper way to read vertex labels after a flush, as it checks both
1956 /// in-memory buffers and persisted storage.
1957 pub async fn get_vertex_labels(
1958 &self,
1959 vid: Vid,
1960 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1961 ) -> Option<Vec<String>> {
1962 // 1. Check current L0
1963 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1964 return Some(labels);
1965 }
1966
1967 // 2. Check transaction L0 if present
1968 if let Some(tx_l0) = tx_l0 {
1969 let guard = tx_l0.read();
1970 if guard.vertex_tombstones.contains(&vid) {
1971 return None;
1972 }
1973 if let Some(labels) = guard.get_vertex_labels(vid) {
1974 return Some(labels.to_vec());
1975 }
1976 }
1977
1978 // 3. Check pending flush L0s
1979 for pending_l0 in self.l0_manager.get_pending_flush() {
1980 let guard = pending_l0.read();
1981 if guard.vertex_tombstones.contains(&vid) {
1982 return None;
1983 }
1984 if let Some(labels) = guard.get_vertex_labels(vid) {
1985 return Some(labels.to_vec());
1986 }
1987 }
1988
1989 // 4. Check storage
1990 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1991 }
1992
1993 /// Helper to get edge type from L0 buffer.
1994 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1995 let l0 = self.l0_manager.get_current();
1996 let l0_guard = l0.read();
1997 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1998 }
1999
2000 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
2001 /// Falls back to the transaction L0 if available.
2002 pub fn get_edge_type_id_from_l0(
2003 &self,
2004 eid: Eid,
2005 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2006 ) -> Option<u32> {
2007 // Check transaction L0 first
2008 if let Some(tx_l0) = tx_l0 {
2009 let guard = tx_l0.read();
2010 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
2011 return Some(etype);
2012 }
2013 }
2014 // Fall back to main L0
2015 let l0 = self.l0_manager.get_current();
2016 let l0_guard = l0.read();
2017 l0_guard
2018 .get_edge_endpoint_full(eid)
2019 .map(|(_, _, etype)| etype)
2020 }
2021
2022 /// Set the type name for an edge (used for schemaless edge types).
2023 /// This is called during CREATE for edge types not found in the schema.
2024 pub fn set_edge_type(
2025 &self,
2026 eid: Eid,
2027 type_name: String,
2028 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2029 ) {
2030 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
2031 }
2032
2033 /// Evaluate a simple CHECK constraint expression.
2034 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
2035 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
2036 let parts: Vec<&str> = expression.split_whitespace().collect();
2037 if parts.len() != 3 {
2038 // For now, only support "prop op val"
2039 // Fallback to true if too complex to avoid breaking, but warn
2040 log::warn!(
2041 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
2042 expression
2043 );
2044 return Ok(true);
2045 }
2046
2047 let prop_part = parts[0].trim_start_matches('(');
2048 // Handle "variable.property" format - take the part after the dot
2049 let prop_name = if let Some(idx) = prop_part.find('.') {
2050 &prop_part[idx + 1..]
2051 } else {
2052 prop_part
2053 };
2054
2055 let op = parts[1];
2056 let val_str = parts[2].trim_end_matches(')');
2057
2058 let prop_val = match properties.get(prop_name) {
2059 Some(v) => v,
2060 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
2061 };
2062
2063 // Parse value string (handle quotes for strings)
2064 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
2065 || (val_str.starts_with('"') && val_str.ends_with('"'))
2066 {
2067 Value::String(val_str[1..val_str.len() - 1].to_string())
2068 } else if let Ok(n) = val_str.parse::<i64>() {
2069 Value::Int(n)
2070 } else if let Ok(n) = val_str.parse::<f64>() {
2071 Value::Float(n)
2072 } else if let Ok(b) = val_str.parse::<bool>() {
2073 Value::Bool(b)
2074 } else {
2075 // Check for internal format wrappers if they somehow leaked through
2076 if val_str.starts_with("Number(") && val_str.ends_with(')') {
2077 let n_str = &val_str[7..val_str.len() - 1];
2078 if let Ok(n) = n_str.parse::<i64>() {
2079 Value::Int(n)
2080 } else if let Ok(n) = n_str.parse::<f64>() {
2081 Value::Float(n)
2082 } else {
2083 Value::String(val_str.to_string())
2084 }
2085 } else {
2086 Value::String(val_str.to_string())
2087 }
2088 };
2089
2090 match op {
2091 "=" | "==" => Ok(prop_val == &target_val),
2092 "!=" | "<>" => Ok(prop_val != &target_val),
2093 ">" => self
2094 .compare_values(prop_val, &target_val)
2095 .map(|o| o.is_gt()),
2096 "<" => self
2097 .compare_values(prop_val, &target_val)
2098 .map(|o| o.is_lt()),
2099 ">=" => self
2100 .compare_values(prop_val, &target_val)
2101 .map(|o| o.is_ge()),
2102 "<=" => self
2103 .compare_values(prop_val, &target_val)
2104 .map(|o| o.is_le()),
2105 _ => {
2106 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
2107 Ok(true)
2108 }
2109 }
2110 }
2111
2112 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2113 use std::cmp::Ordering;
2114
2115 fn cmp_f64(x: f64, y: f64) -> Ordering {
2116 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2117 }
2118
2119 match (a, b) {
2120 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2121 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2122 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2123 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2124 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2125 _ => Err(anyhow!(
2126 "Cannot compare incompatible types: {:?} vs {:?}",
2127 a,
2128 b
2129 )),
2130 }
2131 }
2132
2133 async fn check_unique_constraint_multi(
2134 &self,
2135 label: &str,
2136 key_values: &[(String, Value)],
2137 current_vid: Vid,
2138 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2139 ) -> Result<()> {
2140 // Serialize constraint key once for O(1) lookups
2141 let key = serialize_constraint_key(label, key_values);
2142
2143 // 1. Check L0 (in-memory) using O(1) constraint index
2144 {
2145 let l0 = self.l0_manager.get_current();
2146 let l0_guard = l0.read();
2147 if l0_guard.has_constraint_key(&key, current_vid) {
2148 return Err(anyhow!(
2149 "Constraint violation: Duplicate composite key for label '{}'",
2150 label
2151 ));
2152 }
2153 }
2154
2155 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2156 // buffer onto `pending_flush` and installs a fresh empty current
2157 // buffer; until the rotated rows reach Lance the key is invisible to
2158 // both the current-buffer check above and the storage check below, so
2159 // a duplicate could slip through that flush window. Mirror the read
2160 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2161 // already consult `pending_flush`.
2162 for pending_l0 in self.l0_manager.get_pending_flush() {
2163 if pending_l0.read().has_constraint_key(&key, current_vid) {
2164 return Err(anyhow!(
2165 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2166 label
2167 ));
2168 }
2169 }
2170
2171 // Check Transaction L0
2172 if let Some(tx_l0) = tx_l0 {
2173 let tx_l0_guard = tx_l0.read();
2174 if tx_l0_guard.has_constraint_key(&key, current_vid) {
2175 return Err(anyhow!(
2176 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2177 label
2178 ));
2179 }
2180 }
2181
2182 // 2. Check Storage (L1/L2)
2183 let filters: Vec<String> = key_values
2184 .iter()
2185 .map(|(prop, val)| {
2186 let val_str = match val {
2187 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2188 Value::Int(n) => n.to_string(),
2189 Value::Float(f) => f.to_string(),
2190 Value::Bool(b) => b.to_string(),
2191 _ => "NULL".to_string(),
2192 };
2193 format!("{} = {}", prop, val_str)
2194 })
2195 .collect();
2196
2197 let mut filter = filters.join(" AND ");
2198 filter.push_str(&format!(
2199 " AND _deleted = false AND _vid != {}",
2200 current_vid.as_u64()
2201 ));
2202
2203 #[cfg(feature = "lance-backend")]
2204 if let Ok(ds) = self.storage.vertex_dataset(label)
2205 && let Ok(lance_ds) = ds.open_raw().await
2206 {
2207 let count = lance_ds.count_rows(Some(filter.clone())).await?;
2208 if count > 0 {
2209 return Err(anyhow!(
2210 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2211 label,
2212 filter
2213 ));
2214 }
2215 }
2216
2217 Ok(())
2218 }
2219
2220 async fn check_write_pressure(&self) -> Result<()> {
2221 let status = self
2222 .storage
2223 .compaction_status()
2224 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2225 let l1_runs = status.l1_runs;
2226 let throttle = &self.config.throttle;
2227
2228 if l1_runs >= throttle.hard_limit {
2229 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2230 // Simple polling for now
2231 while self
2232 .storage
2233 .compaction_status()
2234 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2235 .l1_runs
2236 >= throttle.hard_limit
2237 {
2238 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2239 }
2240 } else if l1_runs >= throttle.soft_limit {
2241 let excess = l1_runs - throttle.soft_limit;
2242 // Cap multiplier to avoid overflow
2243 let excess = std::cmp::min(excess, 31);
2244 let multiplier = 2_u32.pow(excess as u32);
2245 let delay = throttle.base_delay * multiplier;
2246 tokio::time::sleep(delay).await;
2247 }
2248 Ok(())
2249 }
2250
2251 /// Check transaction memory limit to prevent OOM.
2252 /// No-op when no transaction is active.
2253 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2254 if let Some(tx_l0) = tx_l0 {
2255 let size = tx_l0.read().estimated_size;
2256 if size > self.config.max_transaction_memory {
2257 return Err(anyhow!(
2258 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2259 Roll back or commit the current transaction.",
2260 size,
2261 self.config.max_transaction_memory
2262 ));
2263 }
2264 }
2265 Ok(())
2266 }
2267
2268 async fn get_query_context(
2269 &self,
2270 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2271 ) -> Option<QueryContext> {
2272 Some(QueryContext::new_with_pending(
2273 self.l0_manager.get_current(),
2274 tx_l0.cloned(),
2275 self.l0_manager.get_pending_flush(),
2276 ))
2277 }
2278
2279 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2280 /// write paths.
2281 ///
2282 /// Rejects a declared CRDT property written as a parsed CRDT value
2283 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2284 /// A mismatch would make the commit-time merge silently overwrite instead of
2285 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2286 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2287 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2288 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2289 /// are never carved out and stay conflictable.
2290 fn enforce_crdt_variants(
2291 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2292 properties: &Properties,
2293 ) -> Result<()> {
2294 for (key, value) in properties {
2295 let Some(meta) = props_meta.get(key) else {
2296 continue;
2297 };
2298 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2299 continue;
2300 };
2301 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2302 && crdt.type_name() != expected.type_name()
2303 {
2304 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2305 message: format!(
2306 "CRDT property '{key}' must be written as a {} value",
2307 expected.type_name()
2308 ),
2309 }));
2310 }
2311 }
2312 Ok(())
2313 }
2314
2315 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2316 ///
2317 /// When `label` is provided, uses it directly to look up property metadata.
2318 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2319 ///
2320 /// # Errors
2321 ///
2322 /// Returns an error if CRDT property merging fails.
2323 async fn prepare_vertex_upsert(
2324 &self,
2325 vid: Vid,
2326 properties: &mut Properties,
2327 label: Option<&str>,
2328 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2329 ) -> Result<()> {
2330 let Some(pm) = &self.property_manager else {
2331 return Ok(());
2332 };
2333
2334 let schema = self.schema_manager.schema();
2335
2336 // Resolve label: use provided label or discover from L0/storage
2337 let discovered_labels;
2338 let label_name = if let Some(l) = label {
2339 Some(l)
2340 } else {
2341 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2342 discovered_labels
2343 .as_ref()
2344 .and_then(|l| l.first().map(|s| s.as_str()))
2345 };
2346
2347 let Some(label_str) = label_name else {
2348 return Ok(());
2349 };
2350 let Some(props_meta) = schema.properties.get(label_str) else {
2351 return Ok(());
2352 };
2353
2354 // Identify CRDT properties in the insert data
2355 let crdt_keys: Vec<String> = properties
2356 .keys()
2357 .filter(|key| {
2358 props_meta.get(*key).is_some_and(|meta| {
2359 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2360 })
2361 })
2362 .cloned()
2363 .collect();
2364
2365 if crdt_keys.is_empty() {
2366 return Ok(());
2367 }
2368
2369 // Enforce that each declared CRDT property written as a parsed CRDT value
2370 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2371 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2372 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2373 // would hide that as a silent lost update — reject it at the source.
2374 //
2375 // Only the `Map` form is checked: it is exactly the form the carve-out
2376 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2377 // string (the Cypher form) or a non-CRDT value is never carved out — it
2378 // stays conflictable — so it poses no carve-out soundness risk and is left
2379 // to the existing merge/parse path. This is the declared-property half of
2380 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2381 Self::enforce_crdt_variants(props_meta, properties)?;
2382
2383 let ctx = self.get_query_context(tx_l0).await;
2384 for key in crdt_keys {
2385 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2386 if !existing.is_null()
2387 && let Some(val) = properties.get_mut(&key)
2388 {
2389 *val = pm.merge_crdt_values(&existing, val)?;
2390 }
2391 }
2392
2393 Ok(())
2394 }
2395
2396 async fn prepare_edge_upsert(
2397 &self,
2398 eid: Eid,
2399 properties: &mut Properties,
2400 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2401 ) -> Result<()> {
2402 if let Some(pm) = &self.property_manager {
2403 let schema = self.schema_manager.schema();
2404 // Get edge type from L0 buffer instead of from EID
2405 let type_name = self.get_edge_type_from_l0(eid);
2406
2407 if let Some(ref t_name) = type_name
2408 && let Some(props_meta) = schema.properties.get(t_name)
2409 {
2410 let mut crdt_keys = Vec::new();
2411 for (key, _) in properties.iter() {
2412 if let Some(meta) = props_meta.get(key)
2413 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2414 {
2415 crdt_keys.push(key.clone());
2416 }
2417 }
2418
2419 if !crdt_keys.is_empty() {
2420 let ctx = self.get_query_context(tx_l0).await;
2421 for key in crdt_keys {
2422 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2423
2424 if !existing.is_null()
2425 && let Some(val) = properties.get_mut(&key)
2426 {
2427 *val = pm.merge_crdt_values(&existing, val)?;
2428 }
2429 }
2430 }
2431 }
2432 }
2433 Ok(())
2434 }
2435
2436 #[instrument(skip(self, properties), level = "trace")]
2437 pub async fn insert_vertex(
2438 &self,
2439 vid: Vid,
2440 properties: Properties,
2441 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2442 ) -> Result<()> {
2443 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2444 .await?;
2445 Ok(())
2446 }
2447
2448 /// Component C1 (G4): before a non-transactional mutation merges into main
2449 /// L0, if an outstanding snapshot pins the current generation, freeze it
2450 /// aside so snapshots taken *before* this write stay isolated from it.
2451 ///
2452 /// `flush_lock` (acquired and released here) serializes the freeze against
2453 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2454 /// commit path gets. No-op for transactional writes (their freeze happens at
2455 /// commit) and — the common case — when nothing is pinned, where it costs one
2456 /// atomic load. Freezes at most once per pinned generation: the freeze
2457 /// installs a fresh unpinned `current`, so later writes in the same bulk
2458 /// import see no pin and merge in place, and the snapshot keeps reading the
2459 /// frozen pre-import buffer.
2460 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2461 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2462 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2463 // always false (one atomic load) when SSI is off.
2464 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2465 let _flush_lock_guard = self.flush_lock.lock().await;
2466 // Re-check under the lock: a concurrent commit may have frozen first.
2467 if self.l0_manager.is_current_pinned() {
2468 self.l0_manager.freeze_current_for_snapshot();
2469 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2470 }
2471 }
2472 }
2473
2474 #[instrument(skip(self, properties, labels), level = "trace")]
2475 pub async fn insert_vertex_with_labels(
2476 &self,
2477 vid: Vid,
2478 mut properties: Properties,
2479 labels: &[String],
2480 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2481 ) -> Result<Properties> {
2482 let start = std::time::Instant::now();
2483 self.check_write_pressure().await?;
2484 self.check_transaction_memory(tx_l0)?;
2485
2486 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2487 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2488 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2489 // taken before this write stay isolated from it.
2490 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2491
2492 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2493 self.process_embeddings_for_labels(labels, &mut properties)
2494 .await?;
2495 }
2496 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2497 .await?;
2498 self.prepare_vertex_upsert(
2499 vid,
2500 &mut properties,
2501 labels.first().map(|s| s.as_str()),
2502 tx_l0,
2503 )
2504 .await?;
2505
2506 // Clone properties and labels before moving into L0 to return them and populate constraint index
2507 let properties_copy = properties.clone();
2508 let labels_copy = labels.to_vec();
2509
2510 {
2511 // For a non-tx write, re-resolve the live `current` buffer and hold
2512 // `flush_lock` across the (synchronous) write so a concurrent flush
2513 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2514 // a fresh `current` between resolve and write, dropping our write
2515 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2516 // buffer, never the rotating `current`, so no `flush_lock` is needed
2517 // (and taking it would risk re-entrancy with the commit path).
2518 let _flush_lock_guard = if tx_l0.is_none() {
2519 Some(self.flush_lock.lock().await)
2520 } else {
2521 None
2522 };
2523 let l0 = self.resolve_l0(tx_l0);
2524 let mut l0_guard = l0.write();
2525 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2526 // possibly empty) current buffer must merge against the chained
2527 // committed state, not shadow it. No-op when the chain is empty
2528 // or this is a tx-private write.
2529 if tx_l0.is_none() {
2530 let pending = self.l0_manager.get_pending_flush();
2531 if !pending.is_empty() {
2532 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2533 }
2534 }
2535 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2536
2537 // Populate constraint index for O(1) duplicate detection
2538 let schema = self.schema_manager.schema();
2539 for label in &labels_copy {
2540 if schema.get_label_case_insensitive(label).is_none() {
2541 if self.config.strict_schema {
2542 return Err(anyhow::anyhow!(
2543 "Label '{}' is not defined in the schema \
2544 (strict_schema is enabled).",
2545 label
2546 ));
2547 }
2548 continue; // Schemaless: skip unknown labels.
2549 }
2550
2551 // For each unique constraint on this label, insert into constraint index
2552 for constraint in &schema.constraints {
2553 if !constraint.enabled {
2554 continue;
2555 }
2556 if let ConstraintTarget::Label(l) = &constraint.target {
2557 if l != label {
2558 continue;
2559 }
2560 } else {
2561 continue;
2562 }
2563
2564 if let ConstraintType::Unique {
2565 properties: unique_props,
2566 } = &constraint.constraint_type
2567 {
2568 let mut key_values = Vec::new();
2569 let mut all_present = true;
2570 for prop in unique_props {
2571 if let Some(val) = properties_copy.get(prop) {
2572 key_values.push((prop.clone(), val.clone()));
2573 } else {
2574 all_present = false;
2575 break;
2576 }
2577 }
2578
2579 if all_present {
2580 let key = serialize_constraint_key(label, &key_values);
2581 l0_guard.insert_constraint_key(key, vid);
2582 }
2583 }
2584 }
2585 }
2586 }
2587
2588 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2589 self.update_metrics();
2590
2591 if tx_l0.is_none() {
2592 self.check_flush().await?;
2593 }
2594 if start.elapsed().as_millis() > 100 {
2595 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2596 }
2597 Ok(properties_copy)
2598 }
2599
2600 /// True iff routing this partial write through MergeInsert would
2601 /// miss a constraint check. Specifically: a multi-key UNIQUE
2602 /// constraint where the touched-set doesn't cover all member keys
2603 /// requires the unchanged keys from the existing row to compute
2604 /// the composite. Conservative: also returns true if any touched
2605 /// key is `ext_id` (uniqueness checked globally — handled in the
2606 /// full-row path).
2607 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2608 if touched.contains_key("ext_id") {
2609 return true;
2610 }
2611 let schema = self.schema_manager.schema();
2612 for label in labels {
2613 if schema.get_label_case_insensitive(label).is_none() {
2614 continue;
2615 }
2616 for constraint in &schema.constraints {
2617 if !constraint.enabled {
2618 continue;
2619 }
2620 if let ConstraintTarget::Label(l) = &constraint.target {
2621 if !l.eq_ignore_ascii_case(label) {
2622 continue;
2623 }
2624 } else {
2625 continue;
2626 }
2627 if let ConstraintType::Unique {
2628 properties: unique_props,
2629 } = &constraint.constraint_type
2630 {
2631 if unique_props.len() < 2 {
2632 continue; // single-key UNIQUE — partial path sees the key
2633 }
2634 if unique_props.iter().any(|p| touched.contains_key(p)) {
2635 return true;
2636 }
2637 }
2638 }
2639 }
2640 false
2641 }
2642
2643 /// Insert a vertex's FULL property row plus a touched-keys hint so
2644 /// the flush emits ONLY those columns via Lance MergeInsert.
2645 ///
2646 /// Caller must have read the full row (via PropertyManager) and
2647 /// applied SET-touched values on top before calling — same input
2648 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2649 /// is the set of property keys this SET statement actually
2650 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2651 /// flush filters the MergeInsert source schema down to those keys.
2652 /// When `UniConfig::partial_lance_writes == false`, falls through
2653 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2654 /// equivalence with prior releases.
2655 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2656 pub async fn insert_vertex_partial_full(
2657 &self,
2658 vid: Vid,
2659 mut props: Properties,
2660 touched_keys: HashSet<String>,
2661 labels: &[String],
2662 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2663 ) -> Result<()> {
2664 if !self.config.partial_lance_writes
2665 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2666 {
2667 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2668 .await?;
2669 return Ok(());
2670 }
2671
2672 self.check_write_pressure().await?;
2673 self.check_transaction_memory(tx_l0)?;
2674 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2675 self.process_embeddings_for_labels(labels, &mut props)
2676 .await?;
2677 }
2678 // Full-row validation runs because we have the complete map;
2679 // no need for the partial-only validator.
2680 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2681 .await?;
2682 {
2683 let l0 = self.resolve_l0(tx_l0);
2684 let mut l0_guard = l0.write();
2685 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2686 }
2687 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2688 metrics::counter!("uni_partial_writes_total").increment(1);
2689 self.update_metrics();
2690 if tx_l0.is_none() {
2691 self.check_flush().await?;
2692 }
2693 Ok(())
2694 }
2695
2696 /// Insert a vertex's *partial* property set without first reading the
2697 /// full row.
2698 ///
2699 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2700 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2701 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2702 /// schema source — preserving untouched columns (e.g., embeddings)
2703 /// byte-equal in Lance with no read at the caller and no write of
2704 /// those columns.
2705 ///
2706 /// When the flag is `false`, this falls back to the existing
2707 /// `insert_vertex_with_labels` path after merging `touched` with
2708 /// the current properties from L0/storage. The caller can therefore
2709 /// use this entry point unconditionally; the optimization activates
2710 /// only when the flag is on.
2711 #[instrument(skip(self, touched, labels), level = "trace")]
2712 pub async fn insert_vertex_partial(
2713 &self,
2714 vid: Vid,
2715 touched: Properties,
2716 labels: &[String],
2717 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2718 ) -> Result<()> {
2719 let needs_full_read =
2720 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2721 if needs_full_read {
2722 // Flag-off fallback (or constraint-driven fallback): merge
2723 // `touched` with the current full property snapshot from
2724 // L0/storage and route through the existing path. Preserves
2725 // bit-for-bit equivalence with the pre-Round-11 release.
2726 let existing = if let Some(pm) = &self.property_manager {
2727 pm.get_all_vertex_props_with_ctx(vid, None)
2728 .await
2729 .unwrap_or_default()
2730 .unwrap_or_default()
2731 } else {
2732 Properties::new()
2733 };
2734 let mut merged = existing;
2735 for (k, v) in touched {
2736 merged.insert(k, v);
2737 }
2738 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2739 .await?;
2740 return Ok(());
2741 }
2742
2743 // Flag-on fast path: stage the partial update directly. Pressure
2744 // checks, embedding generation, constraint validation all still
2745 // run — but the validator is the partial-aware variant that
2746 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2747 // properties not present in `touched`. Multi-key UNIQUE that
2748 // overlaps the touched set forces a fallback above via
2749 // `touched_needs_full_read`.
2750 let mut touched = touched;
2751 self.check_write_pressure().await?;
2752 self.check_transaction_memory(tx_l0)?;
2753 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2754 self.process_embeddings_for_labels(labels, &mut touched)
2755 .await?;
2756 }
2757 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2758 .await?;
2759
2760 {
2761 let l0 = self.resolve_l0(tx_l0);
2762 let mut l0_guard = l0.write();
2763 l0_guard.insert_vertex_partial(vid, touched, labels);
2764 }
2765
2766 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2767 metrics::counter!("uni_partial_writes_total").increment(1);
2768 self.update_metrics();
2769 if tx_l0.is_none() {
2770 self.check_flush().await?;
2771 }
2772 Ok(())
2773 }
2774
2775 /// Insert multiple vertices with batched operations.
2776 ///
2777 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2778 /// for bulk inserts with unique constraints.
2779 ///
2780 /// # Performance Improvements
2781 /// - Batch VID allocation: 1 call instead of N calls
2782 /// - Batch constraint validation: O(N) instead of O(N²)
2783 /// - Batch embedding generation: 1 API call per config instead of N calls
2784 /// - Transaction wrapping: Automatic flush deferral, atomicity
2785 ///
2786 /// # Arguments
2787 /// * `vids` - Pre-allocated VIDs for the vertices
2788 /// * `properties_batch` - Properties for each vertex
2789 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2790 ///
2791 /// # Errors
2792 /// Returns error if:
2793 /// - VID/properties length mismatch
2794 /// - Constraint violation detected
2795 /// - Embedding generation fails
2796 /// - Transaction commit fails
2797 ///
2798 /// # Atomicity
2799 /// If this method fails, all changes are rolled back (if transaction was started here).
2800 pub async fn insert_vertices_batch(
2801 &self,
2802 vids: Vec<Vid>,
2803 mut properties_batch: Vec<Properties>,
2804 labels: Vec<String>,
2805 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2806 ) -> Result<Vec<Properties>> {
2807 let start = std::time::Instant::now();
2808
2809 // Validate inputs
2810 if vids.len() != properties_batch.len() {
2811 return Err(anyhow!(
2812 "VID/properties size mismatch: {} vids, {} properties",
2813 vids.len(),
2814 properties_batch.len()
2815 ));
2816 }
2817
2818 if vids.is_empty() {
2819 return Ok(Vec::new());
2820 }
2821
2822 // Batch operations — writes go directly to the resolved L0.
2823 // Atomicity is guaranteed by the caller holding the writer lock.
2824 let result = async {
2825 self.check_write_pressure().await?;
2826 self.check_transaction_memory(tx_l0)?;
2827
2828 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2829 // freeze the pinned generation aside before merging so snapshot
2830 // readers stay isolated. No-op when unpinned or transactional.
2831 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2832
2833 // Batch embedding generation (1 API call per config)
2834 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2835 .await?;
2836
2837 // Batch constraint validation (O(N) instead of O(N²))
2838 let label = labels
2839 .first()
2840 .ok_or_else(|| anyhow!("No labels provided"))?;
2841 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2842 .await?;
2843
2844 // Batch prepare (CRDT merging if needed)
2845 // Check schema once: skip entirely if no CRDT properties for this label.
2846 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2847 // values to merge, so the per-vertex lookup is unnecessary in that case.
2848 let has_crdt_fields = {
2849 let schema = self.schema_manager.schema();
2850 schema
2851 .properties
2852 .get(label.as_str())
2853 .is_some_and(|props_meta| {
2854 props_meta.values().any(|meta| {
2855 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2856 })
2857 })
2858 };
2859
2860 if has_crdt_fields {
2861 // Layer-1 variant enforcement (G3): the batch path must reject a
2862 // declared-CRDT variant mismatch exactly as the single-vertex
2863 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2864 // written via batch import slips past write-time validation and
2865 // the OCC carve-out then masks the overwrite as a lost update.
2866 {
2867 let schema = self.schema_manager.schema();
2868 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2869 for props in &properties_batch {
2870 Self::enforce_crdt_variants(props_meta, props)?;
2871 }
2872 }
2873 }
2874
2875 // Batch fetch existing CRDT values: collect VIDs that need merging,
2876 // then query once via PropertyManager instead of per-vertex lookups.
2877 let schema = self.schema_manager.schema();
2878 let crdt_keys: Vec<String> = schema
2879 .properties
2880 .get(label.as_str())
2881 .map(|props_meta| {
2882 props_meta
2883 .iter()
2884 .filter(|(_, meta)| {
2885 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2886 })
2887 .map(|(key, _)| key.clone())
2888 .collect()
2889 })
2890 .unwrap_or_default();
2891
2892 if let Some(pm) = &self.property_manager {
2893 let ctx = self.get_query_context(tx_l0).await;
2894 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2895 for key in &crdt_keys {
2896 if props.contains_key(key) {
2897 let existing =
2898 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2899 if !existing.is_null()
2900 && let Some(val) = props.get_mut(key)
2901 {
2902 *val = pm.merge_crdt_values(&existing, val)?;
2903 }
2904 }
2905 }
2906 }
2907 }
2908 }
2909
2910 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2911 let target_l0 = self.resolve_l0(tx_l0);
2912
2913 let properties_result = properties_batch.clone();
2914 {
2915 let mut l0_guard = target_l0.write();
2916 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2917 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2918 }
2919 }
2920
2921 // Update metrics (batch increment)
2922 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2923 self.update_metrics();
2924
2925 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2926 }
2927 .await;
2928
2929 let props = result?;
2930
2931 if start.elapsed().as_millis() > 100 {
2932 log::warn!(
2933 "Slow insert_vertices_batch ({} vertices): {}ms",
2934 vids.len(),
2935 start.elapsed().as_millis()
2936 );
2937 }
2938
2939 Ok(props)
2940 }
2941
2942 /// Delete a vertex by VID.
2943 ///
2944 /// When `labels` is provided, uses them directly to populate L0 for
2945 /// correct tombstone flushing. Otherwise discovers labels from L0
2946 /// buffers and storage (which can be slow for many vertices).
2947 ///
2948 /// # Errors
2949 ///
2950 /// Returns an error if write pressure stalls, label lookup fails, or
2951 /// the L0 delete operation fails.
2952 #[instrument(skip(self, labels), level = "trace")]
2953 pub async fn delete_vertex(
2954 &self,
2955 vid: Vid,
2956 labels: Option<Vec<String>>,
2957 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2958 ) -> Result<()> {
2959 let start = std::time::Instant::now();
2960 self.check_write_pressure().await?;
2961 self.check_transaction_memory(tx_l0)?;
2962 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2963
2964 // Before deleting, ensure we have the vertex's labels stored in L0 so
2965 // the tombstone can be flushed to the correct label datasets. Discover
2966 // them up front (this may await storage) WITHOUT pinning the buffer we
2967 // will eventually mutate — for non-tx writes the live `current` buffer
2968 // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2969 // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2970 // reads that tolerate a racing rotate.
2971 let has_labels = {
2972 let l0_guard = self.resolve_l0(tx_l0);
2973 let guard = l0_guard.read();
2974 guard.vertex_labels.contains_key(&vid)
2975 };
2976
2977 let backfill_labels = if has_labels {
2978 None
2979 } else if let Some(provided) = labels {
2980 // Caller provided labels — skip the lookup entirely
2981 Some(provided)
2982 } else {
2983 // Discover labels from pending flush L0s, then storage
2984 let mut found = None;
2985 for pending_l0 in self.l0_manager.get_pending_flush() {
2986 let pending_guard = pending_l0.read();
2987 if let Some(l) = pending_guard.get_vertex_labels(vid) {
2988 found = Some(l.to_vec());
2989 break;
2990 }
2991 }
2992 if found.is_none() {
2993 found = self.find_vertex_labels_in_storage(vid).await?;
2994 }
2995 found
2996 };
2997
2998 // Test-only seam (no-op without the `failpoints` feature): pause a
2999 // non-transactional delete AFTER the awaited label discovery but BEFORE
3000 // it re-resolves the live buffer and writes the tombstone. A concurrent
3001 // flush can rotate+complete a buffer in this window; the fix re-resolves
3002 // `get_current()` and mutates it under `flush_lock`, so the tombstone
3003 // always lands in the live buffer (Bug #4 — silent lost delete across
3004 // L0 rotation).
3005 fail::fail_point!("nontx::after-capture");
3006
3007 // Apply the label backfill and the tombstone together. For a non-tx
3008 // write, hold `flush_lock` across the (synchronous) re-resolve + write
3009 // so a concurrent flush rotate (which takes `flush_lock` via
3010 // `begin_flush`) cannot install a fresh `current` between our resolve
3011 // and our write. For a tx write `resolve_l0` returns the tx-private
3012 // buffer (never the rotating `current`), so no `flush_lock` is needed
3013 // — and taking it there would risk re-entrancy with the commit path.
3014 if tx_l0.is_none() {
3015 let _flush_lock_guard = self.flush_lock.lock().await;
3016 let l0 = self.l0_manager.get_current();
3017 let mut guard = l0.write();
3018 if let Some(found_labels) = backfill_labels {
3019 guard.vertex_labels.insert(vid, found_labels);
3020 }
3021 guard.delete_vertex(vid)?;
3022 } else {
3023 let l0 = self.resolve_l0(tx_l0);
3024 let mut guard = l0.write();
3025 if let Some(found_labels) = backfill_labels {
3026 guard.vertex_labels.insert(vid, found_labels);
3027 }
3028 guard.delete_vertex(vid)?;
3029 }
3030 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3031 self.update_metrics();
3032
3033 if tx_l0.is_none() {
3034 self.check_flush().await?;
3035 }
3036 if start.elapsed().as_millis() > 100 {
3037 log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
3038 }
3039 Ok(())
3040 }
3041
3042 /// Find vertex labels from storage by querying the main vertices table.
3043 /// Returns the labels from the latest non-deleted version of the vertex.
3044 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
3045 use crate::backend::types::ScanRequest;
3046 use arrow_array::Array;
3047 use arrow_array::cast::AsArray;
3048
3049 let backend = self.storage.backend();
3050 let table_name = MainVertexDataset::table_name();
3051
3052 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
3053 if !backend.table_exists(table_name).await? {
3054 return Ok(None);
3055 }
3056
3057 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
3058 let filter = format!("_vid = {}", vid.as_u64());
3059 let batches = backend
3060 .scan(
3061 ScanRequest::all(table_name)
3062 .with_filter(filter)
3063 .with_columns(vec![
3064 "_vid".to_string(),
3065 "labels".to_string(),
3066 "_version".to_string(),
3067 "_deleted".to_string(),
3068 ]),
3069 )
3070 .await
3071 .unwrap_or_default();
3072
3073 // Find the row with the highest version number
3074 let mut max_version: Option<u64> = None;
3075 let mut labels: Option<Vec<String>> = None;
3076 let mut is_deleted = false;
3077
3078 for batch in batches {
3079 if batch.num_rows() == 0 {
3080 continue;
3081 }
3082
3083 let version_array = batch
3084 .column_by_name("_version")
3085 .unwrap()
3086 .as_primitive::<arrow_array::types::UInt64Type>();
3087
3088 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
3089
3090 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
3091
3092 for row_idx in 0..batch.num_rows() {
3093 let version = version_array.value(row_idx);
3094
3095 if max_version.is_none_or(|mv| version > mv) {
3096 is_deleted = deleted_array.value(row_idx);
3097
3098 let labels_list = labels_array.value(row_idx);
3099 let string_array = labels_list.as_string::<i32>();
3100 let vertex_labels: Vec<String> = (0..string_array.len())
3101 .filter(|&i| !string_array.is_null(i))
3102 .map(|i| string_array.value(i).to_string())
3103 .collect();
3104
3105 max_version = Some(version);
3106 labels = Some(vertex_labels);
3107 }
3108 }
3109 }
3110
3111 // If the latest version is deleted, return None
3112 if is_deleted { Ok(None) } else { Ok(labels) }
3113 }
3114
3115 #[expect(clippy::too_many_arguments)]
3116 #[instrument(skip(self, props, touched_keys), level = "trace")]
3117 pub async fn insert_edge_partial_full(
3118 &self,
3119 src_vid: Vid,
3120 dst_vid: Vid,
3121 edge_type: u32,
3122 eid: Eid,
3123 props: Properties,
3124 edge_type_name: Option<String>,
3125 touched_keys: HashSet<String>,
3126 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3127 ) -> Result<()> {
3128 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3129 if !self.config.partial_lance_writes {
3130 return self
3131 .insert_edge(
3132 src_vid,
3133 dst_vid,
3134 edge_type,
3135 eid,
3136 props,
3137 edge_type_name,
3138 tx_l0,
3139 )
3140 .await;
3141 }
3142
3143 let start = std::time::Instant::now();
3144 self.check_write_pressure().await?;
3145 self.check_transaction_memory(tx_l0)?;
3146 let mut props = props;
3147 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3148
3149 let l0 = self.resolve_l0(tx_l0);
3150 l0.write().insert_edge_partial_full(
3151 src_vid,
3152 dst_vid,
3153 edge_type,
3154 eid,
3155 props,
3156 edge_type_name,
3157 touched_keys,
3158 )?;
3159
3160 if tx_l0.is_none() {
3161 let version = l0.read().current_version;
3162 self.adjacency_manager
3163 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3164 }
3165
3166 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3167 metrics::counter!("uni_partial_writes_total").increment(1);
3168 self.update_metrics();
3169 if tx_l0.is_none() {
3170 self.check_flush().await?;
3171 }
3172 if start.elapsed().as_millis() > 100 {
3173 log::warn!(
3174 "Slow insert_edge_partial_full: {}ms",
3175 start.elapsed().as_millis()
3176 );
3177 }
3178 Ok(())
3179 }
3180
3181 #[expect(clippy::too_many_arguments)]
3182 pub async fn insert_edge(
3183 &self,
3184 src_vid: Vid,
3185 dst_vid: Vid,
3186 edge_type: u32,
3187 eid: Eid,
3188 mut properties: Properties,
3189 edge_type_name: Option<String>,
3190 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3191 ) -> Result<()> {
3192 let start = std::time::Instant::now();
3193 self.check_write_pressure().await?;
3194 self.check_transaction_memory(tx_l0)?;
3195 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3196 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3197 .await?;
3198
3199 let l0 = self.resolve_l0(tx_l0);
3200 l0.write()
3201 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3202
3203 // Dual-write to AdjacencyManager overlay (survives flush).
3204 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3205 if tx_l0.is_none() {
3206 let version = l0.read().current_version;
3207 self.adjacency_manager
3208 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3209 }
3210
3211 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3212 self.update_metrics();
3213
3214 if tx_l0.is_none() {
3215 self.check_flush().await?;
3216 }
3217 if start.elapsed().as_millis() > 100 {
3218 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3219 }
3220 Ok(())
3221 }
3222
3223 #[instrument(skip(self), level = "trace")]
3224 pub async fn delete_edge(
3225 &self,
3226 eid: Eid,
3227 src_vid: Vid,
3228 dst_vid: Vid,
3229 edge_type: u32,
3230 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3231 ) -> Result<()> {
3232 let start = std::time::Instant::now();
3233 self.check_write_pressure().await?;
3234 self.check_transaction_memory(tx_l0)?;
3235 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3236 let l0 = self.resolve_l0(tx_l0);
3237
3238 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3239
3240 // Dual-write tombstone to AdjacencyManager overlay.
3241 if tx_l0.is_none() {
3242 let version = l0.read().current_version;
3243 self.adjacency_manager
3244 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3245 }
3246 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3247 self.update_metrics();
3248
3249 if tx_l0.is_none() {
3250 self.check_flush().await?;
3251 }
3252 if start.elapsed().as_millis() > 100 {
3253 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3254 }
3255 Ok(())
3256 }
3257
3258 /// Decide whether a flush should be triggered based on mutation count
3259 /// or elapsed time since the last flush.
3260 ///
3261 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3262 /// reuse the decision while bypassing the lock-acquiring entry point
3263 /// (it already holds `flush_lock`).
3264 fn should_flush(&self) -> bool {
3265 let count = self.l0_manager.get_current().read().mutation_count;
3266 if count == 0 {
3267 return false;
3268 }
3269 if count >= self.config.auto_flush_threshold {
3270 return true;
3271 }
3272 if let Some(interval) = self.config.auto_flush_interval
3273 && self.last_flush_time.lock().elapsed() >= interval
3274 && count >= self.config.auto_flush_min_mutations
3275 {
3276 return true;
3277 }
3278 false
3279 }
3280
3281 /// Check if flush should be triggered based on mutation count or time elapsed.
3282 /// This method is called after each write operation and can also be called
3283 /// by a background task for time-based flushing.
3284 pub async fn check_flush(&self) -> Result<()> {
3285 if self.should_flush() {
3286 self.flush_to_l1(None).await?;
3287 }
3288 Ok(())
3289 }
3290
3291 /// Process embeddings for a vertex using labels passed directly.
3292 /// Use this when labels haven't been stored to L0 yet.
3293 async fn process_embeddings_for_labels(
3294 &self,
3295 labels: &[String],
3296 properties: &mut Properties,
3297 ) -> Result<()> {
3298 let label_name = labels.first().map(|s| s.as_str());
3299 self.process_embeddings_impl(label_name, properties).await
3300 }
3301
3302 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3303 /// vertex has an embedding config that hasn't been satisfied by the
3304 /// caller-provided properties, enqueue the VID in
3305 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3306 /// skips `process_embeddings_for_labels` and the embedding is computed
3307 /// in a single batched call at flush time via
3308 /// `drain_pending_embeddings`.
3309 ///
3310 /// Returns `false` (caller falls back to today's per-row eager embed)
3311 /// if any of:
3312 /// - the flag is off,
3313 /// - no label has an embedding config,
3314 /// - the user already provided the target property (matches the
3315 /// existing skip-if-present semantics at writer.rs:2727).
3316 ///
3317 /// Trade-off: when deferral is active, in-tx reads of the embedding
3318 /// column return only what was already in storage (or nothing for
3319 /// brand-new vertices). Existing tests that RETURN n.embedding in
3320 /// the same tx as a SET on the source column must run with the flag
3321 /// off; opt in only when no such reads happen between write and
3322 /// commit.
3323 fn try_defer_embedding(
3324 &self,
3325 labels: &[String],
3326 properties: &Properties,
3327 vid: Vid,
3328 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3329 ) -> bool {
3330 if !self.config.defer_embeddings {
3331 return false;
3332 }
3333 let Some(label) = labels.first() else {
3334 return false;
3335 };
3336
3337 let schema = self.schema_manager.schema();
3338 let mut has_unsatisfied_cfg = false;
3339 for idx in &schema.indexes {
3340 if let IndexDefinition::Vector(v_cfg) = idx
3341 && v_cfg.label == *label
3342 && v_cfg.embedding_config.is_some()
3343 && !properties.contains_key(&v_cfg.property)
3344 {
3345 has_unsatisfied_cfg = true;
3346 break;
3347 }
3348 }
3349 if !has_unsatisfied_cfg {
3350 return false;
3351 }
3352
3353 let l0 = self.resolve_l0(tx_l0);
3354 let mut guard = l0.write();
3355 guard.pending_embeddings.insert(vid, label.clone());
3356 true
3357 }
3358
3359 /// Drain `pending_embeddings` from the rotated old-L0 right before
3360 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3361 /// `process_embeddings_for_batch` call per label, and writes the
3362 /// resulting embedding vectors into each VID's `vertex_properties`
3363 /// map. After this returns, the flush proceeds against an L0 that
3364 /// looks no different from one whose embeddings were generated
3365 /// per-row at insert.
3366 ///
3367 /// Idempotent: a VID whose embedding was already materialized
3368 /// (e.g., by on-demand read paths in a future Phase B revision) is
3369 /// detected via `properties.contains_key(target_prop)` inside
3370 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3371 /// the drain is safe.
3372 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3373 let by_label: HashMap<String, Vec<Vid>> = {
3374 let guard = old_l0_arc.read();
3375 if guard.pending_embeddings.is_empty() {
3376 return Ok(());
3377 }
3378 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3379 for (vid, label) in &guard.pending_embeddings {
3380 m.entry(label.clone()).or_default().push(*vid);
3381 }
3382 m
3383 };
3384
3385 for (label, vids) in by_label {
3386 let mut properties_batch: Vec<Properties> = {
3387 let guard = old_l0_arc.read();
3388 vids.iter()
3389 .map(|vid| {
3390 guard
3391 .vertex_properties
3392 .get(vid)
3393 .cloned()
3394 .unwrap_or_default()
3395 })
3396 .collect()
3397 };
3398
3399 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3400 .await?;
3401
3402 let mut guard = old_l0_arc.write();
3403 for (vid, props) in vids.iter().zip(properties_batch) {
3404 let target = guard.vertex_properties.entry(*vid).or_default();
3405 for (k, v) in props {
3406 target.insert(k, v);
3407 }
3408 guard.pending_embeddings.remove(vid);
3409 }
3410 }
3411 Ok(())
3412 }
3413
3414 /// Materialise MUVERA FDE columns for the about-to-flush L0. Mirrors
3415 /// [`Self::drain_pending_embeddings`]: for each MUVERA index, compute the derived
3416 /// Fixed-Dimensional Encoding from each row's source multi-vector and inject it into
3417 /// that row's `vertex_properties` (so the normal column builder writes the
3418 /// `__fde_*` column with no hot-path change). For partial-write rows that touched the
3419 /// source column, the derived column is added to `vertex_partial_keys` so the partial
3420 /// MergeInsert batch carries the recomputed FDE (avoids staleness on `SET`).
3421 ///
3422 /// No-op when the schema has no MUVERA index. Unlike auto-embed, the FDE is a pure,
3423 /// deterministic, in-process transform — no runtime/embedding service needed.
3424 fn materialize_fde_columns(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3425 let schema = self.schema_manager.schema();
3426 let specs = crate::storage::muvera_index::fde_specs(&schema);
3427 if specs.is_empty() {
3428 return Ok(());
3429 }
3430 let mut guard = old_l0_arc.write();
3431 for spec in &specs {
3432 let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)
3433 .map_err(|e| anyhow!("MUVERA index '{}': {e}", spec.index_name))?;
3434 // VIDs of this label currently in L0 (collect first to avoid a borrow
3435 // conflict with the per-row mutation below).
3436 let vids: Vec<Vid> = guard
3437 .vertex_labels
3438 .iter()
3439 .filter(|(_, labels)| labels.contains(&spec.label))
3440 .map(|(vid, _)| *vid)
3441 .collect();
3442 for vid in vids {
3443 // Decode the source multi-vector tokens (borrow ends before the mutation).
3444 let tokens = match guard
3445 .vertex_properties
3446 .get(&vid)
3447 .and_then(|p| p.get(&spec.source_prop))
3448 {
3449 Some(v) => crate::storage::muvera_index::value_to_multivec(v),
3450 None => continue, // source absent → leave the FDE column NULL
3451 };
3452 let fde = encoder.encode_doc(&tokens).map_err(|e| {
3453 anyhow!("MUVERA index '{}' vid {:?}: {e}", spec.index_name, vid)
3454 })?;
3455 if let Some(props) = guard.vertex_properties.get_mut(&vid) {
3456 props.insert(spec.derived_col.clone(), Value::Vector(fde));
3457 }
3458 if let Some(touched) = guard.vertex_partial_keys.get_mut(&vid)
3459 && touched.contains(&spec.source_prop)
3460 {
3461 touched.insert(spec.derived_col.clone());
3462 }
3463 }
3464 }
3465 Ok(())
3466 }
3467
3468 /// Process embeddings for a batch of vertices efficiently.
3469 ///
3470 /// Groups vertices by embedding config and makes batched API calls to the
3471 /// embedding service instead of calling once per vertex.
3472 ///
3473 /// # Performance
3474 /// For N vertices with embedding config:
3475 /// - Old approach: N API calls to embedding service
3476 /// - New approach: 1 API call per embedding config (usually 1 total)
3477 async fn process_embeddings_for_batch(
3478 &self,
3479 labels: &[String],
3480 properties_batch: &mut [Properties],
3481 ) -> Result<()> {
3482 let Some(label) = labels.first().map(|s| s.as_str()) else {
3483 return Ok(());
3484 };
3485 let schema = self.schema_manager.schema();
3486
3487 // Group auto-embed targets by (alias, source). A group with both a dense Vector and a
3488 // multi-vector List<Vector> column is a single-pass hybrid source: one inference fills
3489 // both. Non-mixed groups use the dense / multi-vector embedder as before.
3490 let groups = collect_embed_groups(&schema, label);
3491 if groups.is_empty() {
3492 return Ok(());
3493 }
3494
3495 for (key, group) in groups {
3496 let alias = &key.0;
3497 let want_dense = !group.dense.is_empty();
3498 let want_multi = !group.multi.is_empty();
3499
3500 // A row needs this group's inference if it has the source text and is still missing
3501 // at least one of the group's target columns (user-supplied values are preserved).
3502 let mut input_texts: Vec<String> = Vec::new();
3503 let mut needs: Vec<usize> = Vec::new();
3504 for (idx, properties) in properties_batch.iter().enumerate() {
3505 let all_present = group
3506 .dense
3507 .iter()
3508 .chain(group.multi.iter())
3509 .all(|t| properties.contains_key(t));
3510 if all_present {
3511 continue;
3512 }
3513 let mut inputs = Vec::new();
3514 for src in &group.source_properties {
3515 if let Some(val) = properties.get(src)
3516 && let Some(s) = val.as_str()
3517 {
3518 inputs.push(s.to_string());
3519 }
3520 }
3521 if inputs.is_empty() {
3522 continue;
3523 }
3524 let text = inputs.join(" ");
3525 let text = match &group.document_prefix {
3526 Some(prefix) => format!("{prefix}{text}"),
3527 None => text,
3528 };
3529 input_texts.push(text);
3530 needs.push(idx);
3531 }
3532 if input_texts.is_empty() {
3533 continue;
3534 }
3535
3536 let runtime = self
3537 .xervo_runtime
3538 .get()
3539 .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3540 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3541 let (dense, multi) =
3542 embed_group(runtime, alias, &input_refs, want_dense, want_multi).await?;
3543
3544 for (i, &row) in needs.iter().enumerate() {
3545 if let Some(vec) = dense.as_ref().and_then(|d| d.get(i)) {
3546 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3547 for t in &group.dense {
3548 if !properties_batch[row].contains_key(t) {
3549 properties_batch[row].insert(t.clone(), Value::List(vals.clone()));
3550 }
3551 }
3552 }
3553 if let Some(tokens) = multi.as_ref().and_then(|m| m.get(i)) {
3554 let mv = multivec_to_value(tokens);
3555 for t in &group.multi {
3556 if !properties_batch[row].contains_key(t) {
3557 properties_batch[row].insert(t.clone(), mv.clone());
3558 }
3559 }
3560 }
3561 }
3562 }
3563
3564 Ok(())
3565 }
3566
3567 async fn process_embeddings_impl(
3568 &self,
3569 label_name: Option<&str>,
3570 properties: &mut Properties,
3571 ) -> Result<()> {
3572 let schema = self.schema_manager.schema();
3573
3574 let Some(label) = label_name else {
3575 return Ok(());
3576 };
3577
3578 // Same (alias, source) grouping as the deferred path: a mixed dense + multi-vector
3579 // group is a single-pass hybrid source (one inference fills both columns).
3580 let groups = collect_embed_groups(&schema, label);
3581 if groups.is_empty() {
3582 log::info!("No embedding config found for label {}", label);
3583 return Ok(());
3584 }
3585
3586 for (key, group) in groups {
3587 let alias = &key.0;
3588 // Skip if every target already present (user-supplied values win).
3589 if group
3590 .dense
3591 .iter()
3592 .chain(group.multi.iter())
3593 .all(|t| properties.contains_key(t))
3594 {
3595 continue;
3596 }
3597
3598 let mut inputs = Vec::new();
3599 for src in &group.source_properties {
3600 if let Some(val) = properties.get(src)
3601 && let Some(s) = val.as_str()
3602 {
3603 inputs.push(s.to_string());
3604 }
3605 }
3606 if inputs.is_empty() {
3607 continue;
3608 }
3609 let text = inputs.join(" ");
3610 let text = match &group.document_prefix {
3611 Some(prefix) => format!("{prefix}{text}"),
3612 None => text,
3613 };
3614
3615 let runtime = self
3616 .xervo_runtime
3617 .get()
3618 .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3619 let want_dense = !group.dense.is_empty();
3620 let want_multi = !group.multi.is_empty();
3621 let (dense, multi) =
3622 embed_group(runtime, alias, &[text.as_str()], want_dense, want_multi).await?;
3623
3624 if let Some(vec) = dense.as_ref().and_then(|d| d.first()) {
3625 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3626 for t in &group.dense {
3627 if !properties.contains_key(t) {
3628 properties.insert(t.clone(), Value::List(vals.clone()));
3629 }
3630 }
3631 }
3632 if let Some(tokens) = multi.as_ref().and_then(|m| m.first()) {
3633 let mv = multivec_to_value(tokens);
3634 for t in &group.multi {
3635 if !properties.contains_key(t) {
3636 properties.insert(t.clone(), mv.clone());
3637 }
3638 }
3639 }
3640 }
3641 Ok(())
3642 }
3643
3644 /// Flushes the current in-memory L0 buffer to L1 storage.
3645 ///
3646 /// # Lock Ordering
3647 ///
3648 /// To prevent deadlocks, locks must be acquired in the following order:
3649 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3650 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3651 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3652 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3653 /// 5. `Index` / `Storage` locks (during actual flush)
3654 ///
3655 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3656 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3657 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3658 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3659 // Drain any in-flight async flushes first. `flush_to_l1` is a
3660 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3661 // setup, shutdown paths) rely on it as "all writes are now
3662 // durably in Lance". Without the drain, an async stream from
3663 // a recent commit might still be writing to Lance when
3664 // `flush_to_l1` returns, leaving a window where forks branch
3665 // off pre-write Lance state and lose data.
3666 if let Some(coord) = self.flush_coordinator.as_ref() {
3667 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3668 }
3669 let _flush_lock_guard = self.flush_lock.lock().await;
3670 self.flush_inline_under_lock(name).await
3671 }
3672
3673 /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3674 ///
3675 /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3676 /// flush, and then — still holding the lock — reads the allocator
3677 /// high-water marks and each existing candidate dataset's Lance
3678 /// version. Capturing under the held lock is what makes the fork point
3679 /// atomic: no concurrent commit can advance the allocator and no
3680 /// concurrent flush can advance a dataset tip between the flush and the
3681 /// reads. See [`ForkPoint`].
3682 ///
3683 /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3684 /// names with no `.lance` directory on disk are skipped (returned map
3685 /// has no entry for them), matching the fork branch loop's existence
3686 /// check.
3687 ///
3688 /// # Errors
3689 /// Propagates flush failures from `flush_inline_under_lock` and any
3690 /// per-dataset version read failure from `lance_branch::current_version`.
3691 ///
3692 /// # Deadlocks
3693 /// Must not be called by a task already holding `flush_lock` (e.g.
3694 /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3695 /// Fork creation never holds the lock, so the sole call site is safe.
3696 pub async fn flush_and_capture_fork_point(
3697 &self,
3698 candidate_dataset_names: &[String],
3699 ) -> Result<ForkPoint> {
3700 if let Some(coord) = self.flush_coordinator.as_ref() {
3701 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3702 }
3703 let _flush_lock_guard = self.flush_lock.lock().await;
3704 self.flush_inline_under_lock(None).await?;
3705
3706 // Still under `flush_lock`: capture the allocator HWM, the MVCC
3707 // version HWM, and every existing dataset's Lance version so
3708 // nothing can interleave.
3709 let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3710 // The parent's current L0 version is the largest `_version` any
3711 // inherited row can carry (flushed or in-memory). A fork bootstraps
3712 // its version floor to this so a fork tx read still sees inherited
3713 // rows. Cheap read lock; no buffer clone.
3714 let version_hwm = self.l0_manager.get_current().read().current_version;
3715
3716 let base = self.storage.base_uri();
3717 let mut dataset_versions = BTreeMap::new();
3718 for name in candidate_dataset_names {
3719 let uri = join_lance_uri(base, name);
3720 if !lance_path_exists(&uri) {
3721 continue;
3722 }
3723 let version = crate::backend::lance_branch::current_version(&uri).await?;
3724 dataset_versions.insert(name.clone(), version);
3725 }
3726
3727 Ok(ForkPoint {
3728 vid_hwm,
3729 eid_hwm,
3730 dataset_versions,
3731 version_hwm,
3732 })
3733 }
3734
3735 /// Async-flush entry point: rotate under `flush_lock`, release the
3736 /// lock, then submit the stream phase to the [`FlushCoordinator`].
3737 /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3738 /// that resolves when finalize completes.
3739 ///
3740 /// Errors if `config.async_flush_enabled = false` (the coordinator
3741 /// is `None` in that case — see `flush_coordinator` field doc).
3742 pub async fn flush_to_l1_async(
3743 self: &Arc<Self>,
3744 name: Option<String>,
3745 ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3746 let coord = self
3747 .flush_coordinator
3748 .as_ref()
3749 .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3750 .clone();
3751 // 1. Acquire permit FIRST (outside flush_lock) so we don't
3752 // introduce a permit-while-holding-flush-lock convoy.
3753 let permit = coord.acquire_permit().await?;
3754 // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3755 // and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3756 // rotate (the `?` below) must consume neither: the finalizer
3757 // advances in strictly consecutive seq order and only decrements
3758 // pending on finalize, so a leaked seq/pending would wedge it
3759 // forever. The seq is allocated under `flush_lock`, immediately
3760 // after the rotate and before the guard drops, so concurrent rotates
3761 // keep seq order == rotation order, and the seq is unused until
3762 // submit. On the `?` error path the permit drops, freeing the slot.
3763 let (
3764 RotateOutput {
3765 old_l0_arc,
3766 wal_lsn,
3767 current_version,
3768 flush_in_progress_guard,
3769 },
3770 seq,
3771 ) = {
3772 let _flush_lock_guard = self.flush_lock.lock().await;
3773 let rotate_out = self.flush_l0_rotate().await?;
3774 let seq = coord.next_rotate_seq();
3775 coord.note_pending();
3776 (rotate_out, seq)
3777 };
3778 // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3779 // cached_manifest snapshot at this moment.
3780 let parent_manifest = self.cached_manifest.lock().clone();
3781 let rotated = RotatedFlush {
3782 seq,
3783 old_l0_arc: old_l0_arc.clone(),
3784 wal_lsn,
3785 current_version,
3786 name: name.clone(),
3787 parent_manifest,
3788 permit,
3789 flush_in_progress_guard,
3790 };
3791 // 4. Spawn the stream phase via the coordinator. The closure
3792 // captures Arc<Writer> transiently — drops when stream
3793 // completes (bounded, ~50-500 ms).
3794 let writer = self.clone();
3795 let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3796 let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3797 Ok(crate::runtime::flush_coordinator::FlushOutcome {
3798 new_manifest: outcome.manifest,
3799 snapshot_id: outcome.snapshot_id,
3800 })
3801 });
3802 Ok(ticket)
3803 }
3804
3805 /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3806 /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3807 /// its place), and hand off the WAL to the new L0.
3808 ///
3809 /// Runs in microseconds. Must be called under `flush_lock` (the caller
3810 /// is responsible). The returned [`RotateOutput`] carries everything
3811 /// the subsequent stream + finalize phases need; in particular the
3812 /// [`FlushInProgressGuard`] is bound to the return value so it stays
3813 /// alive for the full flush lifetime — including any future async
3814 /// path where stream runs on a spawned task.
3815 async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3816 // Acquire the in-progress counter BEFORE any heavy work. The
3817 // guard lives on RotateOutput; dropping RotateOutput drops the
3818 // guard, so the counter goes back to zero exactly when the flush
3819 // is fully done.
3820 let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3821
3822 // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3823 // current L0 is still active and mutations are retained in
3824 // memory until restart/retry.
3825 let wal_for_truncate = {
3826 let current_l0 = self.l0_manager.get_current();
3827 let l0_guard = current_l0.read();
3828 l0_guard.wal.clone()
3829 };
3830 // Test-only seam (no-op without the `failpoints` feature): inject a
3831 // WAL-flush failure here to drive the "failed async rotate wedges the
3832 // finalizer" regression (Bug #3). When configured to "return" it makes
3833 // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3834 fail::fail_point!("flush::rotate-fail", |_| {
3835 Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3836 });
3837 let wal_lsn = if let Some(ref w) = wal_for_truncate {
3838 w.flush().await?
3839 } else {
3840 0
3841 };
3842
3843 // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3844 // pending_flush until complete_flush is called by finalize.
3845 let old_l0_arc = self.l0_manager.begin_flush(0, None);
3846 metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3847
3848 // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3849 // and current_version to the new L0.
3850 let current_version;
3851 {
3852 let mut old_l0_guard = old_l0_arc.write();
3853 current_version = old_l0_guard.current_version;
3854 old_l0_guard.wal_lsn_at_flush = wal_lsn;
3855 let wal = old_l0_guard.wal.take();
3856 let new_l0_arc = self.l0_manager.get_current();
3857 let mut new_l0_guard = new_l0_arc.write();
3858 new_l0_guard.wal = wal;
3859 new_l0_guard.current_version = current_version;
3860 }
3861
3862 Ok(RotateOutput {
3863 old_l0_arc,
3864 wal_lsn,
3865 current_version,
3866 flush_in_progress_guard,
3867 })
3868 }
3869
3870 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3871 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3872 /// pending_flush by Phase B); writes append-only Lance datasets; does
3873 /// NOT call save_snapshot / set_latest_snapshot — those are
3874 /// finalize's job, so the manifest doesn't get published until the
3875 /// next phase.
3876 ///
3877 /// Today takes `&self`; in a follow-up commit this becomes a
3878 /// static `Send + 'static` function over `SharedFlushCtx` so it can
3879 /// run on a spawned task while concurrent commits proceed.
3880 async fn flush_stream_l1(
3881 &self,
3882 old_l0_arc: Arc<RwLock<L0Buffer>>,
3883 wal_lsn: u64,
3884 current_version: u64,
3885 name: Option<String>,
3886 ) -> Result<FlushOutcome> {
3887 // Test-only seam (no-op without the `failpoints` feature): the rotate
3888 // (begin_flush) already moved the to-be-flushed buffer onto
3889 // pending_flush and installed a fresh empty current buffer, but the
3890 // rotated rows are NOT yet durable in Lance. Pausing here holds that
3891 // window open to drive the unique-constraint-hole regression
3892 // (Bug #9 Mechanism A).
3893 fail::fail_point!("flush::after-rotate-before-lance");
3894
3895 // Phase B: materialize any deferred embeddings before column
3896 // extraction. No-op when `defer_embeddings` is off (the set will
3897 // be empty). On-demand reads of the embedding column are a TODO
3898 // for a future revision (see UniConfig::defer_embeddings docs).
3899 self.drain_pending_embeddings(&old_l0_arc).await?;
3900
3901 // Materialise MUVERA FDE columns from each row's source multi-vector (pure/sync;
3902 // no-op without a MUVERA index). Runs after embeddings so a row can be both
3903 // auto-embedded and FDE-encoded.
3904 self.materialize_fde_columns(&old_l0_arc)?;
3905
3906 let schema = self.schema_manager.schema();
3907 // 2. Acquire Read lock on Old L0 for flushing
3908 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3909 // (Vid, labels, properties, deleted, version)
3910 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3911 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3912 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3913 // (vid, full L0 properties map, version, set of keys to update).
3914 // Only the keys in the HashSet are emitted to the partial source;
3915 // the full props map is retained so the per-row column extractor
3916 // can read each touched key's value.
3917 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3918 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3919 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3920 // partial source with just `_vid`, `_deleted=true`, `_version`,
3921 // `_updated_at`. Skips the wide-row Append payload that adds
3922 // nothing on a soft-delete.
3923 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3924 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3925 // Collect vertex timestamps from L0 for flushing to storage
3926 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3927 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3928 // Track tombstones missing labels for storage query fallback
3929 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3930
3931 {
3932 let old_l0 = old_l0_arc.read();
3933
3934 // 1. Collect all edges and tombstones from L0
3935 for edge in old_l0.graph.edges() {
3936 let properties = old_l0
3937 .edge_properties
3938 .get(&edge.eid)
3939 .cloned()
3940 .unwrap_or_default();
3941 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3942
3943 // Get timestamps from L0 buffer (populated during insert)
3944 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3945 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3946
3947 entries_by_type
3948 .entry(edge.edge_type)
3949 .or_default()
3950 .push(L1Entry {
3951 src_vid: edge.src_vid,
3952 dst_vid: edge.dst_vid,
3953 eid: edge.eid,
3954 op: Op::Insert,
3955 version,
3956 properties,
3957 created_at,
3958 updated_at,
3959 });
3960 }
3961
3962 // From tombstones
3963 for tombstone in old_l0.tombstones.values() {
3964 let version = old_l0
3965 .edge_versions
3966 .get(&tombstone.eid)
3967 .copied()
3968 .unwrap_or(0);
3969 // Get timestamps - for deletes, updated_at reflects deletion time
3970 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3971 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3972
3973 entries_by_type
3974 .entry(tombstone.edge_type)
3975 .or_default()
3976 .push(L1Entry {
3977 src_vid: tombstone.src_vid,
3978 dst_vid: tombstone.dst_vid,
3979 eid: tombstone.eid,
3980 op: Op::Delete,
3981 version,
3982 properties: HashMap::new(),
3983 created_at,
3984 updated_at,
3985 });
3986 }
3987
3988 // 1b. Collect vertices by label (using vertex_labels from L0)
3989 //
3990 // Helper: fan-out a single vertex entry into per-label buckets.
3991 // Each per-label table row carries the full label set so multi-label
3992 // info is preserved after flush.
3993 let push_vertex_to_labels =
3994 |vid: Vid,
3995 all_labels: &[String],
3996 props: Properties,
3997 deleted: bool,
3998 version: u64,
3999 out: &mut HashMap<u16, Vec<VertexEntry>>| {
4000 for label in all_labels {
4001 if let Some(label_id) = schema.label_id_by_name(label) {
4002 out.entry(label_id).or_default().push((
4003 vid,
4004 all_labels.to_vec(),
4005 props.clone(),
4006 deleted,
4007 version,
4008 ));
4009 }
4010 }
4011 };
4012
4013 for (vid, props) in &old_l0.vertex_properties {
4014 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4015 // Collect timestamps for this vertex
4016 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
4017 vertex_created_at.insert(*vid, ts);
4018 }
4019 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
4020 vertex_updated_at.insert(*vid, ts);
4021 }
4022 if let Some(labels) = old_l0.vertex_labels.get(vid) {
4023 // Partial-write routing: when this VID was last
4024 // touched via `insert_vertex_partial` AND the
4025 // partial_lance_writes flag is on, send only the
4026 // touched columns to a MergeInsert batch. Otherwise
4027 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
4028 // — or flag off) use the existing full-row Append.
4029 let is_partial = self.config.partial_lance_writes
4030 && old_l0.vertex_partial_keys.contains_key(vid);
4031 if is_partial {
4032 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
4033 for label in labels {
4034 if let Some(label_id) = schema.label_id_by_name(label) {
4035 partial_by_label.entry(label_id).or_default().push((
4036 *vid,
4037 props.clone(),
4038 version,
4039 touched.clone(),
4040 ));
4041 }
4042 }
4043 }
4044 } else {
4045 push_vertex_to_labels(
4046 *vid,
4047 labels,
4048 props.clone(),
4049 false,
4050 version,
4051 &mut vertices_by_label,
4052 );
4053 }
4054 }
4055 }
4056 for &vid in &old_l0.vertex_tombstones {
4057 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4058 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
4059 vertex_updated_at.insert(vid, ts);
4060 }
4061 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
4062 // Round-12 §B: tombstones flush via Lance MergeInsert
4063 // (just `_vid`, `_deleted=true`, `_version`,
4064 // `_updated_at`) — skipping the wide-row Append.
4065 // Unconditional (no `partial_lance_writes` gating);
4066 // tombstone Append carries no useful payload.
4067 for label in labels {
4068 if let Some(label_id) = schema.label_id_by_name(label) {
4069 tombstones_by_label
4070 .entry(label_id)
4071 .or_default()
4072 .push((vid, version));
4073 }
4074 }
4075 } else {
4076 // Tombstone missing labels (old WAL format) - collect for storage query fallback
4077 orphaned_tombstones.push((vid, version));
4078 }
4079 }
4080 } // Drop read lock
4081
4082 // Resolve orphaned tombstones (missing labels) from storage
4083 if !orphaned_tombstones.is_empty() {
4084 tracing::warn!(
4085 count = orphaned_tombstones.len(),
4086 "Tombstones missing labels in L0, querying storage as fallback"
4087 );
4088 for (vid, version) in orphaned_tombstones {
4089 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
4090 && !labels.is_empty()
4091 {
4092 for label in &labels {
4093 if let Some(label_id) = schema.label_id_by_name(label) {
4094 // Round-12 §B: route through partial tombstone too.
4095 tombstones_by_label
4096 .entry(label_id)
4097 .or_default()
4098 .push((vid, version));
4099 }
4100 }
4101 }
4102 }
4103 }
4104
4105 // 1. Load previous snapshot from cache, or fall back to storage.
4106 //
4107 // Use clone() not take(): for the async path, multiple
4108 // concurrent streams may run; if we take() here, a sibling
4109 // stream sees cached_manifest = None and seeds from
4110 // load_latest_snapshot (stale), losing the chain. clone()
4111 // preserves the parent. Finalize writes back the new manifest
4112 // unconditionally.
4113 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
4114 cached
4115 } else {
4116 self.storage
4117 .snapshot_manager()
4118 .load_latest_snapshot()
4119 .await?
4120 .unwrap_or_else(|| {
4121 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
4122 })
4123 };
4124
4125 // Update snapshot metadata
4126 // Save parent snapshot ID before generating new one (for lineage tracking)
4127 let parent_id = manifest.snapshot_id.clone();
4128 manifest.parent_snapshot = Some(parent_id);
4129 manifest.snapshot_id = Uuid::new_v4().to_string();
4130 manifest.name = name;
4131 manifest.created_at = Utc::now();
4132 manifest.version_high_water_mark = current_version;
4133 manifest.wal_high_water_mark = wal_lsn;
4134 let snapshot_id = manifest.snapshot_id.clone();
4135
4136 tracing::Span::current().record("snapshot_id", &snapshot_id);
4137
4138 // 2. Write main unified tables FIRST (before deltas).
4139 // Ensures the dual-write invariant: by the time an EID appears in a
4140 // delta table, it already exists in main_edges. This prevents the
4141 // compaction debug_assert from firing when compaction interleaves
4142 // with flush at async yield points.
4143 //
4144 // 2.1 Main edges table
4145 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
4146 let _old_l0 = old_l0_arc.read();
4147 let mut main_edges: Vec<(
4148 uni_common::core::id::Eid,
4149 Vid,
4150 Vid,
4151 String,
4152 Properties,
4153 bool,
4154 u64,
4155 )> = Vec::new();
4156 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4157 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4158
4159 for (&edge_type_id, entries) in entries_by_type.iter() {
4160 for entry in entries {
4161 let edge_type_name = self
4162 .storage
4163 .schema_manager()
4164 .edge_type_name_by_id_unified(edge_type_id)
4165 .unwrap_or_else(|| "unknown".to_string());
4166
4167 let deleted = matches!(entry.op, Op::Delete);
4168 main_edges.push((
4169 entry.eid,
4170 entry.src_vid,
4171 entry.dst_vid,
4172 edge_type_name,
4173 entry.properties.clone(),
4174 deleted,
4175 entry.version,
4176 ));
4177
4178 if let Some(ts) = entry.created_at {
4179 edge_created_at_map.insert(entry.eid, ts);
4180 }
4181 if let Some(ts) = entry.updated_at {
4182 edge_updated_at_map.insert(entry.eid, ts);
4183 }
4184 }
4185 }
4186
4187 (main_edges, edge_created_at_map, edge_updated_at_map)
4188 };
4189
4190 if !main_edges.is_empty() {
4191 let main_edge_batch = MainEdgeDataset::build_record_batch(
4192 &main_edges,
4193 Some(&edge_created_at_map),
4194 Some(&edge_updated_at_map),
4195 )?;
4196 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4197 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4198 }
4199
4200 // 2.2 Main vertices table
4201 let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4202 let old_l0 = old_l0_arc.read();
4203 let mut vertices = Vec::new();
4204
4205 // Live vertices: full-row Append on the main table (the
4206 // props_json blob is required for global ID lookups). For
4207 // partial-row VIDs (vertex_partial_keys non-empty), the
4208 // main table still needs the full props for the
4209 // ext_id-uniqueness path; we keep the Append here. The
4210 // per-label Lance write IS partial via MergeInsert.
4211 for (vid, props) in &old_l0.vertex_properties {
4212 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4213 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4214 vertices.push((*vid, labels, props.clone(), false, version));
4215 }
4216
4217 // Tombstones: collected into `main_vertex_tombstones` for
4218 // the MergeInsert path below; skipping the wide-row Append.
4219 for &vid in &old_l0.vertex_tombstones {
4220 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4221 main_vertex_tombstones.push((vid, version));
4222 }
4223
4224 vertices
4225 };
4226
4227 // M8: durable label-only mutations across flush windows.
4228 //
4229 // `SET n:Label` / `REMOVE n:Label` mark the vid in
4230 // `vertex_label_overwrites` and update `vertex_labels`, but for a
4231 // vid flushed in a PRIOR window they never re-add it to
4232 // `vertex_properties`. The loops above key off `vertex_properties`,
4233 // so such a relabel would be silently lost: absent from the main
4234 // table, the per-label datasets, and the VidLabelsIndex (and so
4235 // `rebuild_vid_labels_index` reads stale labels after a restart).
4236 // The same-window create+relabel case already works because the
4237 // create put the vid in `vertex_properties`.
4238 //
4239 // Re-derive each overwrite-only vid by fetching its persisted props
4240 // and labels, then route it into `main_vertices` (main table +
4241 // index), the new per-label datasets, and a tombstone in any
4242 // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4243 // per-label table directly, so the old-label tombstone is required.
4244 let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4245 let old_l0 = old_l0_arc.read();
4246 old_l0
4247 .vertex_label_overwrites
4248 .iter()
4249 .filter(|vid| {
4250 !old_l0.vertex_properties.contains_key(*vid)
4251 && !old_l0.vertex_tombstones.contains(*vid)
4252 })
4253 .map(|vid| {
4254 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4255 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4256 (*vid, labels, version)
4257 })
4258 .collect()
4259 };
4260 for (vid, new_labels, version) in overwrite_only {
4261 // Persisted props of the prior-window row — required so the
4262 // re-Appended main row does not blank the vertex's properties.
4263 let Some(props) = MainVertexDataset::find_props_by_vid(
4264 self.storage.backend(),
4265 vid,
4266 self.storage.version_high_water_mark(),
4267 )
4268 .await?
4269 else {
4270 tracing::warn!(
4271 vid = vid.as_u64(),
4272 "label-only mutation for a vid with no persisted main row; skipping flush \
4273 of its relabel"
4274 );
4275 continue;
4276 };
4277 // Labels the vid carried BEFORE this relabel; the storage read
4278 // reflects pre-flush state. Any label no longer present must be
4279 // tombstoned in its per-label dataset.
4280 let old_labels = self
4281 .find_vertex_labels_in_storage(vid)
4282 .await?
4283 .unwrap_or_default();
4284
4285 main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4286 for label in &new_labels {
4287 if let Some(label_id) = schema.label_id_by_name(label) {
4288 vertices_by_label.entry(label_id).or_default().push((
4289 vid,
4290 new_labels.clone(),
4291 props.clone(),
4292 false,
4293 version,
4294 ));
4295 }
4296 }
4297 for label in &old_labels {
4298 if !new_labels.contains(label)
4299 && let Some(label_id) = schema.label_id_by_name(label)
4300 {
4301 tombstones_by_label
4302 .entry(label_id)
4303 .or_default()
4304 .push((vid, version));
4305 }
4306 }
4307 }
4308
4309 if !main_vertices.is_empty() {
4310 let main_vertex_batch = MainVertexDataset::build_record_batch(
4311 &main_vertices,
4312 Some(&vertex_created_at),
4313 Some(&vertex_updated_at),
4314 )?;
4315 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4316 }
4317 // Round-12 §B: tombstones via MergeInsert on the main vertices
4318 // table. Independent of `vertex_properties` length.
4319 if !main_vertex_tombstones.is_empty() {
4320 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4321 &main_vertex_tombstones,
4322 Some(&vertex_updated_at),
4323 )?;
4324 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4325 .await?;
4326 }
4327 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4328 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4329 }
4330
4331 // Keep the VidLabelsIndex current for every flushed vertex. This is the
4332 // single place that sees all vertices: the per-label fan-out below skips
4333 // undeclared (schemaless) labels, so updating the index there would miss
4334 // them. Traversal-time label predicates read this index to resolve
4335 // labels for vertices that live only in Lance — notably on a fork, whose
4336 // data is flushed to Lance before branching. (GitHub #99)
4337 for (vid, labels, _props, _deleted, _version) in &main_vertices {
4338 self.storage.update_vid_labels_index(*vid, labels.clone());
4339 }
4340 for (vid, _version) in &main_vertex_tombstones {
4341 self.storage.remove_from_vid_labels_index(*vid);
4342 }
4343
4344 // 3. For each edge type, write FWD and BWD delta runs
4345 for (&edge_type_id, entries) in entries_by_type.iter() {
4346 // Get edge type name from unified lookup (handles both schema'd and schemaless)
4347 let edge_type_name = self
4348 .storage
4349 .schema_manager()
4350 .edge_type_name_by_id_unified(edge_type_id)
4351 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4352
4353 // FWD Run (sorted by src_vid)
4354 // Round-12 §A: split entries into full-row Append and
4355 // partial MergeInsert routes based on `edge_partial_keys`.
4356 // Edges in `edge_partial_keys` were last written via
4357 // `insert_edge_partial_full`; the per-edge-type delta
4358 // tables receive only the touched schema columns plus
4359 // (when any overflow key was touched) the regenerated
4360 // `overflow_json` blob. Untouched columns retain their
4361 // previous-version value via Lance MergeInsert.
4362 let partial_eids: std::collections::HashSet<Eid> = {
4363 let old_l0 = old_l0_arc.read();
4364 entries
4365 .iter()
4366 .filter(|e| {
4367 self.config.partial_lance_writes
4368 && old_l0.edge_partial_keys.contains_key(&e.eid)
4369 })
4370 .map(|e| e.eid)
4371 .collect()
4372 };
4373 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4374 let old_l0 = old_l0_arc.read();
4375 partial_eids
4376 .iter()
4377 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4378 .collect()
4379 };
4380 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4381 .clone()
4382 .into_iter()
4383 .partition(|e| !partial_eids.contains(&e.eid));
4384
4385 let backend = self.storage.backend();
4386
4387 // FWD run (sorted by src_vid)
4388 let mut fwd_full = full_entries.clone();
4389 fwd_full.sort_by_key(|e| e.src_vid);
4390 let mut fwd_partial = partial_entries.clone();
4391 fwd_partial.sort_by_key(|e| e.src_vid);
4392 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4393 if !fwd_full.is_empty() {
4394 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4395 fwd_ds.write_run(backend, fwd_batch).await?;
4396 }
4397 if !fwd_partial.is_empty() {
4398 let touched_union: std::collections::HashSet<String> = fwd_partial
4399 .iter()
4400 .flat_map(|e| {
4401 touched_union_by_eid
4402 .get(&e.eid)
4403 .cloned()
4404 .unwrap_or_default()
4405 .into_iter()
4406 })
4407 .collect();
4408 let fwd_partial_batch =
4409 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4410 fwd_ds
4411 .merge_insert_partial_run(backend, fwd_partial_batch)
4412 .await?;
4413 }
4414 fwd_ds.ensure_eid_index(backend).await?;
4415
4416 // BWD Run (sorted by dst_vid)
4417 let mut bwd_full = full_entries.clone();
4418 bwd_full.sort_by_key(|e| e.dst_vid);
4419 let mut bwd_partial = partial_entries.clone();
4420 bwd_partial.sort_by_key(|e| e.dst_vid);
4421 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4422 if !bwd_full.is_empty() {
4423 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4424 bwd_ds.write_run(backend, bwd_batch).await?;
4425 }
4426 if !bwd_partial.is_empty() {
4427 let touched_union: std::collections::HashSet<String> = bwd_partial
4428 .iter()
4429 .flat_map(|e| {
4430 touched_union_by_eid
4431 .get(&e.eid)
4432 .cloned()
4433 .unwrap_or_default()
4434 .into_iter()
4435 })
4436 .collect();
4437 let bwd_partial_batch =
4438 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4439 bwd_ds
4440 .merge_insert_partial_run(backend, bwd_partial_batch)
4441 .await?;
4442 }
4443 bwd_ds.ensure_eid_index(backend).await?;
4444
4445 // Update Manifest
4446 let current_snap =
4447 manifest
4448 .edges
4449 .entry(edge_type_name.to_string())
4450 .or_insert(EdgeSnapshot {
4451 version: 0,
4452 count: 0,
4453 lance_version: 0,
4454 });
4455 current_snap.version += 1;
4456 current_snap.count += entries.len() as u64;
4457 // LanceDB tables don't expose Lance version directly
4458 current_snap.lance_version = 0;
4459
4460 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4461 // already has these edges via dual-write in insert_edge/delete_edge.
4462 }
4463
4464 // 4. Per-label vertex table writes
4465 // Iterate all labels that have either full-row OR partial-write
4466 // data pending. A label may appear in only one of the two maps
4467 // (e.g., all updates on this label were partial-only).
4468 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4469 .keys()
4470 .chain(partial_by_label.keys())
4471 .chain(tombstones_by_label.keys())
4472 .copied()
4473 .collect();
4474 for label_id in all_label_ids {
4475 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4476 let label_name = schema
4477 .label_name_by_id(label_id)
4478 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4479
4480 let ds = self.storage.vertex_dataset(label_name)?;
4481
4482 // Collect inverted index updates before consuming vertices
4483 // Maps: cfg.property -> (added, removed)
4484 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4485 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4486
4487 for idx in &schema.indexes {
4488 if let IndexDefinition::Inverted(cfg) = idx
4489 && cfg.label == label_name
4490 {
4491 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4492 let mut removed: HashSet<Vid> = HashSet::new();
4493
4494 for (vid, _labels, props, deleted, _version) in &vertices {
4495 if *deleted {
4496 removed.insert(*vid);
4497 } else if let Some(prop_value) = props.get(&cfg.property) {
4498 // Extract terms from the property value (List<String>)
4499 if let Some(arr) = prop_value.as_array() {
4500 let terms: Vec<String> = arr
4501 .iter()
4502 .filter_map(|v| v.as_str().map(ToString::to_string))
4503 .collect();
4504 if !terms.is_empty() {
4505 added.insert(*vid, terms);
4506 }
4507 }
4508 }
4509 }
4510 // Round-12 §B: tombstones no longer in `vertices`;
4511 // pull them from `tombstones_by_label` for inverted
4512 // index removal.
4513 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4514 for (vid, _) in tomb_rows {
4515 removed.insert(*vid);
4516 }
4517 }
4518
4519 if !added.is_empty() || !removed.is_empty() {
4520 inverted_updates.insert(cfg.property.clone(), (added, removed));
4521 }
4522 }
4523 }
4524
4525 let mut v_data = Vec::new();
4526 let mut d_data = Vec::new();
4527 let mut ver_data = Vec::new();
4528 for (vid, labels, props, deleted, version) in vertices {
4529 v_data.push((vid, labels, props));
4530 d_data.push(deleted);
4531 ver_data.push(version);
4532 }
4533
4534 let backend = self.storage.backend();
4535
4536 // Skip the full-row Append entirely if this label only has
4537 // partial-write rows pending.
4538 if !v_data.is_empty() {
4539 let batch = ds.build_record_batch_with_timestamps(
4540 &v_data,
4541 &d_data,
4542 &ver_data,
4543 &schema,
4544 Some(&vertex_created_at),
4545 Some(&vertex_updated_at),
4546 )?;
4547 ds.write_batch(backend, batch, &schema).await?;
4548 }
4549
4550 // Partial-column batch (Lance MergeInsert path). The flag
4551 // gates whether the routing classified any VIDs as partial;
4552 // outside the flag this collection is always empty so the
4553 // call below is a cheap no-op.
4554 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4555 && !partial_rows.is_empty()
4556 {
4557 let touched_union: std::collections::HashSet<String> = partial_rows
4558 .iter()
4559 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4560 .collect();
4561 let pairs: Vec<(Vid, Properties)> = partial_rows
4562 .iter()
4563 .map(|(vid, props, _, _)| (*vid, props.clone()))
4564 .collect();
4565 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4566 let partial_batch = ds.build_partial_record_batch(
4567 &pairs,
4568 &versions,
4569 &touched_union,
4570 &schema,
4571 Some(&vertex_updated_at),
4572 )?;
4573 if partial_batch.num_rows() > 0 {
4574 ds.merge_insert_batch(backend, partial_batch).await?;
4575 }
4576 }
4577
4578 // Tombstone batch (Round-12 §B): always MergeInsert with
4579 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4580 // No partial_lance_writes gating — tombstones never carry
4581 // useful property payload to write. Captured tombstone vids
4582 // also drive `remove_from_vid_labels_index` below.
4583 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4584 if !tombstone_rows.is_empty() {
4585 let tomb_batch =
4586 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4587 if tomb_batch.num_rows() > 0 {
4588 ds.merge_insert_batch(backend, tomb_batch).await?;
4589 }
4590 }
4591
4592 ds.ensure_default_indexes(backend).await?;
4593
4594 // VidLabelsIndex maintenance is centralized at the main-vertex
4595 // flush above (it sees both schema'd and schemaless vertices).
4596
4597 // Update Manifest
4598 let current_snap =
4599 manifest
4600 .vertices
4601 .entry(label_name.to_string())
4602 .or_insert(LabelSnapshot {
4603 version: 0,
4604 count: 0,
4605 lance_version: 0,
4606 });
4607 current_snap.version += 1;
4608 current_snap.count += v_data.len() as u64;
4609 // LanceDB tables don't expose Lance version directly
4610 current_snap.lance_version = 0;
4611
4612 // Invalidate table cache to ensure next read picks up new version
4613 self.storage.invalidate_table_cache(label_name);
4614
4615 // Apply inverted index updates incrementally
4616 #[cfg(feature = "lance-backend")]
4617 for idx in &schema.indexes {
4618 if let IndexDefinition::Inverted(cfg) = idx
4619 && cfg.label == label_name
4620 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4621 {
4622 self.storage
4623 .index_manager()
4624 .update_inverted_index_incremental(cfg, added, removed)
4625 .await?;
4626 }
4627 }
4628
4629 // Update UID index with new vertex mappings
4630 // Collect (UniId, Vid) mappings from non-deleted vertices
4631 #[cfg(feature = "lance-backend")]
4632 {
4633 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4634 for (vid, _labels, props) in &v_data {
4635 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4636 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4637 label_name, ext_id, props,
4638 );
4639 uid_mappings.push((uid, *vid));
4640 }
4641
4642 if !uid_mappings.is_empty()
4643 && let Ok(uid_index) = self.storage.uid_index(label_name)
4644 {
4645 // Stamp mappings with this flush's MVCC version so a later
4646 // re-create of the same UID deterministically outranks the
4647 // stale mapping (review C3).
4648 uid_index
4649 .write_mapping_versioned(&uid_mappings, current_version)
4650 .await?;
4651 }
4652 }
4653 }
4654 Ok(FlushOutcome {
4655 manifest,
4656 snapshot_id,
4657 })
4658 }
4659
4660 /// Composition entry that assumes the caller already holds `flush_lock`.
4661 /// Runs rotate + stream + finalize_locked in sequence. Used by
4662 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4663 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4664 /// holds the lock from the commit critical section).
4665 #[instrument(
4666 skip(self),
4667 fields(snapshot_id, mutations_count, size_bytes),
4668 level = "info"
4669 )]
4670 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4671 let start = std::time::Instant::now();
4672
4673 let (initial_size, initial_count) = {
4674 let l0_arc = self.l0_manager.get_current();
4675 let l0 = l0_arc.read();
4676 (l0.estimated_size, l0.mutation_count)
4677 };
4678 tracing::Span::current().record("size_bytes", initial_size);
4679 tracing::Span::current().record("mutations_count", initial_count);
4680
4681 debug!("Starting L0 flush to L1");
4682
4683 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4684 // FlushInProgressGuard lives on RotateOutput and stays alive for
4685 // the full flush — including the finalize_locked call below.
4686 let RotateOutput {
4687 old_l0_arc,
4688 wal_lsn,
4689 current_version,
4690 flush_in_progress_guard: _flush_guard,
4691 } = self.flush_l0_rotate().await?;
4692
4693 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4694 // G (Lance writes). Builds the manifest but does NOT publish it.
4695 let FlushOutcome {
4696 manifest,
4697 snapshot_id,
4698 } = self
4699 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4700 .await?;
4701
4702 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4703 // property cache clear, last_flush_time, metrics, l1_runs++,
4704 // compaction trigger, index-rebuild scheduling, fork tick.
4705 self.flush_finalize_locked(
4706 old_l0_arc,
4707 wal_lsn,
4708 manifest,
4709 snapshot_id,
4710 initial_size,
4711 initial_count,
4712 start,
4713 )
4714 .await
4715 }
4716
4717 /// Phases H..S of the flush: publish the manifest and run all
4718 /// post-publish bookkeeping. Assumes the caller already holds
4719 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4720 /// lock-acquiring variant used by the async finalize path.
4721 #[allow(clippy::too_many_arguments)]
4722 async fn flush_finalize_locked(
4723 &self,
4724 old_l0_arc: Arc<RwLock<L0Buffer>>,
4725 wal_lsn: u64,
4726 manifest: SnapshotManifest,
4727 snapshot_id: String,
4728 initial_size: usize,
4729 initial_count: usize,
4730 start: std::time::Instant,
4731 ) -> Result<String> {
4732 Self::flush_finalize_body(
4733 &self.shared_ctx(),
4734 old_l0_arc,
4735 wal_lsn,
4736 manifest,
4737 snapshot_id,
4738 initial_size,
4739 initial_count,
4740 start,
4741 )
4742 .await
4743 }
4744
4745 /// Phases H..S of the flush, lock-acquiring variant. Used by the
4746 /// async-flush finalizer task (running on a spawned tokio task),
4747 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4748 /// `flush_lock` to serialize the publish boundary, then runs the
4749 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4750 #[allow(clippy::too_many_arguments)]
4751 pub(crate) async fn flush_finalize_now(
4752 shared: SharedFlushCtx,
4753 old_l0_arc: Arc<RwLock<L0Buffer>>,
4754 wal_lsn: u64,
4755 manifest: SnapshotManifest,
4756 snapshot_id: String,
4757 initial_size: usize,
4758 initial_count: usize,
4759 start: std::time::Instant,
4760 ) -> Result<String> {
4761 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4762 Self::flush_finalize_body(
4763 &shared,
4764 old_l0_arc,
4765 wal_lsn,
4766 manifest,
4767 snapshot_id,
4768 initial_size,
4769 initial_count,
4770 start,
4771 )
4772 .await
4773 }
4774
4775 /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4776 /// Static over `SharedFlushCtx`; the caller is responsible for
4777 /// holding `flush_lock`.
4778 #[allow(clippy::too_many_arguments)]
4779 async fn flush_finalize_body(
4780 shared: &SharedFlushCtx,
4781 old_l0_arc: Arc<RwLock<L0Buffer>>,
4782 wal_lsn: u64,
4783 mut manifest: SnapshotManifest,
4784 snapshot_id: String,
4785 initial_size: usize,
4786 initial_count: usize,
4787 start: std::time::Instant,
4788 ) -> Result<String> {
4789 // Parent-snapshot fixup. The stream phase built `manifest` with
4790 // parent_snapshot set from cached_manifest at stream time. If
4791 // OTHER flushes (sync or async) have finalized since then,
4792 // cached_manifest has advanced. Re-link this manifest to the
4793 // current cached chain so we don't orphan their data when we
4794 // overwrite cached_manifest below.
4795 let current_parent_id = shared
4796 .cached_manifest
4797 .lock()
4798 .as_ref()
4799 .map(|m| m.snapshot_id.clone());
4800 if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4801 manifest.parent_snapshot = current_parent_id;
4802 metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4803 }
4804
4805 // H. Publish manifest (body first, then pointer — recovery is
4806 // idempotent if we crash between the two).
4807 // A fork writer must publish to a fork-scoped namespace, never the
4808 // global `catalog/latest` that the primary reopen reads (review C1).
4809 debug_assert_eq!(
4810 shared.fork_id.is_some(),
4811 shared.storage.snapshot_manager().is_fork_scoped(),
4812 "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
4813 );
4814 shared
4815 .storage
4816 .snapshot_manager()
4817 .save_snapshot(&manifest)
4818 .await?;
4819 shared
4820 .storage
4821 .snapshot_manager()
4822 .set_latest_snapshot(&manifest.snapshot_id)
4823 .await?;
4824
4825 // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
4826 // wrote the manifest body and the `catalog/latest` pointer through the
4827 // object store, which does NOT fsync. WAL truncation (K, below) removes
4828 // the only other durable copy of this flush's data, so a crash after K
4829 // but before the OS flushed those writes would lose the snapshot —
4830 // recovery could not resolve `latest`. Make them durable now (local-fs
4831 // only; remote stores provide their own durability on `put`).
4832 crate::snapshot::manager::fsync_snapshot_pointer(
4833 shared.storage.local_fs_root().as_deref(),
4834 shared.fork_id.as_ref(),
4835 &manifest.snapshot_id,
4836 )
4837 .map_err(|e| {
4838 anyhow!(
4839 "fsync snapshot {} before WAL truncate: {}",
4840 manifest.snapshot_id,
4841 e
4842 )
4843 })?;
4844
4845 // I. Cache manifest for next flush to avoid re-reading from object store.
4846 *shared.cached_manifest.lock() = Some(manifest.clone());
4847
4848 // L. Invalidate the property cache BEFORE removing the flushed buffer
4849 // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4850 // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4851 // first closes the non-monotonic-read window: once the buffer leaves
4852 // the L0 chain at J a freshly-written value would otherwise miss the
4853 // chain and fall through to a stale cache entry. By the time finalize
4854 // runs the streamed rows are already durable in L1, so a post-clear
4855 // read falls through to fresh storage instead. The finalizer holds
4856 // `flush_lock` throughout, so reordering L ahead of J is safe.
4857 if let Some(ref pm) = shared.property_manager {
4858 pm.clear_cache().await;
4859 }
4860
4861 // J. Complete flush: remove old L0 from pending_flush. MUST happen
4862 // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4863 shared.l0_manager.complete_flush(&old_l0_arc);
4864
4865 // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4866 // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4867 // WAL truncation (K). The property cache is already cleared (L moved
4868 // ahead of J above), so a read in this window falls through to fresh
4869 // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4870 // read after flush finalize).
4871 fail::fail_point!("flush::after-complete-before-cache-clear");
4872
4873 // K. Truncate WAL up to the safe LSN.
4874 let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4875 if let Some(w) = wal_handle {
4876 let safe_lsn = shared
4877 .l0_manager
4878 .min_pending_wal_lsn()
4879 .map(|min_pending| min_pending.min(wal_lsn))
4880 .unwrap_or(wal_lsn);
4881 w.truncate_before(safe_lsn).await?;
4882 }
4883
4884 // M. Reset last flush time for time-based auto-flush.
4885 *shared.last_flush_time.lock() = std::time::Instant::now();
4886
4887 info!(
4888 snapshot_id,
4889 mutations_count = initial_count,
4890 size_bytes = initial_size,
4891 "L0 flush to L1 completed successfully"
4892 );
4893 metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4894 metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4895 metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4896
4897 // P. Increment flush generation counter for write throttling.
4898 {
4899 let mut status = uni_common::sync::acquire_mutex(
4900 &shared.storage.compaction_status,
4901 "compaction_status",
4902 )?;
4903 status.l1_runs += 1;
4904 }
4905
4906 // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4907 let am = shared.adjacency_manager.clone();
4908 if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4909 let previous_still_running = {
4910 let guard = shared.compaction_handle.read();
4911 guard.as_ref().is_some_and(|h| !h.is_finished())
4912 };
4913 if previous_still_running {
4914 info!("Skipping compaction: previous compaction still in progress");
4915 } else {
4916 let handle = tokio::spawn(async move {
4917 am.compact();
4918 });
4919 *shared.compaction_handle.write() = Some(handle);
4920 }
4921 }
4922
4923 // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4924 if shared.auto_rebuild_enabled
4925 && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4926 {
4927 Self::schedule_index_rebuilds_if_needed_static(
4928 &manifest,
4929 rebuild_mgr.clone(),
4930 shared.schema_manager.clone(),
4931 shared.index_rebuild_config.clone(),
4932 );
4933 }
4934
4935 // S. Emit fork-fragment observability after a successful forked flush.
4936 Self::tick_fork_fragment_observability_static(
4937 shared.fork_id,
4938 shared.fork_flush_count.clone(),
4939 shared.fork_fragment_warn_fired.clone(),
4940 shared.fork_fragment_warn_threshold,
4941 );
4942
4943 Ok(snapshot_id)
4944 }
4945
4946 /// Increment fork-flush bookkeeping and fire the fragment warn
4947 /// once if the threshold is crossed.
4948 ///
4949 /// Each flush typically appends ~1 fragment per touched dataset on
4950 /// the fork's branches; without compaction (deferred to Phase 5)
4951 /// long-lived heavy-write forks degrade. The flush count is a
4952 /// proxy for actual fragment growth — reading
4953 /// `Dataset::manifest().fragments.len()` per dataset would add a
4954 /// per-flush object-store roundtrip on the hot commit path, which
4955 /// is too costly for a purely observational guard rail.
4956 ///
4957 /// No-op for primary writers (`fork_id == None`).
4958 #[allow(dead_code)] // called by tests; production path uses _static
4959 pub(crate) fn tick_fork_fragment_observability(&self) {
4960 Self::tick_fork_fragment_observability_static(
4961 self.fork_id,
4962 self.fork_flush_count.clone(),
4963 self.fork_fragment_warn_fired.clone(),
4964 self.config.fork_fragment_warn_threshold,
4965 );
4966 }
4967
4968 /// Static variant of [`Writer::tick_fork_fragment_observability`].
4969 /// Used by the async-flush finalize path, where we hold a
4970 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4971 pub(crate) fn tick_fork_fragment_observability_static(
4972 fork_id: Option<ForkId>,
4973 fork_flush_count: Arc<AtomicU64>,
4974 fork_fragment_warn_fired: Arc<AtomicBool>,
4975 warn_threshold: usize,
4976 ) {
4977 let Some(fork_id) = fork_id else { return };
4978 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4979 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4980 let fork_label = fork_id.to_string();
4981 metrics::gauge!(
4982 "uni_fork_l1_flushes",
4983 "fork" => fork_label.clone(),
4984 )
4985 .set(new_count as f64);
4986 let threshold = warn_threshold as u64;
4987 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4988 && threshold > 0
4989 && new_count >= threshold
4990 {
4991 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4992 tracing::warn!(
4993 fork = %fork_label,
4994 flush_count = new_count,
4995 threshold,
4996 "fork has exceeded the L1 flush-count threshold; \
4997 fork compaction is deferred to Phase 5 — consider \
4998 drop+recreate or promotion to bound fragment growth"
4999 );
5000 }
5001 }
5002
5003 /// Check rebuild thresholds and schedule background index rebuilds for
5004 /// labels that exceed growth or age limits. Marks affected indexes as
5005 /// `Stale` and spawns an async task to schedule the rebuild.
5006 #[allow(dead_code)] // production path uses _static; kept as the
5007 // documented instance entry point.
5008 fn schedule_index_rebuilds_if_needed(
5009 &self,
5010 manifest: &SnapshotManifest,
5011 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5012 ) {
5013 Self::schedule_index_rebuilds_if_needed_static(
5014 manifest,
5015 rebuild_mgr,
5016 self.schema_manager.clone(),
5017 self.config.index_rebuild.clone(),
5018 );
5019 }
5020
5021 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
5022 /// Used by the async-flush finalize path, where we hold the
5023 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
5024 pub(crate) fn schedule_index_rebuilds_if_needed_static(
5025 manifest: &SnapshotManifest,
5026 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5027 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
5028 index_rebuild_config: uni_common::config::IndexRebuildConfig,
5029 ) {
5030 let checker =
5031 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
5032 let schema = schema_manager.schema();
5033 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
5034
5035 if labels.is_empty() {
5036 return;
5037 }
5038
5039 // Mark affected indexes as Stale
5040 for label in &labels {
5041 for idx in &schema.indexes {
5042 if idx.label() == label {
5043 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
5044 m.status = uni_common::core::schema::IndexStatus::Stale;
5045 });
5046 }
5047 }
5048 }
5049
5050 tokio::spawn(async move {
5051 if let Err(e) = rebuild_mgr.schedule(labels).await {
5052 tracing::warn!("Failed to schedule index rebuild: {e}");
5053 }
5054 });
5055 }
5056}
5057
5058/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
5059/// its single-task finalizer loop. Unit struct on purpose: it must NOT
5060/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
5061/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
5062/// for finalize travels in via `SharedFlushCtx`.
5063pub(crate) struct WriterFinalizer;
5064
5065impl FinalizeFn for WriterFinalizer {
5066 fn finalize<'a>(
5067 &'a self,
5068 rotated: RotatedFlush,
5069 outcome: AsyncFlushOutcome,
5070 shared: SharedFlushCtx,
5071 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
5072 Box::pin(async move {
5073 // Read initial_size / initial_count from the rotated L0 so
5074 // we don't have to plumb them through the coordinator
5075 // submission. The buffer is still alive in pending_flush
5076 // until `complete_flush` (J) below pops it.
5077 let (initial_size, initial_count) = {
5078 let l0 = rotated.old_l0_arc.read();
5079 (l0.estimated_size, l0.mutation_count)
5080 };
5081 let result = Writer::flush_finalize_now(
5082 shared,
5083 rotated.old_l0_arc.clone(),
5084 rotated.wal_lsn,
5085 outcome.new_manifest,
5086 outcome.snapshot_id,
5087 initial_size,
5088 initial_count,
5089 std::time::Instant::now(),
5090 )
5091 .await;
5092 // `rotated` (permit + flush_in_progress_guard) drops here.
5093 drop(rotated.permit);
5094 result
5095 })
5096 }
5097
5098 fn finalize_failure<'a>(
5099 &'a self,
5100 rotated: RotatedFlush,
5101 err: anyhow::Error,
5102 _shared: SharedFlushCtx,
5103 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
5104 Box::pin(async move {
5105 tracing::warn!(
5106 error = %err,
5107 seq = rotated.seq,
5108 "async flush stream failed; old L0 remains in pending_flush, \
5109 WAL retains its data, recovery via WAL replay on restart"
5110 );
5111 metrics::counter!("uni_flush_failures_total").increment(1);
5112 // Permit + guard drop here so back-pressure releases even on
5113 // failure.
5114 drop(rotated.permit);
5115 err
5116 })
5117 }
5118}
5119
5120#[cfg(test)]
5121mod tests {
5122 use super::*;
5123 use tempfile::tempdir;
5124
5125 /// Test that commit_transaction writes mutations to WAL before merging to main L0.
5126 /// This verifies fix for issue #137 (transaction commit atomicity).
5127 #[tokio::test]
5128 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
5129 use crate::runtime::wal::WriteAheadLog;
5130 use crate::storage::manager::StorageManager;
5131 use object_store::local::LocalFileSystem;
5132 use object_store::path::Path as ObjectStorePath;
5133 use uni_common::core::schema::SchemaManager;
5134
5135 let dir = tempdir()?;
5136 let path = dir.path().to_str().unwrap();
5137 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5138 let schema_path = ObjectStorePath::from("schema.json");
5139
5140 let schema_manager =
5141 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5142 let _label_id = schema_manager.add_label("Test")?;
5143 schema_manager.save().await?;
5144
5145 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5146
5147 // Create WAL for main L0
5148 let wal_path = ObjectStorePath::from("wal");
5149 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5150
5151 let writer = Writer::new_with_config(
5152 storage.clone(),
5153 schema_manager.clone(),
5154 1,
5155 UniConfig::default(),
5156 Some(wal),
5157 None,
5158 )
5159 .await?;
5160
5161 // Begin transaction — create a transaction L0
5162 let tx_l0 = writer.create_transaction_l0();
5163
5164 // Insert data in transaction
5165 let vid_a = writer.next_vid().await?;
5166 let vid_b = writer.next_vid().await?;
5167
5168 let mut props = std::collections::HashMap::new();
5169 props.insert("test".to_string(), Value::String("data".to_string()));
5170
5171 writer
5172 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
5173 .await?;
5174 writer
5175 .insert_vertex_with_labels(
5176 vid_b,
5177 std::collections::HashMap::new(),
5178 &["Test".to_string()],
5179 Some(&tx_l0),
5180 )
5181 .await?;
5182
5183 let eid = writer.next_eid(1).await?;
5184 writer
5185 .insert_edge(
5186 vid_a,
5187 vid_b,
5188 1,
5189 eid,
5190 std::collections::HashMap::new(),
5191 None,
5192 Some(&tx_l0),
5193 )
5194 .await?;
5195
5196 // Get WAL before commit
5197 let l0 = writer.l0_manager.get_current();
5198 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
5199 let mutations_before = wal.replay().await?;
5200 let count_before = mutations_before.len();
5201
5202 // Commit transaction - this should write to WAL first
5203 let writer = Arc::new(writer);
5204 writer.commit_transaction_l0(tx_l0).await?;
5205
5206 // Verify WAL has the new mutations
5207 let mutations_after = wal.replay().await?;
5208 assert!(
5209 mutations_after.len() > count_before,
5210 "WAL should contain transaction mutations after commit"
5211 );
5212
5213 // Verify mutations are in correct order: vertices first, then edges
5214 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
5215
5216 let mut saw_vertex_a = false;
5217 let mut saw_vertex_b = false;
5218 let mut saw_edge = false;
5219
5220 for mutation in &new_mutations {
5221 match mutation {
5222 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
5223 if *vid == vid_a {
5224 saw_vertex_a = true;
5225 }
5226 if *vid == vid_b {
5227 saw_vertex_b = true;
5228 }
5229 // Vertices should come before edges
5230 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
5231 }
5232 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
5233 if *e == eid {
5234 saw_edge = true;
5235 }
5236 // Edges should come after vertices
5237 assert!(
5238 saw_vertex_a && saw_vertex_b,
5239 "Edge should be logged after both vertices"
5240 );
5241 }
5242 _ => {}
5243 }
5244 }
5245
5246 assert!(saw_vertex_a, "Vertex A should be in WAL");
5247 assert!(saw_vertex_b, "Vertex B should be in WAL");
5248 assert!(saw_edge, "Edge should be in WAL");
5249
5250 // Verify data is also in main L0
5251 let l0_read = l0.read();
5252 assert!(
5253 l0_read.vertex_properties.contains_key(&vid_a),
5254 "Vertex A should be in main L0"
5255 );
5256 assert!(
5257 l0_read.vertex_properties.contains_key(&vid_b),
5258 "Vertex B should be in main L0"
5259 );
5260 assert!(
5261 l0_read.edge_endpoints.contains_key(&eid),
5262 "Edge should be in main L0"
5263 );
5264
5265 Ok(())
5266 }
5267
5268 /// Test that failed WAL flush leaves transaction intact for retry or rollback.
5269 #[tokio::test]
5270 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
5271 use crate::runtime::wal::WriteAheadLog;
5272 use crate::storage::manager::StorageManager;
5273 use object_store::local::LocalFileSystem;
5274 use object_store::path::Path as ObjectStorePath;
5275 use uni_common::core::schema::SchemaManager;
5276
5277 let dir = tempdir()?;
5278 let path = dir.path().to_str().unwrap();
5279 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5280 let schema_path = ObjectStorePath::from("schema.json");
5281
5282 let schema_manager =
5283 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5284 let _label_id = schema_manager.add_label("Test")?;
5285 let _baseline_label_id = schema_manager.add_label("Baseline")?;
5286 let _txdata_label_id = schema_manager.add_label("TxData")?;
5287 schema_manager.save().await?;
5288
5289 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5290
5291 // Create WAL for main L0
5292 let wal_path = ObjectStorePath::from("wal");
5293 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5294
5295 let writer = Writer::new_with_config(
5296 storage.clone(),
5297 schema_manager.clone(),
5298 1,
5299 UniConfig::default(),
5300 Some(wal),
5301 None,
5302 )
5303 .await?;
5304
5305 // Insert baseline data (outside transaction)
5306 let baseline_vid = writer.next_vid().await?;
5307 writer
5308 .insert_vertex_with_labels(
5309 baseline_vid,
5310 [("baseline".to_string(), Value::Bool(true))]
5311 .into_iter()
5312 .collect(),
5313 &["Baseline".to_string()],
5314 None,
5315 )
5316 .await?;
5317
5318 // Begin transaction — create a transaction L0
5319 let tx_l0 = writer.create_transaction_l0();
5320
5321 // Insert data in transaction
5322 let tx_vid = writer.next_vid().await?;
5323 writer
5324 .insert_vertex_with_labels(
5325 tx_vid,
5326 [("tx_data".to_string(), Value::Bool(true))]
5327 .into_iter()
5328 .collect(),
5329 &["TxData".to_string()],
5330 Some(&tx_l0),
5331 )
5332 .await?;
5333
5334 // Capture main L0 state before rollback
5335 let l0 = writer.l0_manager.get_current();
5336 let vertex_count_before = l0.read().vertex_properties.len();
5337
5338 // Rollback transaction (simulating what would happen after WAL flush failure)
5339 drop(tx_l0);
5340
5341 // Verify main L0 is unchanged
5342 let vertex_count_after = l0.read().vertex_properties.len();
5343 assert_eq!(
5344 vertex_count_before, vertex_count_after,
5345 "Main L0 should not change after rollback"
5346 );
5347
5348 // Baseline should still be present
5349 assert!(
5350 l0.read().vertex_properties.contains_key(&baseline_vid),
5351 "Baseline data should remain"
5352 );
5353
5354 // Transaction data should NOT be in main L0
5355 assert!(
5356 !l0.read().vertex_properties.contains_key(&tx_vid),
5357 "Transaction data should not be in main L0 after rollback"
5358 );
5359
5360 Ok(())
5361 }
5362
5363 /// Test that batch insert with shared labels does not clone labels per vertex.
5364 /// This verifies fix for issue #161 (redundant label cloning).
5365 #[tokio::test]
5366 async fn test_batch_insert_shared_labels() -> Result<()> {
5367 use crate::storage::manager::StorageManager;
5368 use object_store::local::LocalFileSystem;
5369 use object_store::path::Path as ObjectStorePath;
5370 use uni_common::core::schema::SchemaManager;
5371
5372 let dir = tempdir()?;
5373 let path = dir.path().to_str().unwrap();
5374 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5375 let schema_path = ObjectStorePath::from("schema.json");
5376
5377 let schema_manager =
5378 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5379 let _label_id = schema_manager.add_label("Person")?;
5380 schema_manager.save().await?;
5381
5382 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5383
5384 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5385
5386 // Shared labels - should not be cloned per vertex
5387 let labels = &["Person".to_string()];
5388
5389 // Insert batch of vertices with same labels
5390 let mut vids = Vec::new();
5391 for i in 0..100 {
5392 let vid = writer.next_vid().await?;
5393 let mut props = std::collections::HashMap::new();
5394 props.insert("id".to_string(), Value::Int(i));
5395 writer
5396 .insert_vertex_with_labels(vid, props, labels, None)
5397 .await?;
5398 vids.push(vid);
5399 }
5400
5401 // Verify all vertices have the correct labels
5402 let l0 = writer.l0_manager.get_current();
5403 for vid in vids {
5404 let l0_guard = l0.read();
5405 let vertex_labels = l0_guard.vertex_labels.get(&vid);
5406 assert!(vertex_labels.is_some(), "Vertex should have labels");
5407 assert_eq!(
5408 vertex_labels.unwrap(),
5409 &vec!["Person".to_string()],
5410 "Labels should match"
5411 );
5412 }
5413
5414 Ok(())
5415 }
5416
5417 /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5418 /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5419 #[tokio::test]
5420 async fn test_estimated_size_tracks_mutations() -> Result<()> {
5421 use crate::storage::manager::StorageManager;
5422 use object_store::local::LocalFileSystem;
5423 use object_store::path::Path as ObjectStorePath;
5424 use uni_common::core::schema::SchemaManager;
5425
5426 let dir = tempdir()?;
5427 let path = dir.path().to_str().unwrap();
5428 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5429 let schema_path = ObjectStorePath::from("schema.json");
5430
5431 let schema_manager =
5432 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5433 let _label_id = schema_manager.add_label("Test")?;
5434 schema_manager.save().await?;
5435
5436 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5437
5438 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5439
5440 let l0 = writer.l0_manager.get_current();
5441
5442 // Initial state should be empty
5443 let initial_estimated = l0.read().estimated_size;
5444 let initial_actual = l0.read().size_bytes();
5445 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5446 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5447
5448 // Insert vertices with properties
5449 let mut vids = Vec::new();
5450 for i in 0..10 {
5451 let vid = writer.next_vid().await?;
5452 let mut props = std::collections::HashMap::new();
5453 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5454 props.insert("index".to_string(), Value::Int(i));
5455 writer
5456 .insert_vertex_with_labels(vid, props, &[], None)
5457 .await?;
5458 vids.push(vid);
5459 }
5460
5461 // Verify estimated_size grew
5462 let after_vertices_estimated = l0.read().estimated_size;
5463 let after_vertices_actual = l0.read().size_bytes();
5464 assert!(
5465 after_vertices_estimated > 0,
5466 "estimated_size should grow after insertions"
5467 );
5468
5469 // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5470 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5471 assert!(
5472 (0.5..=2.0).contains(&ratio),
5473 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5474 after_vertices_estimated,
5475 after_vertices_actual,
5476 ratio
5477 );
5478
5479 // Insert edges with a simple edge type
5480 let edge_type = 1u32;
5481 for i in 0..9 {
5482 let eid = writer.next_eid(edge_type).await?;
5483 writer
5484 .insert_edge(
5485 vids[i],
5486 vids[i + 1],
5487 edge_type,
5488 eid,
5489 std::collections::HashMap::new(),
5490 Some("NEXT".to_string()),
5491 None,
5492 )
5493 .await?;
5494 }
5495
5496 // Verify estimated_size grew further
5497 let after_edges_estimated = l0.read().estimated_size;
5498 let after_edges_actual = l0.read().size_bytes();
5499 assert!(
5500 after_edges_estimated > after_vertices_estimated,
5501 "estimated_size should grow after edge insertions"
5502 );
5503
5504 // Verify still within reasonable bounds
5505 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5506 assert!(
5507 (0.5..=2.0).contains(&ratio),
5508 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5509 after_edges_estimated,
5510 after_edges_actual,
5511 ratio
5512 );
5513
5514 Ok(())
5515 }
5516
5517 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5518 #[tokio::test]
5519 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5520 use crate::runtime::wal::WriteAheadLog;
5521 use crate::storage::manager::StorageManager;
5522 use object_store::local::LocalFileSystem;
5523 use object_store::path::Path as ObjectStorePath;
5524 use uni_common::core::schema::SchemaManager;
5525
5526 let dir = tempdir()?;
5527 let path = dir.path().to_str().unwrap();
5528 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5529 let schema_path = ObjectStorePath::from("schema.json");
5530
5531 let schema_manager =
5532 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5533 schema_manager.save().await?;
5534
5535 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5536
5537 let wal_path = ObjectStorePath::from("wal");
5538 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5539
5540 let writer = Writer::new_with_config(
5541 storage.clone(),
5542 schema_manager.clone(),
5543 1,
5544 UniConfig::default(),
5545 Some(wal.clone()),
5546 None,
5547 )
5548 .await?;
5549
5550 // Flush with no mutations — should succeed cleanly
5551 let lsn = writer.flush_wal().await?;
5552 // LSN should be 0 or 1 (no real mutations flushed)
5553 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5554
5555 Ok(())
5556 }
5557
5558 /// Test that transaction data does not leak into main L0 without commit.
5559 #[tokio::test]
5560 async fn test_transaction_isolation_without_commit() -> Result<()> {
5561 use crate::runtime::wal::WriteAheadLog;
5562 use crate::storage::manager::StorageManager;
5563 use object_store::local::LocalFileSystem;
5564 use object_store::path::Path as ObjectStorePath;
5565 use uni_common::core::schema::SchemaManager;
5566
5567 let dir = tempdir()?;
5568 let path = dir.path().to_str().unwrap();
5569 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5570 let schema_path = ObjectStorePath::from("schema.json");
5571
5572 let schema_manager =
5573 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5574 let _label_id = schema_manager.add_label("Person")?;
5575 schema_manager.save().await?;
5576
5577 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5578
5579 let wal_path = ObjectStorePath::from("wal");
5580 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5581
5582 let writer = Writer::new_with_config(
5583 storage.clone(),
5584 schema_manager.clone(),
5585 1,
5586 UniConfig::default(),
5587 Some(wal),
5588 None,
5589 )
5590 .await?;
5591
5592 // Create transaction L0
5593 let tx_l0 = writer.create_transaction_l0();
5594
5595 // Insert vertex into transaction L0
5596 let vid = writer.next_vid().await?;
5597 writer
5598 .insert_vertex_with_labels(
5599 vid,
5600 [("name".to_string(), Value::String("Ghost".to_string()))]
5601 .into_iter()
5602 .collect(),
5603 &["Person".to_string()],
5604 Some(&tx_l0),
5605 )
5606 .await?;
5607
5608 // Verify data is in transaction L0
5609 assert!(
5610 tx_l0.read().vertex_properties.contains_key(&vid),
5611 "Transaction L0 should contain the vertex"
5612 );
5613
5614 // Verify data is NOT in main L0
5615 let main_l0 = writer.l0_manager.get_current();
5616 assert!(
5617 !main_l0.read().vertex_properties.contains_key(&vid),
5618 "Main L0 should NOT contain uncommitted transaction data"
5619 );
5620
5621 // Drop transaction without committing — data should be lost
5622 drop(tx_l0);
5623
5624 // Main L0 still should not have it
5625 assert!(
5626 !main_l0.read().vertex_properties.contains_key(&vid),
5627 "Main L0 should remain clean after dropped transaction"
5628 );
5629
5630 Ok(())
5631 }
5632
5633 /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5634 /// the flush count crosses the configured threshold and stays
5635 /// silent on subsequent flushes for the lifetime of the writer.
5636 /// Primary writers (`fork_id == None`) never fire it.
5637 ///
5638 /// Tested directly against `tick_fork_fragment_observability` so
5639 /// the contract is locked in independently of the broader
5640 /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5641 /// on Day 10's on-the-fly schema overlay growth).
5642 #[tokio::test]
5643 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5644 use crate::storage::manager::StorageManager;
5645 use object_store::local::LocalFileSystem;
5646 use object_store::path::Path as ObjectStorePath;
5647 use uni_common::core::fork::ForkId;
5648 use uni_common::core::schema::SchemaManager;
5649
5650 let dir = tempdir()?;
5651 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5652 let schema_path = ObjectStorePath::from("schema.json");
5653 let schema_manager =
5654 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5655 let storage = Arc::new(
5656 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5657 );
5658
5659 let config = UniConfig {
5660 fork_fragment_warn_threshold: 3,
5661 ..Default::default()
5662 };
5663 let mut writer =
5664 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5665
5666 // Primary path: never fires.
5667 for _ in 0..10 {
5668 writer.tick_fork_fragment_observability();
5669 }
5670 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5671 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5672
5673 // Fork path: tag and tick. Below threshold → no fire.
5674 writer.fork_id = Some(ForkId::new());
5675 writer.tick_fork_fragment_observability();
5676 writer.tick_fork_fragment_observability();
5677 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5678 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5679
5680 // Crossing threshold → fires once.
5681 writer.tick_fork_fragment_observability();
5682 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5683 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5684
5685 // Subsequent ticks bump the gauge but do not re-fire.
5686 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5687 for _ in 0..5 {
5688 writer.tick_fork_fragment_observability();
5689 }
5690 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5691 assert_eq!(
5692 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5693 fired_after
5694 );
5695
5696 Ok(())
5697 }
5698
5699 /// The hot-path mutators must not write to any `Writer` struct field.
5700 /// Phase 2 of the refactor
5701 /// gave them `&self` receivers, which the compiler enforces against
5702 /// direct `self.x = y` assignment — but interior-mutable writes
5703 /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5704 /// every potentially-writable field, calls each hot-path mutator, and
5705 /// asserts no field changed.
5706 ///
5707 /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5708 /// `tick_fork_fragment_observability`) DO mutate fields by design and
5709 /// are intentionally out of scope here.
5710 #[tokio::test]
5711 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5712 use crate::storage::manager::StorageManager;
5713 use object_store::local::LocalFileSystem;
5714 use object_store::path::Path as ObjectStorePath;
5715 use uni_common::core::schema::SchemaManager;
5716
5717 let dir = tempdir()?;
5718 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5719 let schema_path = ObjectStorePath::from("schema.json");
5720 let schema_manager =
5721 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5722 schema_manager.add_label("Person")?;
5723 schema_manager.save().await?;
5724 let storage = Arc::new(
5725 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5726 );
5727
5728 let writer =
5729 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5730 .await?;
5731
5732 /// Captures every `Writer` field that *could* be written by a
5733 /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5734 /// construction field). Arc'd substructures (`l0_manager`,
5735 /// `storage`, etc.) are intentionally not checked — they are
5736 /// re-pointed only at construction.
5737 #[derive(Debug, PartialEq)]
5738 struct Snapshot {
5739 last_flush_time: std::time::Instant,
5740 cached_manifest_some: bool,
5741 fork_flush_count: u64,
5742 fork_fragment_warn_fired: bool,
5743 xervo_runtime_some: bool,
5744 index_rebuild_manager_some: bool,
5745 fork_id: Option<ForkId>,
5746 }
5747
5748 fn snap(w: &Writer) -> Snapshot {
5749 Snapshot {
5750 last_flush_time: *w.last_flush_time.lock(),
5751 cached_manifest_some: w.cached_manifest.lock().is_some(),
5752 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5753 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5754 xervo_runtime_some: w.xervo_runtime.get().is_some(),
5755 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5756 fork_id: w.fork_id,
5757 }
5758 }
5759
5760 // 1. insert_vertex_with_labels
5761 let before = snap(&writer);
5762 let vid = writer.next_vid().await?;
5763 writer
5764 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5765 .await?;
5766 assert_eq!(
5767 snap(&writer),
5768 before,
5769 "insert_vertex_with_labels mutated a Writer field"
5770 );
5771
5772 // 2. insert_vertices_batch
5773 let before = snap(&writer);
5774 let vids = writer.allocate_vids(2).await?;
5775 writer
5776 .insert_vertices_batch(
5777 vids,
5778 vec![Properties::new(), Properties::new()],
5779 vec!["Person".into()],
5780 None,
5781 )
5782 .await?;
5783 assert_eq!(
5784 snap(&writer),
5785 before,
5786 "insert_vertices_batch mutated a Writer field"
5787 );
5788
5789 // 3. delete_vertex
5790 let before = snap(&writer);
5791 writer.delete_vertex(vid, None, None).await?;
5792 assert_eq!(
5793 snap(&writer),
5794 before,
5795 "delete_vertex mutated a Writer field"
5796 );
5797
5798 // (insert_edge / delete_edge are skipped here: their fixture cost is
5799 // disproportionate to the audit's marginal value, and the same
5800 // structural argument plus the compiler-enforced `&self` covers them.)
5801
5802 Ok(())
5803 }
5804}