Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4#[cfg(feature = "lance-backend")]
5use crate::backend::table_names;
6use crate::runtime::context::QueryContext;
7use crate::runtime::flush_coordinator::{
8    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
9};
10use crate::runtime::id_allocator::IdAllocator;
11use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
12use crate::runtime::l0_manager::L0Manager;
13use crate::runtime::property_manager::PropertyManager;
14use crate::runtime::wal::WriteAheadLog;
15use crate::storage::adjacency_manager::AdjacencyManager;
16use crate::storage::delta::{L1Entry, Op};
17use crate::storage::main_edge::MainEdgeDataset;
18use crate::storage::main_vertex::MainVertexDataset;
19use crate::storage::manager::StorageManager;
20use anyhow::{Result, anyhow};
21use chrono::Utc;
22use metrics;
23use parking_lot::{Mutex as PlMutex, RwLock};
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock};
27use tracing::{debug, info, instrument};
28use uni_common::Properties;
29use uni_common::Value;
30use uni_common::config::UniConfig;
31use uni_common::core::fork::ForkId;
32use uni_common::core::id::{Eid, Vid};
33use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
34use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
35use uni_xervo::runtime::ModelRuntime;
36use uuid::Uuid;
37
38/// Whether `property` on `label` is a multi-vector (`List<Vector>`) column — the
39/// late-interaction (ColBERT) shape that auto-embeds per-token via the multi-vector model
40/// (issue #104). A plain `Vector` column uses the dense single-vector model.
41fn is_multivector_property(
42    schema: &uni_common::core::schema::Schema,
43    label: &str,
44    property: &str,
45) -> bool {
46    schema
47        .properties
48        .get(label)
49        .and_then(|p| p.get(property))
50        .is_some_and(|m| {
51            matches!(&m.r#type, uni_common::DataType::List(inner)
52                if matches!(**inner, uni_common::DataType::Vector { .. }))
53        })
54}
55
56/// Convert per-token embedding vectors into the stored `List<Vector>` representation:
57/// a `Value::List` whose elements are each a `Value::List<Float>` (one token vector).
58fn multivec_to_value(tokens: &[Vec<f32>]) -> Value {
59    Value::List(
60        tokens
61            .iter()
62            .map(|tok| Value::List(tok.iter().map(|f| Value::Float(*f as f64)).collect()))
63            .collect(),
64    )
65}
66
/// One coalesced auto-embed group: all targets that share an `alias` + `source_properties`,
/// split by target-column kind into dense (`Vector`), multi-vector (`List<Vector>`) and
/// sparse (`SparseVector`) buckets. When a group has MORE THAN ONE kind, a single hybrid
/// inference fills all of them (single forward pass).
struct EmbedGroupSpec {
    /// Source properties shared by every target in the group.
    source_properties: Vec<String>,
    /// Optional document prefix, copied from the embedding config that first
    /// created the group.
    document_prefix: Option<String>,
    /// Target columns filled from the dense head.
    dense: Vec<String>,
    /// Target columns filled from the multi-vector head.
    multi: Vec<String>,
    /// Target columns filled from the sparse head.
    sparse: Vec<String>,
}
77
78/// Convert one xervo sparse embedding (`(term_id, weight)` pairs, possibly
79/// unsorted / with duplicate terms) into a `Value::SparseVector`. Sorting (via
80/// `BTreeMap`) and summing duplicates yields the sorted-unique invariant the
81/// index and codec require; non-finite weights are dropped (never poison a row).
82fn sparse_pairs_to_value(pairs: &[(u32, f32)]) -> Value {
83    let mut by_term: std::collections::BTreeMap<u32, f32> = std::collections::BTreeMap::new();
84    for &(term, weight) in pairs {
85        if weight.is_finite() {
86            *by_term.entry(term).or_insert(0.0) += weight;
87        }
88    }
89    Value::SparseVector {
90        indices: by_term.keys().copied().collect(),
91        values: by_term.values().copied().collect(),
92    }
93}
94
95/// Run ONE embedding inference for a coalesced group of auto-embed targets sharing an alias +
96/// source text, returning `(dense_per_text, multi_vector_per_text)` (each `Some` iff that head
97/// was requested). When both heads are needed this uses the hybrid model
98/// (`hybrid_embedder`), so a multi-functional model (e.g. BGE-M3) produces the dense + ColBERT
99/// heads from a SINGLE forward pass; otherwise it uses the single-head dense / multi-vector
100/// embedder (today's behavior for non-mixed groups).
101#[allow(clippy::type_complexity)]
102async fn embed_group(
103    runtime: &ModelRuntime,
104    alias: &str,
105    texts: &[&str],
106    want_dense: bool,
107    want_multi: bool,
108    want_sparse: bool,
109) -> Result<(
110    Option<Vec<Vec<f32>>>,
111    Option<Vec<Vec<Vec<f32>>>>,
112    Option<Vec<Vec<(u32, f32)>>>,
113)> {
114    use uni_xervo::traits::hybrid::HeadSet;
115    let heads_wanted = u8::from(want_dense) + u8::from(want_multi) + u8::from(want_sparse);
116    if heads_wanted > 1 {
117        // Single-pass hybrid: one model, one inference, all requested heads
118        // (e.g. BGE-M3 fills dense + sparse from one forward pass).
119        let mut heads = HeadSet::empty();
120        if want_dense {
121            heads |= HeadSet::DENSE;
122        }
123        if want_multi {
124            heads |= HeadSet::MULTI_VECTOR;
125        }
126        if want_sparse {
127            heads |= HeadSet::SPARSE;
128        }
129        let embedder = runtime.hybrid_embedder(alias).await?;
130        let res = embedder.embed(texts, heads).await?;
131        let dense = if want_dense {
132            Some(res.dense.ok_or_else(|| {
133                anyhow!("hybrid model '{alias}' returned no dense head for a Vector target")
134            })?)
135        } else {
136            None
137        };
138        let multi = if want_multi {
139            Some(res.multi_vector.ok_or_else(|| {
140                anyhow!(
141                    "hybrid model '{alias}' returned no multi-vector head for a List<Vector> target"
142                )
143            })?)
144        } else {
145            None
146        };
147        let sparse = if want_sparse {
148            Some(res.sparse.ok_or_else(|| {
149                anyhow!("hybrid model '{alias}' returned no sparse head for a SparseVector target")
150            })?)
151        } else {
152            None
153        };
154        Ok((dense, multi, sparse))
155    } else if want_multi {
156        let embedder = runtime.multi_vector_embedder(alias).await?;
157        Ok((None, Some(embedder.embed(texts).await?.vectors), None))
158    } else if want_sparse {
159        let embedder = runtime.sparse_embedder(alias).await?;
160        Ok((None, None, Some(embedder.embed(texts).await?.vectors)))
161    } else {
162        let embedder = runtime.embedding(alias).await?;
163        Ok((Some(embedder.embed(texts).await?.vectors), None, None))
164    }
165}
166
167/// Group a label's auto-embed configs by `(alias, source_properties)`, classifying each target
168/// column as dense vs multi-vector. A group with both kinds is a single-pass hybrid source.
169fn collect_embed_groups(
170    schema: &uni_common::core::schema::Schema,
171    label: &str,
172) -> std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec> {
173    use std::collections::BTreeMap;
174    let mut groups: BTreeMap<(String, Vec<String>), EmbedGroupSpec> = BTreeMap::new();
175    fn spec_for<'a>(
176        groups: &'a mut std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec>,
177        emb: &uni_common::core::schema::EmbeddingConfig,
178    ) -> &'a mut EmbedGroupSpec {
179        groups
180            .entry((emb.alias.clone(), emb.source_properties.clone()))
181            .or_insert_with(|| EmbedGroupSpec {
182                source_properties: emb.source_properties.clone(),
183                document_prefix: emb.document_prefix.clone(),
184                dense: Vec::new(),
185                multi: Vec::new(),
186                sparse: Vec::new(),
187            })
188    }
189    for idx in &schema.indexes {
190        if let IndexDefinition::Vector(v_config) = idx
191            && v_config.label == label
192            && let Some(emb) = &v_config.embedding_config
193        {
194            let g = spec_for(&mut groups, emb);
195            if is_multivector_property(schema, label, &v_config.property) {
196                g.multi.push(v_config.property.clone());
197            } else {
198                g.dense.push(v_config.property.clone());
199            }
200        } else if let IndexDefinition::Sparse(s_config) = idx
201            && s_config.label == label
202            && let Some(emb) = &s_config.embedding_config
203        {
204            spec_for(&mut groups, emb)
205                .sparse
206                .push(s_config.property.clone());
207        }
208    }
209    groups
210}
211
212/// On a partial / `SET` write, when the write touches a *source* property of an
213/// auto-embed group, drop that group's target columns from `props` so the
214/// `!contains_key` guard in `process_embeddings_*` re-embeds them (otherwise the
215/// re-read old embedding is preserved → stale), and add the targets to
216/// `touched_keys` so the partial Lance write persists the refresh. Mirrors the
217/// MUVERA derived-column touched-keys handling. A target the caller set
218/// explicitly in the SAME write (already in `touched_keys`) is left intact.
219fn refresh_touched_embed_targets(
220    schema: &uni_common::core::schema::Schema,
221    props: &mut Properties,
222    touched_keys: &mut HashSet<String>,
223    labels: &[String],
224) {
225    let Some(label) = labels.first() else {
226        return;
227    };
228    let user_touched = touched_keys.clone();
229    for (_key, group) in collect_embed_groups(schema, label) {
230        if !group
231            .source_properties
232            .iter()
233            .any(|s| touched_keys.contains(s))
234        {
235            continue;
236        }
237        for target in group
238            .dense
239            .iter()
240            .chain(group.multi.iter())
241            .chain(group.sparse.iter())
242        {
243            if user_touched.contains(target) {
244                continue;
245            }
246            props.remove(target);
247            touched_keys.insert(target.clone());
248        }
249    }
250}
251
/// Tunables for the writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit for the writer (defaults to 10 000).
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column MergeInsert path on SET-only flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched property
    ///   keys in `L0Buffer::vertex_partial_keys`, and the flush sends those VIDs
    ///   through Lance `MergeInsertBuilder` with a subset-of-schema source. The
    ///   unchanged columns (including wide ones such as embeddings) are neither
    ///   read nor rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the read-modify-write
    ///   `insert_vertex_with_labels` path, staying bit-for-bit equivalent with
    ///   prior releases.
    ///
    /// Defaults to `false` for the first release; to be flipped to `true` once
    /// telemetry on the issue #72 ingest workload confirms the win.
    ///
    /// See the soundness probe at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Conservative defaults: 10 000 mutations, partial Lance writes disabled.
    fn default() -> Self {
        let max_mutations = 10_000;
        let partial_lance_writes = false;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
282
/// Parent state captured atomically at a fork point under `flush_lock`.
///
/// Holds the allocator high-water marks, the MVCC version high-water mark,
/// and every existing dataset's Lance main-branch version at the instant the
/// parent's L0 was flushed. Because [`Writer::flush_and_capture_fork_point`]
/// reads these while still holding `flush_lock`, no concurrent commit or
/// flush can advance the allocator or any dataset tip between the flush and
/// the reads. A fork built from these values therefore cannot collide VIDs
/// with the parent nor inherit rows committed after the fork point.
///
/// `Default` yields all-zero marks and an empty version map.
#[derive(Clone, Debug, Default)]
pub struct ForkPoint {
    /// Next vertex id the parent would allocate at the fork point.
    pub vid_hwm: u64,
    /// Next edge id the parent would allocate at the fork point.
    pub eid_hwm: u64,
    /// `dataset_name` → Lance main-branch version at the fork point.
    ///
    /// Keys use the same dataset naming as the fork branch loop
    /// (`vertices`, `edges`, `vertices_{label}`, `deltas_{type}_{dir}`,
    /// `adjacency_{type}_{dir}`). A dataset with no `.lance` directory
    /// on disk at the fork point has no entry.
    pub dataset_versions: BTreeMap<String, u64>,
    /// Parent's MVCC version high-water-mark at the fork point: the
    /// largest `_version` any inherited row can carry. A fork bootstraps
    /// its own version counter to this floor so a fork transaction's
    /// `_version <= pin` read still sees inherited (base_paths) rows,
    /// while the fork's own writes get versions above it.
    pub version_hwm: u64,
}
312
313/// RAII latch on [`StorageManager::flush_in_progress`].
314///
315/// Sets the flag to `true` on construction (via CAS) and back to `false` on
316/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
317/// stuck. Returns `None` if a flush is already in progress, providing
318/// forward-compatible exclusion once the outer writer-RwLock is removed in
319/// Phase 4 of the concurrent-writer refactor.
320// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
321// can hold it on RotatedFlush without a writer.rs back-import cycle.
322pub use crate::storage::manager::FlushInProgressGuard;
323
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The L0 buffer that is about to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured at rotation.
    wal_lsn: u64,
    /// `current_version` captured at rotation.
    current_version: u64,
    /// In-progress guard; must stay alive until the whole flush has finished.
    flush_in_progress_guard: FlushInProgressGuard,
}
334
335/// Project a property map to a subset selected by `keys`. Used to
336/// run `touched_needs_full_read` against just the SET-touched keys
337/// when the caller passes a fully-merged `props` map.
338fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
339    let mut out = Properties::new();
340    for k in keys {
341        if let Some(v) = props.get(k) {
342            out.insert(k.clone(), v.clone());
343        }
344    }
345    out
346}
347
/// Builds the `.lance` URI for `dataset` under the storage root `base`.
///
/// Exactly one `/` separates the two parts when `base` lacks a trailing
/// slash; a trailing slash already present on `base` is reused as-is.
///
/// Mirrors the fork branch loop's `join_uri` so the versions captured by
/// [`Writer::flush_and_capture_fork_point`] key the exact same datasets the
/// fork later branches.
fn join_lance_uri(base: &str, dataset: &str) -> String {
    let separator = if base.ends_with('/') { "" } else { "/" };
    format!("{base}{separator}{dataset}.lance")
}
360
/// Cheap existence probe for a dataset `.lance` directory.
///
/// Local-filesystem heuristic: any URI containing a `://` scheme is assumed
/// remote and reported as present (the real check is deferred to
/// `current_version`); everything else is checked on the local filesystem.
/// Mirrors the fork branch loop's `path_exists`.
fn lance_path_exists(uri: &str) -> bool {
    let looks_remote = uri.contains("://");
    looks_remote || std::path::Path::new(uri).exists()
}
372
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// Snapshot manifest that has been built but not yet published.
    manifest: SnapshotManifest,
    /// Identifier of the snapshot described by `manifest`.
    snapshot_id: String,
}
381
/// Write-path entry point: owns the L0 buffers, the storage handle, id
/// allocation, and the flush / OCC coordination state.
///
/// Most mutable state is `Arc`-wrapped so the very same handles can be shared
/// with the `SharedFlushCtx` handed to the async-flush coordinator.
pub struct Writer {
    /// Manager of the in-memory L0 buffers.
    pub l0_manager: Arc<L0Manager>,
    /// Storage backend handle.
    pub storage: Arc<StorageManager>,
    /// Schema manager handle.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Id allocator handle.
    pub allocator: Arc<IdAllocator>,
    /// Configuration this writer was constructed with.
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. Owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `config.async_flush_enabled` is `false`; it is only
    /// constructed when the feature is on. The coordinator's finalizer
    /// task captures `SharedFlushCtx`, which includes
    /// `Arc<StorageManager>`; on a fork-scoped Writer that also pins the
    /// fork's `ForkScope` via `storage.fork_scope`, so the holder count
    /// never drops. Building it only when async flush is actually enabled
    /// avoids that side-effect for all existing sync-flush paths. When
    /// async-flush graduates from opt-in to default (Commit 12),
    /// `drop_fork` (Commit 8) handles the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    ///
    /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
    /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
    commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
480
/// Number of recent commits retained for OCC conflict detection. Large enough
/// that under-run — and the resulting conservative abort — is rare in practice;
/// each entry is a small set of touched ids.
///
/// Passed to `CommitRegistry::new` when `Writer::new_with_config` builds the
/// `committed_writes` registry.
const OCC_REGISTRY_CAPACITY: usize = 4096;
485
486impl Writer {
487    pub async fn new(
488        storage: Arc<StorageManager>,
489        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
490        start_version: u64,
491    ) -> Result<Self> {
492        Self::new_with_config(
493            storage,
494            schema_manager,
495            start_version,
496            UniConfig::default(),
497            None,
498            None,
499        )
500        .await
501    }
502
503    pub async fn new_with_config(
504        storage: Arc<StorageManager>,
505        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
506        start_version: u64,
507        config: UniConfig,
508        wal: Option<Arc<WriteAheadLog>>,
509        allocator: Option<Arc<IdAllocator>>,
510    ) -> Result<Self> {
511        let allocator = if let Some(a) = allocator {
512            a
513        } else {
514            let store = storage.store();
515            let path = object_store::path::Path::from("id_allocator.json");
516            Arc::new(IdAllocator::new(store, path, 1000).await?)
517        };
518
519        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
520
521        let property_manager = Some(Arc::new(PropertyManager::new(
522            storage.clone(),
523            schema_manager.clone(),
524            1000,
525        )));
526
527        let adjacency_manager = storage.adjacency_manager();
528
529        // Hoist the Arc'd fields so we can both stash them on Writer and
530        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
531        // captures. Single-source-of-truth for each piece of mutable
532        // shared state.
533        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
534        let cached_manifest = Arc::new(PlMutex::new(None));
535        let fork_flush_count = Arc::new(AtomicU64::new(0));
536        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
537        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
538        let compaction_handle = Arc::new(RwLock::new(None));
539        let index_rebuild_manager: Arc<
540            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
541        > = Arc::new(OnceLock::new());
542
543        let flush_coordinator = if config.async_flush_enabled {
544            let shared = SharedFlushCtx {
545                storage: storage.clone(),
546                l0_manager: l0_manager.clone(),
547                adjacency_manager: adjacency_manager.clone(),
548                property_manager: property_manager.clone(),
549                schema_manager: schema_manager.clone(),
550                cached_manifest: cached_manifest.clone(),
551                last_flush_time: last_flush_time.clone(),
552                fork_id: None,
553                fork_flush_count: fork_flush_count.clone(),
554                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
555                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
556                flush_lock: flush_lock.clone(),
557                index_rebuild_manager: index_rebuild_manager.clone(),
558                compaction_handle: compaction_handle.clone(),
559                compaction_config: config.compaction.clone(),
560                index_rebuild_config: config.index_rebuild.clone(),
561                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
562            };
563            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
564            Some(Arc::new(FlushCoordinator::new(
565                config.max_pending_flushes,
566                shared,
567                finalize_fn,
568            )))
569        } else {
570            None
571        };
572
573        let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
574        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
575            OCC_REGISTRY_CAPACITY,
576        )));
577        let for_update_locks = Arc::new(dashmap::DashMap::new());
578
579        Ok(Self {
580            l0_manager,
581            storage,
582            schema_manager,
583            allocator,
584            config,
585            xervo_runtime: OnceLock::new(),
586            property_manager,
587            adjacency_manager,
588            last_flush_time,
589            compaction_handle,
590            index_rebuild_manager,
591            cached_manifest,
592            fork_id: None,
593            fork_flush_count,
594            fork_fragment_warn_fired,
595            flush_lock,
596            flush_coordinator,
597            commit_sequence,
598            committed_writes,
599            for_update_locks,
600        })
601    }
602
603    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
604    /// creating it on first use. The caller `.lock_owned().await`s the returned
605    /// mutex and holds the guard for the transaction's lifetime.
606    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
607        self.for_update_locks
608            .entry(key.to_vec())
609            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
610            .clone()
611    }
612
613    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
614    /// holds anymore, so the map does not grow without bound across the keyspace.
615    ///
616    /// Called when a transaction ends, **after** its guards have been dropped.
617    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
618    /// the same lock `row_lock_handle` takes to clone an entry — so the check
619    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
620    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
621    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
622    /// after we remove). Either way no two transactions ever lock different
623    /// `Mutex` instances for the same key.
624    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
625        for key in keys {
626            self.for_update_locks
627                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
628        }
629    }
630
631    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
632    /// tests that the map does not leak entries across transactions (G5).
633    pub fn for_update_lock_count(&self) -> usize {
634        self.for_update_locks.len()
635    }
636
637    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
638    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
639    /// baseline advances to lock-acquisition time (read-latest under the lock).
640    pub fn current_commit_sequence(&self) -> u64 {
641        self.commit_sequence
642            .load(crate::runtime::sync::Ordering::Relaxed)
643    }
644
645    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
646    /// Used by the async-flush stream/finalize paths to pass into spawned
647    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
648    /// with `flush_coordinator -> FinalizeFn -> Writer`).
649    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
650        SharedFlushCtx {
651            storage: self.storage.clone(),
652            l0_manager: self.l0_manager.clone(),
653            adjacency_manager: self.adjacency_manager.clone(),
654            property_manager: self.property_manager.clone(),
655            schema_manager: self.schema_manager.clone(),
656            cached_manifest: self.cached_manifest.clone(),
657            last_flush_time: self.last_flush_time.clone(),
658            fork_id: self.fork_id,
659            fork_flush_count: self.fork_flush_count.clone(),
660            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
661            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
662            flush_lock: self.flush_lock.clone(),
663            index_rebuild_manager: self.index_rebuild_manager.clone(),
664            compaction_handle: self.compaction_handle.clone(),
665            compaction_config: self.config.compaction.clone(),
666            index_rebuild_config: self.config.index_rebuild.clone(),
667            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
668        }
669    }
670
671    /// Borrow the flush coordinator if async flush is enabled.
672    /// Returns `None` when `config.async_flush_enabled = false`.
673    /// External callers (`drop_fork`) use this to drain pending streams.
674    pub fn flush_coordinator(
675        &self,
676    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
677        self.flush_coordinator.as_ref()
678    }
679
680    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
681    ///
682    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
683    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
684    pub fn set_index_rebuild_manager(
685        &self,
686        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
687    ) -> Result<()> {
688        self.index_rebuild_manager
689            .set(manager)
690            .map_err(|_| anyhow!("index_rebuild_manager already set"))
691    }
692
693    /// Replay WAL mutations into the current L0 buffer.
694    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
695        let l0 = self.l0_manager.get_current();
696        let wal = l0.read().wal.clone();
697
698        if let Some(wal) = wal {
699            wal.initialize().await?;
700            let mutations = wal.replay_since(wal_high_water_mark).await?;
701            let count = mutations.len();
702
703            if count > 0 {
704                log::info!(
705                    "Replaying {} mutations from WAL (LSN > {})",
706                    count,
707                    wal_high_water_mark
708                );
709                let mut l0_guard = l0.write();
710                l0_guard.replay_mutations(mutations)?;
711                // Rebuild the UNIQUE constraint index over the recovered rows
712                // (Bug #9 Mechanism B). `replay_mutations` restores
713                // vertices/properties/labels but never repopulates
714                // `constraint_index` (its only other caller is the live insert
715                // path). Without this, a unique key that lives only in the WAL
716                // (committed but not yet flushed to Lance) is invisible to
717                // `check_unique_constraint_multi` after recovery and a
718                // duplicate of it could be created.
719                self.rebuild_constraint_index(&mut l0_guard);
720            }
721
722            Ok(count)
723        } else {
724            Ok(0)
725        }
726    }
727
728    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
729    ///
730    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
731    /// constraint whose target label the vertex carries and whose member
732    /// properties are all present, inserts the same constraint key the live
733    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
734    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
735    /// write lock; the schema is already loaded on the `Writer`.
736    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
737        let schema = self.schema_manager.schema();
738        // Collect entries first to avoid borrowing `vertex_properties`
739        // immutably while mutating `constraint_index` through the same guard.
740        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
741        for (&vid, props) in &l0_guard.vertex_properties {
742            if l0_guard.vertex_tombstones.contains(&vid) {
743                continue;
744            }
745            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
746                continue;
747            };
748            for label in labels {
749                for constraint in &schema.constraints {
750                    if !constraint.enabled {
751                        continue;
752                    }
753                    let ConstraintTarget::Label(l) = &constraint.target else {
754                        continue;
755                    };
756                    if l != label {
757                        continue;
758                    }
759                    let ConstraintType::Unique {
760                        properties: unique_props,
761                    } = &constraint.constraint_type
762                    else {
763                        continue;
764                    };
765                    let mut key_values = Vec::new();
766                    let mut all_present = true;
767                    for prop in unique_props {
768                        if let Some(val) = props.get(prop) {
769                            key_values.push((prop.clone(), val.clone()));
770                        } else {
771                            all_present = false;
772                            break;
773                        }
774                    }
775                    if all_present {
776                        keys.push((serialize_constraint_key(label, &key_values), vid));
777                    }
778                }
779            }
780        }
781        for (key, vid) in keys {
782            l0_guard.insert_constraint_key(key, vid);
783        }
784    }
785
786    /// Allocates the next VID (pure auto-increment).
787    pub async fn next_vid(&self) -> Result<Vid> {
788        self.allocator.allocate_vid().await
789    }
790
791    /// Allocates multiple VIDs at once for bulk operations.
792    /// This is more efficient than calling next_vid() in a loop.
793    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
794        self.allocator.allocate_vids(count).await
795    }
796
797    /// Allocates the next EID (pure auto-increment).
798    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
799        self.allocator.allocate_eid().await
800    }
801
802    /// Allocates multiple EIDs at once for bulk operations.
803    /// This is more efficient than calling next_eid() in a loop.
804    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
805        self.allocator.allocate_eids(count).await
806    }
807
808    /// Install the embedding runtime exactly once. Receiver is `&self` so it
809    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
810    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
811        self.xervo_runtime
812            .set(runtime)
813            .map_err(|_| anyhow!("xervo_runtime already set"))
814    }
815
816    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
817        self.xervo_runtime.get().cloned()
818    }
819
820    /// Create a new empty L0 buffer for transaction-scoped mutations.
821    ///
822    /// Only reads the current version — no exclusive lock required on Writer.
823    /// The returned buffer has no WAL reference; mutations are logged at
824    /// commit time via [`Self::commit_transaction_l0`].
825    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
826        let current_version = self.l0_manager.get_current().read().current_version;
827        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
828        let buf = L0Buffer::new(current_version, None);
829        // SSI: stamp the OCC read sequence at begin so commit can detect any
830        // transaction that committed since. Gated on the runtime `ssi_enabled`
831        // toggle — when off, `occ_read_set` stays `None` and every downstream
832        // read-set recording / commit validation self-gates to a no-op.
833        let buf = if self.config.ssi_enabled {
834            let mut buf = buf;
835            buf.occ_read_seq = self
836                .commit_sequence
837                .load(crate::runtime::sync::Ordering::Relaxed);
838            // The read path records observed ids here for SSI antidependency
839            // detection; commit consults it.
840            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
841                crate::runtime::l0::OccReadSet::default(),
842            )));
843            buf
844        } else {
845            buf
846        };
847        Arc::new(RwLock::new(buf))
848    }
849
850    /// Resolve the target L0 buffer for a mutation.
851    ///
852    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
853    /// When `None`, it targets the global L0 from the manager.
854    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
855        tx_l0
856            .cloned()
857            .unwrap_or_else(|| self.l0_manager.get_current())
858    }
859
860    fn update_metrics(&self) {
861        let l0 = self.l0_manager.get_current();
862        let size = l0.read().estimated_size;
863        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
864    }
865
866    /// Overlay-aware issue-#77 edge-endpoint validation.
867    ///
868    /// The current buffer alone does not hold all committed-but-unflushed
869    /// tombstones — a flush rotation moves them onto `pending_flush` until
870    /// the Lance write completes. A vertex is effectively deleted iff,
871    /// walking newest-first (tx → current → pending newest→oldest), the
872    /// first buffer that knows the vid says "tombstoned" (an insert clears
873    /// the tombstone within a buffer, so props/tombstone are mutually
874    /// exclusive per buffer).
875    ///
876    /// Must run under `flush_lock` so the overlay cannot change before the
877    /// merge, and BEFORE the durable WAL flush (see the call site).
878    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
879        let chain = self.l0_manager.get_pending_flush();
880        let current = self.l0_manager.get_current();
881        let effectively_deleted = |vid: &Vid| -> bool {
882            if tx_l0.vertex_properties.contains_key(vid) {
883                return false;
884            }
885            if tx_l0.vertex_tombstones.contains(vid) {
886                return true;
887            }
888            {
889                let cur = current.read();
890                if cur.vertex_properties.contains_key(vid) {
891                    return false;
892                }
893                if cur.vertex_tombstones.contains(vid) {
894                    return true;
895                }
896            }
897            for frozen in chain.iter().rev() {
898                let g = frozen.read();
899                if g.vertex_properties.contains_key(vid) {
900                    return false;
901                }
902                if g.vertex_tombstones.contains(vid) {
903                    return true;
904                }
905            }
906            false
907        };
908        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
909            if tx_l0.tombstones.contains_key(eid) {
910                continue; // a deletion, not an insertion — never resurrects a vertex
911            }
912            if effectively_deleted(src_vid) {
913                anyhow::bail!(
914                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
915                    eid,
916                    src_vid
917                );
918            }
919            if effectively_deleted(dst_vid) {
920                anyhow::bail!(
921                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
922                    eid,
923                    dst_vid
924                );
925            }
926        }
927        Ok(())
928    }
929
930    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
931    /// value for each CRDT property the transaction writes, so the commit
932    /// merge MERGES against the committed CRDT state instead of shadowing it
933    /// (the carve-out lets concurrent CRDT writers commit on the assumption
934    /// that the merge sees the committed value — true only while that value
935    /// lives in current, not mid-flush on `pending_flush`). No-op when
936    /// nothing is pending or the property already exists in current. Vertex
937    /// properties only, mirroring the carve-out itself.
938    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
939        let chain = self.l0_manager.get_pending_flush();
940        if chain.is_empty() {
941            return;
942        }
943        for (vid, props) in &tx_l0.vertex_properties {
944            Self::seed_crdt_props(&chain, *vid, props, main_l0);
945        }
946    }
947
948    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
949    /// Also used by the non-transactional vertex write path, which CRDT-merges
950    /// into the current buffer directly and has the same shadowing hazard
951    /// during a flush window.
952    fn seed_crdt_props(
953        chain: &[Arc<RwLock<L0Buffer>>],
954        vid: Vid,
955        props: &Properties,
956        target: &mut L0Buffer,
957    ) {
958        for (key, value) in props {
959            if crate::runtime::l0::try_as_crdt(value).is_none() {
960                continue;
961            }
962            if target
963                .vertex_properties
964                .get(&vid)
965                .is_some_and(|p| p.contains_key(key))
966            {
967                continue;
968            }
969            // Newest generation first: the first hit is the live state.
970            for frozen in chain.iter().rev() {
971                let g = frozen.read();
972                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
973                    if crate::runtime::l0::try_as_crdt(v).is_some() {
974                        target
975                            .vertex_properties
976                            .entry(vid)
977                            .or_default()
978                            .insert(key.clone(), v.clone());
979                    }
980                    break;
981                }
982            }
983        }
984    }
985
986    /// Commit an externally-owned transaction L0 buffer.
987    ///
988    /// Writes mutations to WAL, flushes, merges into main L0, and replays
989    /// edges into the AdjacencyManager. Returns the WAL LSN of the commit
990    /// (0 when no WAL is configured).
991    /// Commit a transaction's private L0 buffer into main L0.
992    ///
993    /// Returns `(wal_lsn, flush_pending)`. When `flush_pending == true`, the
994    /// post-commit `should_flush()` predicate fired but no flush ran — the
995    /// caller is expected to spawn a background `flush_to_l1`. This is the
996    /// shape used when `UniConfig::async_flush_enabled` is set, so commits
997    /// don't block on L1-streaming I/O.
998    pub async fn commit_transaction_l0(
999        self: &Arc<Self>,
1000        tx_l0_arc: Arc<RwLock<L0Buffer>>,
1001    ) -> Result<(u64, bool)> {
1002        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
1003        // Two concurrent commits serialize here; in Phase 3 the outer
1004        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
1005        // acquisition is uncontended. Phase 4 drops the outer lock and
1006        // this becomes the load-bearing serialization point.
1007        let _flush_lock_guard = self.flush_lock.lock().await;
1008
1009        // Crash-recovery seam: simulate process death immediately after winning
1010        // the commit serialization point but before any durable work. No-op
1011        // unless built with `--features failpoints`. (See ssi_resilience tests.)
1012        fail::fail_point!("commit::after-flush-lock");
1013
1014        // SSI: optimistic conflict detection. This MUST run before any WAL
1015        // write — `flush_wal()` below is the durable commit point and the WAL
1016        // has no abort marker, so aborting after it would resurrect this
1017        // transaction on crash recovery. The write-set is reused for
1018        // registration after a successful merge.
1019        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
1020        // and `occ_write_set` is `None`, so the post-merge registration below
1021        // is skipped — reproducing last-writer-wins exactly.
1022        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
1023            let tx_l0 = tx_l0_arc.read();
1024            let read_seq = tx_l0.occ_read_seq;
1025            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
1026            if !write_set.is_empty() {
1027                // Telemetry: one validation per non-empty (writing) commit. The
1028                // ratio of conflicts to validations is the headline abort rate.
1029                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
1030                // Read-set is consulted only for writing transactions, so a
1031                // read-only commit (empty write-set) runs at snapshot isolation.
1032                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
1033                if let Some(conflict) =
1034                    self.committed_writes
1035                        .lock()
1036                        .check(read_seq, &write_set, read_guard.as_deref())
1037                {
1038                    use crate::runtime::occ::Conflict;
1039                    match &conflict {
1040                        Conflict::WriteWrite { .. } => metrics::counter!(
1041                            "uni_ssi_serialization_conflicts_total",
1042                            "kind" => "write_write",
1043                        )
1044                        .increment(1),
1045                        Conflict::ReadWrite { .. } => metrics::counter!(
1046                            "uni_ssi_serialization_conflicts_total",
1047                            "kind" => "read_write",
1048                        )
1049                        .increment(1),
1050                        Conflict::HistoryTruncated { .. } => {
1051                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
1052                        }
1053                    }
1054                    return Err(anyhow::Error::new(
1055                        uni_common::UniError::SerializationConflict {
1056                            message: conflict.to_string(),
1057                        },
1058                    ));
1059                }
1060            }
1061
1062            // Validate against the committed-but-unflushed overlay under
1063            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
1064            // soundness. The current buffer alone does not hold all committed
1065            // state — a flush rotation moves it onto `pending_flush` until the
1066            // Lance write completes (the Bug #9A window, here at the
1067            // commit-time layer) — so every check walks [current, pending…].
1068            {
1069                let pending = self.l0_manager.get_pending_flush();
1070                let main_l0 = self.l0_manager.get_current();
1071                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
1072                    std::iter::once(main_l0).chain(pending).collect();
1073
1074                // SSI / serializable MERGE: abort if a concurrent transaction has
1075                // already committed a row with one of this transaction's unique
1076                // keys. Commits serialize here, so this closes the race window
1077                // left by the per-insert check. (Empty index → no iterations.)
1078                for (key, vid) in &tx_l0.constraint_index {
1079                    if overlay
1080                        .iter()
1081                        .any(|b| b.read().has_constraint_key(key, *vid))
1082                    {
1083                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
1084                        return Err(anyhow::Error::new(
1085                            uni_common::UniError::ConstraintConflict {
1086                                message: "unique key already committed by a concurrent \
1087                                          transaction"
1088                                    .to_string(),
1089                            },
1090                        ));
1091                    }
1092                }
1093
1094                // Implicit MERGE phantom guard: a `MERGE` that *created* a node
1095                // registered its (label, key-props) here even with no declared
1096                // UNIQUE constraint. If a concurrent transaction already committed
1097                // the same MERGE key, abort retriably so the two converge to one
1098                // node on retry (the loser's MATCH then finds the committed row).
1099                // Only MERGE-creates register keys, so a plain CREATE of the same
1100                // properties never lands here. (Empty index → no iterations.)
1101                for (key, vid) in &tx_l0.merge_guard_index {
1102                    if overlay
1103                        .iter()
1104                        .any(|b| b.read().has_merge_guard_key(key, *vid))
1105                    {
1106                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
1107                        return Err(anyhow::Error::new(
1108                            uni_common::UniError::ConstraintConflict {
1109                                message: "MERGE key already committed by a concurrent \
1110                                          transaction"
1111                                    .to_string(),
1112                            },
1113                        ));
1114                    }
1115                }
1116
1117                // Same race window for global ext_id uniqueness: the per-insert
1118                // check ran against an older main L0; re-probe the committed
1119                // index here, where commits serialize.
1120                for (ext_id, vid) in &tx_l0.extid_index {
1121                    let taken = overlay.iter().any(|b| {
1122                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
1123                    });
1124                    if taken {
1125                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
1126                        return Err(anyhow::Error::new(
1127                            uni_common::UniError::ConstraintConflict {
1128                                message: format!(
1129                                    "ext_id '{ext_id}' already committed by a concurrent \
1130                                     transaction"
1131                                ),
1132                            },
1133                        ));
1134                    }
1135                }
1136
1137                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
1138                // write-set assuming its merge commutes. If the overlay holds a
1139                // *different* CRDT variant for the same property, the merge would
1140                // silently overwrite it — abort instead of losing the update.
1141                // (Checked against every overlay buffer: conservative if an old
1142                // generation held a different variant that a newer commit already
1143                // replaced, but an abort+retry is always sound.)
1144                for buf in &overlay {
1145                    if let Some(conflict) =
1146                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
1147                    {
1148                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
1149                        return Err(anyhow::Error::new(
1150                            uni_common::UniError::SerializationConflict {
1151                                message: conflict.to_string(),
1152                            },
1153                        ));
1154                    }
1155                }
1156            }
1157            Some(write_set)
1158        } else {
1159            None
1160        };
1161
1162        // Issue #77: an edge whose endpoint is effectively deleted makes the
1163        // merge below bail. That bail MUST happen before the durable WAL flush —
1164        // after it the transaction is committed-but-unmerged (a ghost commit),
1165        // and WAL replay re-hits the same bail, making the database unopenable.
1166        // SSI validation was deliberately placed before the flush for exactly
1167        // this reason; the endpoint check belongs here too. Runs unconditionally
1168        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
1169        // tombstone state cannot change between here and the merge. A
1170        // tombstone may live in a flush-rotated pending buffer rather than
1171        // the current buffer, so the check walks the overlay newest-first.
1172        {
1173            let tx_l0 = tx_l0_arc.read();
1174            self.validate_edge_endpoints_overlay(&tx_l0)?;
1175        }
1176
1177        // Crash-recovery seam: SSI validation has passed; the transaction is
1178        // about to become durable. A crash here must leave NO trace (validation
1179        // happens before the WAL is touched). No-op unless `failpoints`.
1180        fail::fail_point!("commit::after-validate");
1181
1182        // 1. Write transaction mutations to WAL BEFORE merging into main L0
1183        // This ensures durability before visibility.
1184        {
1185            let tx_l0 = tx_l0_arc.read();
1186            let main_l0_arc = self.l0_manager.get_current();
1187            let main_l0 = main_l0_arc.read();
1188
1189            // If WAL exists, write mutations to it for durability
1190            if let Some(wal) = main_l0.wal.as_ref() {
1191                // Order: vertices first, then edges (to ensure src/dst exist on replay)
1192
1193                // Vertex insertions
1194                for (vid, properties) in &tx_l0.vertex_properties {
1195                    if !tx_l0.vertex_tombstones.contains(vid) {
1196                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1197                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
1198                            vid: *vid,
1199                            properties: properties.clone(),
1200                            labels,
1201                        })?;
1202                    }
1203                }
1204
1205                // Vertex deletions
1206                for vid in &tx_l0.vertex_tombstones {
1207                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1208                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
1209                }
1210
1211                // Label-only mutations (SET n:Label / REMOVE n:Label). After
1212                // vertex inserts (so the vertex exists on replay), before edges,
1213                // and skipping vertices deleted in this same commit.
1214                for vid in &tx_l0.vertex_label_overwrites {
1215                    if tx_l0.vertex_tombstones.contains(vid) {
1216                        continue;
1217                    }
1218                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1219                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
1220                        vid: *vid,
1221                        labels,
1222                    })?;
1223                }
1224
1225                // Crash-recovery seam: vertices appended, edges not yet. Tests
1226                // assert that a crash here (before `flush_wal`) recovers NOTHING
1227                // — the durable commit point is the flush below, not append.
1228                fail::fail_point!("commit::mid-wal");
1229
1230                // Edge insertions and deletions from edge_endpoints
1231                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
1232                    if tx_l0.tombstones.contains_key(eid) {
1233                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1234                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1235                            eid: *eid,
1236                            src_vid: *src_vid,
1237                            dst_vid: *dst_vid,
1238                            edge_type: *edge_type,
1239                            version,
1240                        })?;
1241                    } else {
1242                        let properties =
1243                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
1244                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1245                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
1246                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
1247                            src_vid: *src_vid,
1248                            dst_vid: *dst_vid,
1249                            edge_type: *edge_type,
1250                            eid: *eid,
1251                            version,
1252                            properties,
1253                            edge_type_name,
1254                        })?;
1255                    }
1256                }
1257
1258                // Tombstones for edges that only exist in the global L0 (not in
1259                // this transaction's edge_endpoints).  Without this, deletes of
1260                // pre-existing edges would be silently lost.
1261                for (eid, tombstone) in &tx_l0.tombstones {
1262                    if !tx_l0.edge_endpoints.contains_key(eid) {
1263                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1264                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1265                            eid: *eid,
1266                            src_vid: tombstone.src_vid,
1267                            dst_vid: tombstone.dst_vid,
1268                            edge_type: tombstone.edge_type,
1269                            version,
1270                        })?;
1271                    }
1272                }
1273            }
1274        }
1275
1276        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1277        let wal_lsn = self.flush_wal().await?;
1278
1279        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1280        // A crash here must RECOVER the transaction on replay (it is committed),
1281        // even though it was never made visible in-process. No-op unless `failpoints`.
1282        fail::fail_point!("commit::after-wal-flush");
1283
1284        // Component C1: if an outstanding snapshot pins the current generation,
1285        // clone it aside (lazy copy-on-write) before merging, so the pinning
1286        // transaction's reads stay isolated from this commit. No-op — and zero
1287        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1288        // so this cannot race a flush rotate or another commit's merge; the merge
1289        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1290        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1291        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1292        // always false when SSI is off and this is a zero-cost no-op.
1293        if self.l0_manager.is_current_pinned() {
1294            self.l0_manager.freeze_current_for_snapshot();
1295            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1296        }
1297
1298        // 3. Merge into main L0 and make visible
1299        {
1300            // Write-lock the tx buffer: `merge_take` moves its property maps
1301            // into main L0 instead of cloning them. The commit consumes the
1302            // transaction, so the drained maps are never observed afterwards;
1303            // everything read below (endpoints, versions, tombstones) is left
1304            // intact.
1305            let mut tx_l0 = tx_l0_arc.write();
1306            let main_l0_arc = self.l0_manager.get_current();
1307            let mut main_l0 = main_l0_arc.write();
1308            // A CRDT property's committed state may live only in a
1309            // flush-rotated pending buffer (the post-rotation current is
1310            // empty until the Lance write completes — the Bug #9A window).
1311            // `merge_crdt_properties` merges against the CURRENT buffer's
1312            // value — without seeding, the tx's CRDT state would SHADOW the
1313            // pending buffer's at read time (newest buffer wins per property)
1314            // and concurrent increments would be lost. Seed the newest
1315            // overlay value for each CRDT property the tx writes that current
1316            // lacks, so the merge below merges instead of replaces.
1317            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1318            main_l0.merge_take(&mut tx_l0)?;
1319
1320            // Replay transaction edges into the AdjacencyManager overlay
1321            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1322                let edge_version = tx_l0
1323                    .edge_versions
1324                    .get(eid)
1325                    .copied()
1326                    .unwrap_or(main_l0.current_version);
1327                if tx_l0.tombstones.contains_key(eid) {
1328                    self.adjacency_manager
1329                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1330                } else {
1331                    self.adjacency_manager
1332                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
1333                }
1334            }
1335
1336            // Replay tombstones for edges that only exist in the global L0
1337            // (not in this transaction's edge_endpoints).
1338            for (eid, tombstone) in &tx_l0.tombstones {
1339                if !tx_l0.edge_endpoints.contains_key(eid) {
1340                    let edge_version = tx_l0
1341                        .edge_versions
1342                        .get(eid)
1343                        .copied()
1344                        .unwrap_or(main_l0.current_version);
1345                    self.adjacency_manager.add_tombstone(
1346                        *eid,
1347                        tombstone.src_vid,
1348                        tombstone.dst_vid,
1349                        tombstone.edge_type,
1350                        edge_version,
1351                    );
1352                }
1353            }
1354        }
1355
1356        // Crash-recovery seam: durable AND merged, but the in-memory commit
1357        // registry has not recorded this write-set yet. A crash here is
1358        // indistinguishable from one at `after-wal-flush` on reopen (the
1359        // registry is in-memory and rebuilt empty); the tx still recovers.
1360        fail::fail_point!("commit::after-merge");
1361
1362        // SSI: register this commit's write-set under a fresh commit sequence so
1363        // later transactions detect conflicts against it. Still under
1364        // `flush_lock`, before the async-flush branch can drop the guard.
1365        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1366        if let Some(write_set) = occ_write_set
1367            && !write_set.is_empty()
1368        {
1369            // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
1370            // so production and the loom/shuttle models exercise identical logic.
1371            self.committed_writes
1372                .lock()
1373                .commit(&self.commit_sequence, write_set);
1374        }
1375
1376        self.update_metrics();
1377
1378        // 4. Best-effort post-commit auto-flush.
1379        //
1380        // Two paths:
1381        // - async_flush_enabled = false (default): inline under our
1382        //   existing flush_lock guard via flush_inline_under_lock.
1383        // - async_flush_enabled = true: rotate inline, drop flush_lock,
1384        //   then submit the stream phase to the coordinator. Gated on
1385        //   `pending_flush_count() < max_pending_flushes` so we don't
1386        //   stack up rotations beyond the configured pipeline depth.
1387        //   `try_acquire_permit` is non-blocking: if we lose the race
1388        //   for the last permit, we just skip this trigger (the next
1389        //   commit retries).
1390        let mut flush_pending = false;
1391        if self.should_flush() {
1392            if self.config.async_flush_enabled
1393                && let Some(coord) = self.flush_coordinator.as_ref()
1394                && coord.pending_flush_count() < self.config.max_pending_flushes
1395            {
1396                match coord.try_acquire_permit() {
1397                    Some(permit) => {
1398                        match self.flush_l0_rotate().await {
1399                            Ok(rotate_out) => {
1400                                // Allocate the rotate seq and bump pending ONLY
1401                                // after the rotate succeeds (Bug #3). A failed
1402                                // rotate must consume neither: the finalizer
1403                                // advances strictly in consecutive seq order and
1404                                // only decrements pending on finalize, so a
1405                                // leaked seq/pending from a failed rotate would
1406                                // wedge the finalizer forever and climb pending
1407                                // toward `max_pending_flushes`. The seq is still
1408                                // allocated under `flush_lock` (immediately after
1409                                // the rotate, before the guard drops below), so
1410                                // concurrent rotates keep seq order == rotation
1411                                // order, and the seq is not used until submit.
1412                                let seq = coord.next_rotate_seq();
1413                                coord.note_pending();
1414                                // Release flush_lock BEFORE the spawn so concurrent
1415                                // commits can proceed while the stream runs.
1416                                drop(_flush_lock_guard);
1417                                let parent_manifest = self.cached_manifest.lock().clone();
1418                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1419                                    seq,
1420                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
1421                                    wal_lsn: rotate_out.wal_lsn,
1422                                    current_version: rotate_out.current_version,
1423                                    name: None,
1424                                    parent_manifest,
1425                                    permit,
1426                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1427                                };
1428                                let writer = self.clone();
1429                                let _ticket = coord.submit_for_stream(
1430                                    rotated,
1431                                    move |old_l0, wal, ver, n| async move {
1432                                        let outcome =
1433                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1434                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
1435                                            new_manifest: outcome.manifest,
1436                                            snapshot_id: outcome.snapshot_id,
1437                                        })
1438                                    },
1439                                );
1440                                flush_pending = true;
1441                                // Early return — flush_lock already dropped.
1442                                return Ok((wal_lsn, flush_pending));
1443                            }
1444                            Err(e) => {
1445                                tracing::warn!("Async rotate failed (non-critical): {}", e);
1446                                // No seq was allocated and pending was not
1447                                // bumped (both moved into the Ok arm for Bug
1448                                // #3), so the finalizer is not wedged. The
1449                                // permit drops here, freeing the slot.
1450                            }
1451                        }
1452                    }
1453                    None => {
1454                        // Race: someone else grabbed the last permit. Skip;
1455                        // next commit will retry should_flush().
1456                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1457                    }
1458                }
1459            } else if let Err(e) = self.flush_inline_under_lock(None).await {
1460                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1461            }
1462        }
1463
1464        Ok((wal_lsn, flush_pending))
1465    }
1466
1467    /// Flush the WAL buffer to durable storage.
1468    ///
1469    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1470    pub async fn flush_wal(&self) -> Result<u64> {
1471        let l0 = self.l0_manager.get_current();
1472        let wal = l0.read().wal.clone();
1473
1474        match wal {
1475            Some(wal) => Ok(wal.flush().await?),
1476            None => Ok(0),
1477        }
1478    }
1479
1480    /// Record property removals in the active L0 mutation stats.
1481    ///
1482    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1483    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1484        if count == 0 {
1485            return;
1486        }
1487        let l0 = self.resolve_l0(tx_l0);
1488        l0.write().mutation_stats.properties_removed += count;
1489    }
1490
1491    /// Validates vertex constraints for the given properties.
1492    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1493    async fn validate_vertex_constraints_for_label(
1494        &self,
1495        vid: Vid,
1496        properties: &Properties,
1497        label: &str,
1498        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1499    ) -> Result<()> {
1500        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1501            .await
1502    }
1503
1504    /// Partial-update sibling: validates only constraints touching keys
1505    /// present in `properties` (the touched set). NOT NULL is checked
1506    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1507    /// skipped when any referenced key is absent (the caller is
1508    /// expected to have routed to the full-row path in that case via
1509    /// `touched_needs_full_read`).
1510    async fn validate_vertex_constraints_for_label_partial(
1511        &self,
1512        vid: Vid,
1513        properties: &Properties,
1514        label: &str,
1515        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1516    ) -> Result<()> {
1517        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1518            .await
1519    }
1520
1521    async fn validate_vertex_constraints_for_label_impl(
1522        &self,
1523        vid: Vid,
1524        properties: &Properties,
1525        label: &str,
1526        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1527        partial: bool,
1528    ) -> Result<()> {
1529        let schema = self.schema_manager.schema();
1530
1531        {
1532            // 1. Check NOT NULL constraints (from Property definitions).
1533            //    Under partial-update mode, skip properties NOT in
1534            //    `properties` — they retain their previous (already-
1535            //    validated) value.
1536            if let Some(props_meta) = schema.properties.get(label) {
1537                for (prop_name, meta) in props_meta {
1538                    if !meta.nullable {
1539                        let present = properties.get(prop_name);
1540                        if partial && present.is_none() {
1541                            continue;
1542                        }
1543                        if present.is_none_or(|v| v.is_null()) {
1544                            log::warn!(
1545                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1546                                prop_name,
1547                                label
1548                            );
1549                            return Err(anyhow!(
1550                                "Constraint violation: Property '{}' cannot be null",
1551                                prop_name
1552                            ));
1553                        }
1554                    }
1555                }
1556            }
1557
1558            // 2. Check Explicit Constraints (Unique, Check, etc.)
1559            for constraint in &schema.constraints {
1560                if !constraint.enabled {
1561                    continue;
1562                }
1563                match &constraint.target {
1564                    ConstraintTarget::Label(l) if l == label => {}
1565                    _ => continue,
1566                }
1567
1568                match &constraint.constraint_type {
1569                    ConstraintType::Unique {
1570                        properties: unique_props,
1571                    } => {
1572                        // Support single and multi-property unique constraints
1573                        if !unique_props.is_empty() {
1574                            let mut key_values = Vec::new();
1575                            let mut missing = false;
1576                            for prop in unique_props {
1577                                if let Some(val) = properties.get(prop) {
1578                                    key_values.push((prop.clone(), val.clone()));
1579                                } else {
1580                                    missing = true; // Can't enforce if property missing (partial update?)
1581                                    // For INSERT, missing means null?
1582                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1583                                    // For now, only check if ALL keys are present
1584                                }
1585                            }
1586
1587                            if !missing {
1588                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1589                                    .await?;
1590                            }
1591                        }
1592                    }
1593                    ConstraintType::Exists { property } => {
1594                        if properties.get(property).is_none_or(|v| v.is_null()) {
1595                            log::warn!(
1596                                "Constraint violation: Property '{}' must exist for label '{}'",
1597                                property,
1598                                label
1599                            );
1600                            return Err(anyhow!(
1601                                "Constraint violation: Property '{}' must exist",
1602                                property
1603                            ));
1604                        }
1605                    }
1606                    ConstraintType::Check { expression } => {
1607                        if !self.evaluate_check_constraint(expression, properties)? {
1608                            return Err(anyhow!(
1609                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1610                                constraint.name,
1611                                expression
1612                            ));
1613                        }
1614                    }
1615                    _ => {
1616                        return Err(anyhow!("Unsupported constraint type"));
1617                    }
1618                }
1619            }
1620        }
1621        Ok(())
1622    }
1623
1624    /// Validates vertex constraints for a vertex with the given labels.
1625    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1626    /// Unknown labels (not in schema) are skipped.
1627    async fn validate_vertex_constraints(
1628        &self,
1629        vid: Vid,
1630        properties: &Properties,
1631        labels: &[String],
1632        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1633    ) -> Result<()> {
1634        let schema = self.schema_manager.schema();
1635
1636        // Validate constraints only for known labels
1637        for label in labels {
1638            // Skip unknown labels (schemaless support)
1639            if schema.get_label_case_insensitive(label).is_none() {
1640                continue;
1641            }
1642            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1643                .await?;
1644        }
1645
1646        // Check global ext_id uniqueness if ext_id is provided
1647        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1648            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1649        }
1650
1651        Ok(())
1652    }
1653
1654    /// Partial sibling of `validate_vertex_constraints` — validates only
1655    /// constraints touching keys present in `properties`. Used by
1656    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1657    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1658    async fn validate_vertex_constraints_partial(
1659        &self,
1660        vid: Vid,
1661        touched: &Properties,
1662        labels: &[String],
1663        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1664    ) -> Result<()> {
1665        let schema = self.schema_manager.schema();
1666        for label in labels {
1667            if schema.get_label_case_insensitive(label).is_none() {
1668                continue;
1669            }
1670            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1671                .await?;
1672        }
1673        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1674            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1675        }
1676        Ok(())
1677    }
1678
1679    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1680    ///
1681    /// Used to build a constraint key index from L0 buffers for batch validation.
1682    fn collect_constraint_keys_from_properties<'a>(
1683        properties_iter: impl Iterator<Item = &'a Properties>,
1684        label: &str,
1685        constraints: &[uni_common::core::schema::Constraint],
1686        existing_keys: &mut HashMap<String, HashSet<String>>,
1687        existing_extids: &mut HashSet<String>,
1688    ) {
1689        for props in properties_iter {
1690            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1691                existing_extids.insert(ext_id.to_string());
1692            }
1693
1694            for constraint in constraints {
1695                if !constraint.enabled {
1696                    continue;
1697                }
1698                if let ConstraintTarget::Label(l) = &constraint.target {
1699                    if l != label {
1700                        continue;
1701                    }
1702                } else {
1703                    continue;
1704                }
1705
1706                if let ConstraintType::Unique {
1707                    properties: unique_props,
1708                } = &constraint.constraint_type
1709                {
1710                    let mut key_parts = Vec::new();
1711                    let mut all_present = true;
1712                    for prop in unique_props {
1713                        if let Some(val) = props.get(prop) {
1714                            key_parts.push(format!("{}:{}", prop, val));
1715                        } else {
1716                            all_present = false;
1717                            break;
1718                        }
1719                    }
1720                    if all_present {
1721                        let key = key_parts.join("|");
1722                        existing_keys
1723                            .entry(constraint.name.clone())
1724                            .or_default()
1725                            .insert(key);
1726                    }
1727                }
1728            }
1729        }
1730    }
1731
1732    /// Validates constraints for a batch of vertices efficiently.
1733    ///
1734    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1735    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1736    ///
1737    /// # Arguments
1738    /// * `vids` - VIDs of vertices being inserted
1739    /// * `properties_batch` - Properties for each vertex
1740    /// * `label` - Label for all vertices (assumes single label for now)
1741    ///
1742    /// # Performance
1743    /// For N vertices with unique constraints:
1744    /// - Old approach: O(N²) - scan L0 buffer N times
1745    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1746    async fn validate_vertex_batch_constraints(
1747        &self,
1748        vids: &[Vid],
1749        properties_batch: &[Properties],
1750        label: &str,
1751        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1752    ) -> Result<()> {
1753        if vids.len() != properties_batch.len() {
1754            return Err(anyhow!("VID/properties length mismatch"));
1755        }
1756
1757        let schema = self.schema_manager.schema();
1758
1759        // 1. Validate NOT NULL constraints for each vertex
1760        if let Some(props_meta) = schema.properties.get(label) {
1761            for (idx, properties) in properties_batch.iter().enumerate() {
1762                for (prop_name, meta) in props_meta {
1763                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1764                        return Err(anyhow!(
1765                            "Constraint violation at index {}: Property '{}' cannot be null",
1766                            idx,
1767                            prop_name
1768                        ));
1769                    }
1770                }
1771            }
1772        }
1773
1774        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1775        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1776        let mut existing_extids: HashSet<String> = HashSet::new();
1777
1778        // Scan current L0 buffer
1779        {
1780            let l0 = self.l0_manager.get_current();
1781            let l0_guard = l0.read();
1782            Self::collect_constraint_keys_from_properties(
1783                l0_guard.vertex_properties.values(),
1784                label,
1785                &schema.constraints,
1786                &mut existing_keys,
1787                &mut existing_extids,
1788            );
1789        }
1790
1791        // Scan transaction L0 if present
1792        if let Some(tx_l0) = tx_l0 {
1793            let tx_l0_guard = tx_l0.read();
1794            Self::collect_constraint_keys_from_properties(
1795                tx_l0_guard.vertex_properties.values(),
1796                label,
1797                &schema.constraints,
1798                &mut existing_keys,
1799                &mut existing_extids,
1800            );
1801        }
1802
1803        // 3. Check batch vertices against index AND check for duplicates within batch
1804        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1805        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1806
1807        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1808            // Check ext_id uniqueness
1809            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1810                if existing_extids.contains(ext_id) {
1811                    return Err(anyhow!(
1812                        "Constraint violation at index {}: ext_id '{}' already exists",
1813                        idx,
1814                        ext_id
1815                    ));
1816                }
1817                if let Some(first_idx) = batch_extids.get(ext_id) {
1818                    return Err(anyhow!(
1819                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1820                        ext_id,
1821                        first_idx,
1822                        idx
1823                    ));
1824                }
1825                // Also check the main vertices table — the L0 scans above
1826                // miss vertices already flushed to L1, so without this a
1827                // batch insert (e.g. a fork promote onto primary) silently
1828                // twins a duplicate ext_id instead of erroring. Mirrors the
1829                // single-vertex `check_extid_globally_unique`.
1830                if let Ok(Some(found_vid)) =
1831                    MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1832                {
1833                    return Err(anyhow!(
1834                        "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1835                        idx,
1836                        ext_id,
1837                        found_vid
1838                    ));
1839                }
1840                batch_extids.insert(ext_id.to_string(), idx);
1841            }
1842
1843            // Check unique constraints
1844            for constraint in &schema.constraints {
1845                if !constraint.enabled {
1846                    continue;
1847                }
1848                if let ConstraintTarget::Label(l) = &constraint.target {
1849                    if l != label {
1850                        continue;
1851                    }
1852                } else {
1853                    continue;
1854                }
1855
1856                match &constraint.constraint_type {
1857                    ConstraintType::Unique {
1858                        properties: unique_props,
1859                    } => {
1860                        let mut key_parts = Vec::new();
1861                        let mut all_present = true;
1862                        for prop in unique_props {
1863                            if let Some(val) = properties.get(prop) {
1864                                key_parts.push(format!("{}:{}", prop, val));
1865                            } else {
1866                                all_present = false;
1867                                break;
1868                            }
1869                        }
1870
1871                        if all_present {
1872                            let key = key_parts.join("|");
1873
1874                            // Check against existing L0 keys
1875                            if let Some(keys) = existing_keys.get(&constraint.name)
1876                                && keys.contains(&key)
1877                            {
1878                                return Err(anyhow!(
1879                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1880                                    idx,
1881                                    label,
1882                                    constraint.name
1883                                ));
1884                            }
1885
1886                            // Check for duplicates within batch
1887                            let batch_constraint_keys =
1888                                batch_keys.entry(constraint.name.clone()).or_default();
1889                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1890                                return Err(anyhow!(
1891                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1892                                    key,
1893                                    first_idx,
1894                                    idx
1895                                ));
1896                            }
1897                            batch_constraint_keys.insert(key, idx);
1898                        }
1899                    }
1900                    ConstraintType::Exists { property }
1901                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1902                    {
1903                        return Err(anyhow!(
1904                            "Constraint violation at index {}: Property '{}' must exist",
1905                            idx,
1906                            property
1907                        ));
1908                    }
1909                    ConstraintType::Check { expression }
1910                        if !self.evaluate_check_constraint(expression, properties)? =>
1911                    {
1912                        return Err(anyhow!(
1913                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1914                            idx,
1915                            constraint.name
1916                        ));
1917                    }
1918                    _ => {}
1919                }
1920            }
1921        }
1922
1923        // 4. Check storage for unique constraints (can batch this into a single query)
1924        for constraint in &schema.constraints {
1925            if !constraint.enabled {
1926                continue;
1927            }
1928            if let ConstraintTarget::Label(l) = &constraint.target {
1929                if l != label {
1930                    continue;
1931                }
1932            } else {
1933                continue;
1934            }
1935
1936            if let ConstraintType::Unique {
1937                properties: unique_props,
1938            } = &constraint.constraint_type
1939            {
1940                // Build compound OR filter for all batch vertices
1941                let mut or_filters = Vec::new();
1942                for properties in properties_batch.iter() {
1943                    let mut and_parts = Vec::new();
1944                    let mut all_present = true;
1945                    for prop in unique_props {
1946                        if let Some(val) = properties.get(prop) {
1947                            let val_str = match val {
1948                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1949                                Value::Int(n) => n.to_string(),
1950                                Value::Float(f) => f.to_string(),
1951                                Value::Bool(b) => b.to_string(),
1952                                _ => {
1953                                    all_present = false;
1954                                    break;
1955                                }
1956                            };
1957                            and_parts.push(format!("{} = {}", prop, val_str));
1958                        } else {
1959                            all_present = false;
1960                            break;
1961                        }
1962                    }
1963                    if all_present {
1964                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1965                    }
1966                }
1967
1968                #[cfg(feature = "lance-backend")]
1969                if !or_filters.is_empty() {
1970                    let vid_list: Vec<String> =
1971                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1972                    let filter = format!(
1973                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1974                        or_filters.join(" OR "),
1975                        vid_list.join(", ")
1976                    );
1977
1978                    // Count flushed duplicates through the `StorageBackend`
1979                    // (branch-aware, correct `.lance` path). A missing table
1980                    // means nothing is flushed yet — the L0/pending/tx checks
1981                    // above already covered in-memory rows — so skip cleanly;
1982                    // any other backend error must abort the write rather than
1983                    // silently fail open (the prior `open_raw()` foot-gun).
1984                    let backend = self.storage.backend();
1985                    let table = table_names::vertex_table_name(label);
1986                    if backend.table_exists(&table).await? {
1987                        let count = backend.count_rows(&table, Some(filter.as_str())).await?;
1988                        if count > 0 {
1989                            return Err(anyhow!(
1990                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1991                                label,
1992                                constraint.name
1993                            ));
1994                        }
1995                    }
1996                }
1997            }
1998        }
1999
2000        Ok(())
2001    }
2002
2003    /// Checks that ext_id is globally unique across all vertices.
2004    ///
2005    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
2006    /// to ensure no other vertex uses this ext_id.
2007    ///
2008    /// # Errors
2009    ///
2010    /// Returns error if another vertex with the same ext_id exists.
2011    async fn check_extid_globally_unique(
2012        &self,
2013        ext_id: &str,
2014        current_vid: Vid,
2015        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2016    ) -> Result<()> {
2017        // Check L0 buffers: current, transaction, and pending flush
2018        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
2019            let mut buffers = vec![self.l0_manager.get_current()];
2020            if let Some(tx_l0) = tx_l0 {
2021                buffers.push(tx_l0.clone());
2022            }
2023            buffers.extend(self.l0_manager.get_pending_flush());
2024            buffers
2025        };
2026
2027        for l0 in &l0_buffers_to_check {
2028            // O(1) per buffer via the maintained `extid_index` (the previous
2029            // full `vertex_properties` scan made constrained ingest O(n²)).
2030            if let Some(&vid) = l0.read().extid_index.get(ext_id)
2031                && vid != current_vid
2032            {
2033                return Err(anyhow!(
2034                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
2035                    ext_id,
2036                    vid
2037                ));
2038            }
2039        }
2040
2041        // Check main vertices table (if it exists)
2042        // Pass None for global uniqueness check (not snapshot-isolated)
2043        let backend = self.storage.backend();
2044        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
2045            && found_vid != current_vid
2046        {
2047            return Err(anyhow!(
2048                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
2049                ext_id,
2050                found_vid
2051            ));
2052        }
2053
2054        Ok(())
2055    }
2056
2057    /// Helper to get vertex labels from L0 buffer.
2058    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
2059        let l0 = self.l0_manager.get_current();
2060        let l0_guard = l0.read();
2061        // Check if vertex is tombstoned (deleted) - if so, return None
2062        if l0_guard.vertex_tombstones.contains(&vid) {
2063            return None;
2064        }
2065        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
2066    }
2067
2068    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
2069    /// This is the proper way to read vertex labels after a flush, as it checks both
2070    /// in-memory buffers and persisted storage.
2071    pub async fn get_vertex_labels(
2072        &self,
2073        vid: Vid,
2074        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2075    ) -> Option<Vec<String>> {
2076        // 1. Check current L0
2077        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
2078            return Some(labels);
2079        }
2080
2081        // 2. Check transaction L0 if present
2082        if let Some(tx_l0) = tx_l0 {
2083            let guard = tx_l0.read();
2084            if guard.vertex_tombstones.contains(&vid) {
2085                return None;
2086            }
2087            if let Some(labels) = guard.get_vertex_labels(vid) {
2088                return Some(labels.to_vec());
2089            }
2090        }
2091
2092        // 3. Check pending flush L0s
2093        for pending_l0 in self.l0_manager.get_pending_flush() {
2094            let guard = pending_l0.read();
2095            if guard.vertex_tombstones.contains(&vid) {
2096                return None;
2097            }
2098            if let Some(labels) = guard.get_vertex_labels(vid) {
2099                return Some(labels.to_vec());
2100            }
2101        }
2102
2103        // 4. Check storage
2104        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
2105    }
2106
2107    /// Helper to get edge type from L0 buffer.
2108    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
2109        let l0 = self.l0_manager.get_current();
2110        let l0_guard = l0.read();
2111        l0_guard.get_edge_type(eid).map(|s| s.to_string())
2112    }
2113
2114    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
2115    /// Falls back to the transaction L0 if available.
2116    pub fn get_edge_type_id_from_l0(
2117        &self,
2118        eid: Eid,
2119        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2120    ) -> Option<u32> {
2121        // Check transaction L0 first
2122        if let Some(tx_l0) = tx_l0 {
2123            let guard = tx_l0.read();
2124            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
2125                return Some(etype);
2126            }
2127        }
2128        // Fall back to main L0
2129        let l0 = self.l0_manager.get_current();
2130        let l0_guard = l0.read();
2131        l0_guard
2132            .get_edge_endpoint_full(eid)
2133            .map(|(_, _, etype)| etype)
2134    }
2135
2136    /// Set the type name for an edge (used for schemaless edge types).
2137    /// This is called during CREATE for edge types not found in the schema.
2138    pub fn set_edge_type(
2139        &self,
2140        eid: Eid,
2141        type_name: String,
2142        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2143    ) {
2144        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
2145    }
2146
2147    /// Evaluate a simple CHECK constraint expression.
2148    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
2149    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
2150        let parts: Vec<&str> = expression.split_whitespace().collect();
2151        if parts.len() != 3 {
2152            // For now, only support "prop op val"
2153            // Fallback to true if too complex to avoid breaking, but warn
2154            log::warn!(
2155                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
2156                expression
2157            );
2158            return Ok(true);
2159        }
2160
2161        let prop_part = parts[0].trim_start_matches('(');
2162        // Handle "variable.property" format - take the part after the dot
2163        let prop_name = if let Some(idx) = prop_part.find('.') {
2164            &prop_part[idx + 1..]
2165        } else {
2166            prop_part
2167        };
2168
2169        let op = parts[1];
2170        let val_str = parts[2].trim_end_matches(')');
2171
2172        let prop_val = match properties.get(prop_name) {
2173            Some(v) => v,
2174            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
2175        };
2176
2177        // Parse value string (handle quotes for strings)
2178        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
2179            || (val_str.starts_with('"') && val_str.ends_with('"'))
2180        {
2181            Value::String(val_str[1..val_str.len() - 1].to_string())
2182        } else if let Ok(n) = val_str.parse::<i64>() {
2183            Value::Int(n)
2184        } else if let Ok(n) = val_str.parse::<f64>() {
2185            Value::Float(n)
2186        } else if let Ok(b) = val_str.parse::<bool>() {
2187            Value::Bool(b)
2188        } else {
2189            // Check for internal format wrappers if they somehow leaked through
2190            if val_str.starts_with("Number(") && val_str.ends_with(')') {
2191                let n_str = &val_str[7..val_str.len() - 1];
2192                if let Ok(n) = n_str.parse::<i64>() {
2193                    Value::Int(n)
2194                } else if let Ok(n) = n_str.parse::<f64>() {
2195                    Value::Float(n)
2196                } else {
2197                    Value::String(val_str.to_string())
2198                }
2199            } else {
2200                Value::String(val_str.to_string())
2201            }
2202        };
2203
2204        match op {
2205            "=" | "==" => Ok(prop_val == &target_val),
2206            "!=" | "<>" => Ok(prop_val != &target_val),
2207            ">" => self
2208                .compare_values(prop_val, &target_val)
2209                .map(|o| o.is_gt()),
2210            "<" => self
2211                .compare_values(prop_val, &target_val)
2212                .map(|o| o.is_lt()),
2213            ">=" => self
2214                .compare_values(prop_val, &target_val)
2215                .map(|o| o.is_ge()),
2216            "<=" => self
2217                .compare_values(prop_val, &target_val)
2218                .map(|o| o.is_le()),
2219            _ => {
2220                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
2221                Ok(true)
2222            }
2223        }
2224    }
2225
2226    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2227        use std::cmp::Ordering;
2228
2229        fn cmp_f64(x: f64, y: f64) -> Ordering {
2230            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2231        }
2232
2233        match (a, b) {
2234            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2235            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2236            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2237            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2238            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2239            _ => Err(anyhow!(
2240                "Cannot compare incompatible types: {:?} vs {:?}",
2241                a,
2242                b
2243            )),
2244        }
2245    }
2246
2247    async fn check_unique_constraint_multi(
2248        &self,
2249        label: &str,
2250        key_values: &[(String, Value)],
2251        current_vid: Vid,
2252        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2253    ) -> Result<()> {
2254        // Serialize constraint key once for O(1) lookups
2255        let key = serialize_constraint_key(label, key_values);
2256
2257        // 1. Check L0 (in-memory) using O(1) constraint index
2258        {
2259            let l0 = self.l0_manager.get_current();
2260            let l0_guard = l0.read();
2261            if l0_guard.has_constraint_key(&key, current_vid) {
2262                return Err(anyhow!(
2263                    "Constraint violation: Duplicate composite key for label '{}'",
2264                    label
2265                ));
2266            }
2267        }
2268
2269        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2270        // buffer onto `pending_flush` and installs a fresh empty current
2271        // buffer; until the rotated rows reach Lance the key is invisible to
2272        // both the current-buffer check above and the storage check below, so
2273        // a duplicate could slip through that flush window. Mirror the read
2274        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2275        // already consult `pending_flush`.
2276        for pending_l0 in self.l0_manager.get_pending_flush() {
2277            if pending_l0.read().has_constraint_key(&key, current_vid) {
2278                return Err(anyhow!(
2279                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2280                    label
2281                ));
2282            }
2283        }
2284
2285        // Check Transaction L0
2286        if let Some(tx_l0) = tx_l0 {
2287            let tx_l0_guard = tx_l0.read();
2288            if tx_l0_guard.has_constraint_key(&key, current_vid) {
2289                return Err(anyhow!(
2290                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2291                    label
2292                ));
2293            }
2294        }
2295
2296        // 2. Check Storage (L1/L2)
2297        let filters: Vec<String> = key_values
2298            .iter()
2299            .map(|(prop, val)| {
2300                let val_str = match val {
2301                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2302                    Value::Int(n) => n.to_string(),
2303                    Value::Float(f) => f.to_string(),
2304                    Value::Bool(b) => b.to_string(),
2305                    _ => "NULL".to_string(),
2306                };
2307                format!("{} = {}", prop, val_str)
2308            })
2309            .collect();
2310
2311        let mut filter = filters.join(" AND ");
2312        filter.push_str(&format!(
2313            " AND _deleted = false AND _vid != {}",
2314            current_vid.as_u64()
2315        ));
2316
2317        // 2. Check Storage (L1/L2) through the `StorageBackend` (branch-aware,
2318        // correct `.lance` path). Skip cleanly when the table is not yet
2319        // flushed; propagate any real backend error instead of failing open.
2320        #[cfg(feature = "lance-backend")]
2321        {
2322            let backend = self.storage.backend();
2323            let table = table_names::vertex_table_name(label);
2324            if backend.table_exists(&table).await? {
2325                let count = backend.count_rows(&table, Some(filter.as_str())).await?;
2326                if count > 0 {
2327                    return Err(anyhow!(
2328                        "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2329                        label,
2330                        filter
2331                    ));
2332                }
2333            }
2334        }
2335
2336        Ok(())
2337    }
2338
2339    async fn check_write_pressure(&self) -> Result<()> {
2340        let status = self
2341            .storage
2342            .compaction_status()
2343            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2344        let l1_runs = status.l1_runs;
2345        let throttle = &self.config.throttle;
2346
2347        if l1_runs >= throttle.hard_limit {
2348            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2349            // Simple polling for now
2350            while self
2351                .storage
2352                .compaction_status()
2353                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2354                .l1_runs
2355                >= throttle.hard_limit
2356            {
2357                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2358            }
2359        } else if l1_runs >= throttle.soft_limit {
2360            let excess = l1_runs - throttle.soft_limit;
2361            // Cap multiplier to avoid overflow
2362            let excess = std::cmp::min(excess, 31);
2363            let multiplier = 2_u32.pow(excess as u32);
2364            let delay = throttle.base_delay * multiplier;
2365            tokio::time::sleep(delay).await;
2366        }
2367        Ok(())
2368    }
2369
2370    /// Check transaction memory limit to prevent OOM.
2371    /// No-op when no transaction is active.
2372    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2373        if let Some(tx_l0) = tx_l0 {
2374            let size = tx_l0.read().estimated_size;
2375            if size > self.config.max_transaction_memory {
2376                return Err(anyhow!(
2377                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2378                     Roll back or commit the current transaction.",
2379                    size,
2380                    self.config.max_transaction_memory
2381                ));
2382            }
2383        }
2384        Ok(())
2385    }
2386
2387    async fn get_query_context(
2388        &self,
2389        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2390    ) -> Option<QueryContext> {
2391        Some(QueryContext::new_with_pending(
2392            self.l0_manager.get_current(),
2393            tx_l0.cloned(),
2394            self.l0_manager.get_pending_flush(),
2395        ))
2396    }
2397
2398    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2399    /// write paths.
2400    ///
2401    /// Rejects a declared CRDT property written as a parsed CRDT value
2402    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2403    /// A mismatch would make the commit-time merge silently overwrite instead of
2404    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2405    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2406    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2407    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2408    /// are never carved out and stay conflictable.
2409    fn enforce_crdt_variants(
2410        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2411        properties: &Properties,
2412    ) -> Result<()> {
2413        for (key, value) in properties {
2414            let Some(meta) = props_meta.get(key) else {
2415                continue;
2416            };
2417            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2418                continue;
2419            };
2420            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2421                && crdt.type_name() != expected.type_name()
2422            {
2423                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2424                    message: format!(
2425                        "CRDT property '{key}' must be written as a {} value",
2426                        expected.type_name()
2427                    ),
2428                }));
2429            }
2430        }
2431        Ok(())
2432    }
2433
2434    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2435    ///
2436    /// When `label` is provided, uses it directly to look up property metadata.
2437    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2438    ///
2439    /// # Errors
2440    ///
2441    /// Returns an error if CRDT property merging fails.
2442    async fn prepare_vertex_upsert(
2443        &self,
2444        vid: Vid,
2445        properties: &mut Properties,
2446        label: Option<&str>,
2447        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2448    ) -> Result<()> {
2449        let Some(pm) = &self.property_manager else {
2450            return Ok(());
2451        };
2452
2453        let schema = self.schema_manager.schema();
2454
2455        // Resolve label: use provided label or discover from L0/storage
2456        let discovered_labels;
2457        let label_name = if let Some(l) = label {
2458            Some(l)
2459        } else {
2460            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2461            discovered_labels
2462                .as_ref()
2463                .and_then(|l| l.first().map(|s| s.as_str()))
2464        };
2465
2466        let Some(label_str) = label_name else {
2467            return Ok(());
2468        };
2469        let Some(props_meta) = schema.properties.get(label_str) else {
2470            return Ok(());
2471        };
2472
2473        // Identify CRDT properties in the insert data
2474        let crdt_keys: Vec<String> = properties
2475            .keys()
2476            .filter(|key| {
2477                props_meta.get(*key).is_some_and(|meta| {
2478                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2479                })
2480            })
2481            .cloned()
2482            .collect();
2483
2484        if crdt_keys.is_empty() {
2485            return Ok(());
2486        }
2487
2488        // Enforce that each declared CRDT property written as a parsed CRDT value
2489        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2490        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2491        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2492        // would hide that as a silent lost update — reject it at the source.
2493        //
2494        // Only the `Map` form is checked: it is exactly the form the carve-out
2495        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2496        // string (the Cypher form) or a non-CRDT value is never carved out — it
2497        // stays conflictable — so it poses no carve-out soundness risk and is left
2498        // to the existing merge/parse path. This is the declared-property half of
2499        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2500        Self::enforce_crdt_variants(props_meta, properties)?;
2501
2502        let ctx = self.get_query_context(tx_l0).await;
2503        for key in crdt_keys {
2504            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2505            if !existing.is_null()
2506                && let Some(val) = properties.get_mut(&key)
2507            {
2508                *val = pm.merge_crdt_values(&existing, val)?;
2509            }
2510        }
2511
2512        Ok(())
2513    }
2514
2515    async fn prepare_edge_upsert(
2516        &self,
2517        eid: Eid,
2518        properties: &mut Properties,
2519        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2520    ) -> Result<()> {
2521        if let Some(pm) = &self.property_manager {
2522            let schema = self.schema_manager.schema();
2523            // Get edge type from L0 buffer instead of from EID
2524            let type_name = self.get_edge_type_from_l0(eid);
2525
2526            if let Some(ref t_name) = type_name
2527                && let Some(props_meta) = schema.properties.get(t_name)
2528            {
2529                let mut crdt_keys = Vec::new();
2530                for (key, _) in properties.iter() {
2531                    if let Some(meta) = props_meta.get(key)
2532                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2533                    {
2534                        crdt_keys.push(key.clone());
2535                    }
2536                }
2537
2538                if !crdt_keys.is_empty() {
2539                    let ctx = self.get_query_context(tx_l0).await;
2540                    for key in crdt_keys {
2541                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2542
2543                        if !existing.is_null()
2544                            && let Some(val) = properties.get_mut(&key)
2545                        {
2546                            *val = pm.merge_crdt_values(&existing, val)?;
2547                        }
2548                    }
2549                }
2550            }
2551        }
2552        Ok(())
2553    }
2554
2555    #[instrument(skip(self, properties), level = "trace")]
2556    pub async fn insert_vertex(
2557        &self,
2558        vid: Vid,
2559        properties: Properties,
2560        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2561    ) -> Result<()> {
2562        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2563            .await?;
2564        Ok(())
2565    }
2566
2567    /// Component C1 (G4): before a non-transactional mutation merges into main
2568    /// L0, if an outstanding snapshot pins the current generation, freeze it
2569    /// aside so snapshots taken *before* this write stay isolated from it.
2570    ///
2571    /// `flush_lock` (acquired and released here) serializes the freeze against
2572    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2573    /// commit path gets. No-op for transactional writes (their freeze happens at
2574    /// commit) and — the common case — when nothing is pinned, where it costs one
2575    /// atomic load. Freezes at most once per pinned generation: the freeze
2576    /// installs a fresh unpinned `current`, so later writes in the same bulk
2577    /// import see no pin and merge in place, and the snapshot keeps reading the
2578    /// frozen pre-import buffer.
2579    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2580        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2581        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2582        // always false (one atomic load) when SSI is off.
2583        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2584            let _flush_lock_guard = self.flush_lock.lock().await;
2585            // Re-check under the lock: a concurrent commit may have frozen first.
2586            if self.l0_manager.is_current_pinned() {
2587                self.l0_manager.freeze_current_for_snapshot();
2588                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2589            }
2590        }
2591    }
2592
2593    #[instrument(skip(self, properties, labels), level = "trace")]
2594    pub async fn insert_vertex_with_labels(
2595        &self,
2596        vid: Vid,
2597        mut properties: Properties,
2598        labels: &[String],
2599        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2600    ) -> Result<Properties> {
2601        let start = std::time::Instant::now();
2602        self.check_write_pressure().await?;
2603        self.check_transaction_memory(tx_l0)?;
2604
2605        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2606        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2607        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2608        // taken before this write stay isolated from it.
2609        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2610
2611        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2612            self.process_embeddings_for_labels(labels, &mut properties)
2613                .await?;
2614        }
2615        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2616            .await?;
2617        self.prepare_vertex_upsert(
2618            vid,
2619            &mut properties,
2620            labels.first().map(|s| s.as_str()),
2621            tx_l0,
2622        )
2623        .await?;
2624
2625        // Clone properties and labels before moving into L0 to return them and populate constraint index
2626        let properties_copy = properties.clone();
2627        let labels_copy = labels.to_vec();
2628
2629        {
2630            // For a non-tx write, re-resolve the live `current` buffer and hold
2631            // `flush_lock` across the (synchronous) write so a concurrent flush
2632            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2633            // a fresh `current` between resolve and write, dropping our write
2634            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2635            // buffer, never the rotating `current`, so no `flush_lock` is needed
2636            // (and taking it would risk re-entrancy with the commit path).
2637            let _flush_lock_guard = if tx_l0.is_none() {
2638                Some(self.flush_lock.lock().await)
2639            } else {
2640                None
2641            };
2642            let l0 = self.resolve_l0(tx_l0);
2643            let mut l0_guard = l0.write();
2644            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2645            // possibly empty) current buffer must merge against the chained
2646            // committed state, not shadow it. No-op when the chain is empty
2647            // or this is a tx-private write.
2648            if tx_l0.is_none() {
2649                let pending = self.l0_manager.get_pending_flush();
2650                if !pending.is_empty() {
2651                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2652                }
2653            }
2654            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2655
2656            // Populate constraint index for O(1) duplicate detection
2657            let schema = self.schema_manager.schema();
2658            for label in &labels_copy {
2659                if schema.get_label_case_insensitive(label).is_none() {
2660                    if self.config.strict_schema {
2661                        return Err(anyhow::anyhow!(
2662                            "Label '{}' is not defined in the schema \
2663                             (strict_schema is enabled).",
2664                            label
2665                        ));
2666                    }
2667                    continue; // Schemaless: skip unknown labels.
2668                }
2669
2670                // For each unique constraint on this label, insert into constraint index
2671                for constraint in &schema.constraints {
2672                    if !constraint.enabled {
2673                        continue;
2674                    }
2675                    if let ConstraintTarget::Label(l) = &constraint.target {
2676                        if l != label {
2677                            continue;
2678                        }
2679                    } else {
2680                        continue;
2681                    }
2682
2683                    if let ConstraintType::Unique {
2684                        properties: unique_props,
2685                    } = &constraint.constraint_type
2686                    {
2687                        let mut key_values = Vec::new();
2688                        let mut all_present = true;
2689                        for prop in unique_props {
2690                            if let Some(val) = properties_copy.get(prop) {
2691                                key_values.push((prop.clone(), val.clone()));
2692                            } else {
2693                                all_present = false;
2694                                break;
2695                            }
2696                        }
2697
2698                        if all_present {
2699                            let key = serialize_constraint_key(label, &key_values);
2700                            l0_guard.insert_constraint_key(key, vid);
2701                        }
2702                    }
2703                }
2704            }
2705        }
2706
2707        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2708        self.update_metrics();
2709
2710        if tx_l0.is_none() {
2711            self.check_flush().await?;
2712        }
2713        if start.elapsed().as_millis() > 100 {
2714            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2715        }
2716        Ok(properties_copy)
2717    }
2718
2719    /// True iff routing this partial write through MergeInsert would
2720    /// miss a constraint check. Specifically: a multi-key UNIQUE
2721    /// constraint where the touched-set doesn't cover all member keys
2722    /// requires the unchanged keys from the existing row to compute
2723    /// the composite. Conservative: also returns true if any touched
2724    /// key is `ext_id` (uniqueness checked globally — handled in the
2725    /// full-row path).
2726    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2727        if touched.contains_key("ext_id") {
2728            return true;
2729        }
2730        let schema = self.schema_manager.schema();
2731        for label in labels {
2732            if schema.get_label_case_insensitive(label).is_none() {
2733                continue;
2734            }
2735            for constraint in &schema.constraints {
2736                if !constraint.enabled {
2737                    continue;
2738                }
2739                if let ConstraintTarget::Label(l) = &constraint.target {
2740                    if !l.eq_ignore_ascii_case(label) {
2741                        continue;
2742                    }
2743                } else {
2744                    continue;
2745                }
2746                if let ConstraintType::Unique {
2747                    properties: unique_props,
2748                } = &constraint.constraint_type
2749                {
2750                    if unique_props.len() < 2 {
2751                        continue; // single-key UNIQUE — partial path sees the key
2752                    }
2753                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2754                        return true;
2755                    }
2756                }
2757            }
2758        }
2759        false
2760    }
2761
2762    /// Insert a vertex's FULL property row plus a touched-keys hint so
2763    /// the flush emits ONLY those columns via Lance MergeInsert.
2764    ///
2765    /// Caller must have read the full row (via PropertyManager) and
2766    /// applied SET-touched values on top before calling — same input
2767    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2768    /// is the set of property keys this SET statement actually
2769    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2770    /// flush filters the MergeInsert source schema down to those keys.
2771    /// When `UniConfig::partial_lance_writes == false`, falls through
2772    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2773    /// equivalence with prior releases.
2774    /// Refresh auto-embed targets for a partial / `SET` write whose `touched_keys`
2775    /// include an embed *source* column: drop stale target embeddings from `props`
2776    /// (so they re-embed) and add them to `touched_keys`. Public so the query
2777    /// executor can apply it on the coalesced write **before** the partial-vs-full
2778    /// branch — both branches need it. No-op for non-SET writes / non-embed labels.
2779    pub fn refresh_embed_targets(
2780        &self,
2781        props: &mut Properties,
2782        touched_keys: &mut HashSet<String>,
2783        labels: &[String],
2784    ) {
2785        refresh_touched_embed_targets(&self.schema_manager.schema(), props, touched_keys, labels);
2786    }
2787
2788    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2789    pub async fn insert_vertex_partial_full(
2790        &self,
2791        vid: Vid,
2792        mut props: Properties,
2793        touched_keys: HashSet<String>,
2794        labels: &[String],
2795        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2796    ) -> Result<()> {
2797        if !self.config.partial_lance_writes
2798            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2799        {
2800            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2801                .await?;
2802            return Ok(());
2803        }
2804
2805        self.check_write_pressure().await?;
2806        self.check_transaction_memory(tx_l0)?;
2807        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2808            self.process_embeddings_for_labels(labels, &mut props)
2809                .await?;
2810        }
2811        // Full-row validation runs because we have the complete map;
2812        // no need for the partial-only validator.
2813        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2814            .await?;
2815        {
2816            let l0 = self.resolve_l0(tx_l0);
2817            let mut l0_guard = l0.write();
2818            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2819        }
2820        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2821        metrics::counter!("uni_partial_writes_total").increment(1);
2822        self.update_metrics();
2823        if tx_l0.is_none() {
2824            self.check_flush().await?;
2825        }
2826        Ok(())
2827    }
2828
2829    /// Insert a vertex's *partial* property set without first reading the
2830    /// full row.
2831    ///
2832    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2833    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2834    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2835    /// schema source — preserving untouched columns (e.g., embeddings)
2836    /// byte-equal in Lance with no read at the caller and no write of
2837    /// those columns.
2838    ///
2839    /// When the flag is `false`, this falls back to the existing
2840    /// `insert_vertex_with_labels` path after merging `touched` with
2841    /// the current properties from L0/storage. The caller can therefore
2842    /// use this entry point unconditionally; the optimization activates
2843    /// only when the flag is on.
2844    #[instrument(skip(self, touched, labels), level = "trace")]
2845    pub async fn insert_vertex_partial(
2846        &self,
2847        vid: Vid,
2848        touched: Properties,
2849        labels: &[String],
2850        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2851    ) -> Result<()> {
2852        let needs_full_read =
2853            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2854        if needs_full_read {
2855            // Flag-off fallback (or constraint-driven fallback): merge
2856            // `touched` with the current full property snapshot from
2857            // L0/storage and route through the existing path. Preserves
2858            // bit-for-bit equivalence with the pre-Round-11 release.
2859            let existing = if let Some(pm) = &self.property_manager {
2860                pm.get_all_vertex_props_with_ctx(vid, None)
2861                    .await
2862                    .unwrap_or_default()
2863                    .unwrap_or_default()
2864            } else {
2865                Properties::new()
2866            };
2867            let mut merged = existing;
2868            for (k, v) in touched {
2869                merged.insert(k, v);
2870            }
2871            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2872                .await?;
2873            return Ok(());
2874        }
2875
2876        // Flag-on fast path: stage the partial update directly. Pressure
2877        // checks, embedding generation, constraint validation all still
2878        // run — but the validator is the partial-aware variant that
2879        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2880        // properties not present in `touched`. Multi-key UNIQUE that
2881        // overlaps the touched set forces a fallback above via
2882        // `touched_needs_full_read`.
2883        let mut touched = touched;
2884        self.check_write_pressure().await?;
2885        self.check_transaction_memory(tx_l0)?;
2886        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2887            self.process_embeddings_for_labels(labels, &mut touched)
2888                .await?;
2889        }
2890        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2891            .await?;
2892
2893        {
2894            let l0 = self.resolve_l0(tx_l0);
2895            let mut l0_guard = l0.write();
2896            l0_guard.insert_vertex_partial(vid, touched, labels);
2897        }
2898
2899        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2900        metrics::counter!("uni_partial_writes_total").increment(1);
2901        self.update_metrics();
2902        if tx_l0.is_none() {
2903            self.check_flush().await?;
2904        }
2905        Ok(())
2906    }
2907
2908    /// Insert multiple vertices with batched operations.
2909    ///
2910    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2911    /// for bulk inserts with unique constraints.
2912    ///
2913    /// # Performance Improvements
2914    /// - Batch VID allocation: 1 call instead of N calls
2915    /// - Batch constraint validation: O(N) instead of O(N²)
2916    /// - Batch embedding generation: 1 API call per config instead of N calls
2917    /// - Transaction wrapping: Automatic flush deferral, atomicity
2918    ///
2919    /// # Arguments
2920    /// * `vids` - Pre-allocated VIDs for the vertices
2921    /// * `properties_batch` - Properties for each vertex
2922    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2923    ///
2924    /// # Errors
2925    /// Returns error if:
2926    /// - VID/properties length mismatch
2927    /// - Constraint violation detected
2928    /// - Embedding generation fails
2929    /// - Transaction commit fails
2930    ///
2931    /// # Atomicity
2932    /// If this method fails, all changes are rolled back (if transaction was started here).
2933    pub async fn insert_vertices_batch(
2934        &self,
2935        vids: Vec<Vid>,
2936        mut properties_batch: Vec<Properties>,
2937        labels: Vec<String>,
2938        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2939    ) -> Result<Vec<Properties>> {
2940        let start = std::time::Instant::now();
2941
2942        // Validate inputs
2943        if vids.len() != properties_batch.len() {
2944            return Err(anyhow!(
2945                "VID/properties size mismatch: {} vids, {} properties",
2946                vids.len(),
2947                properties_batch.len()
2948            ));
2949        }
2950
2951        if vids.is_empty() {
2952            return Ok(Vec::new());
2953        }
2954
2955        // Batch operations — writes go directly to the resolved L0.
2956        // Atomicity is guaranteed by the caller holding the writer lock.
2957        let result = async {
2958            self.check_write_pressure().await?;
2959            self.check_transaction_memory(tx_l0)?;
2960
2961            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2962            // freeze the pinned generation aside before merging so snapshot
2963            // readers stay isolated. No-op when unpinned or transactional.
2964            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2965
2966            // Batch embedding generation (1 API call per config)
2967            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2968                .await?;
2969
2970            // Batch constraint validation (O(N) instead of O(N²))
2971            let label = labels
2972                .first()
2973                .ok_or_else(|| anyhow!("No labels provided"))?;
2974            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2975                .await?;
2976
2977            // Batch prepare (CRDT merging if needed)
2978            // Check schema once: skip entirely if no CRDT properties for this label.
2979            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2980            // values to merge, so the per-vertex lookup is unnecessary in that case.
2981            let has_crdt_fields = {
2982                let schema = self.schema_manager.schema();
2983                schema
2984                    .properties
2985                    .get(label.as_str())
2986                    .is_some_and(|props_meta| {
2987                        props_meta.values().any(|meta| {
2988                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2989                        })
2990                    })
2991            };
2992
2993            if has_crdt_fields {
2994                // Layer-1 variant enforcement (G3): the batch path must reject a
2995                // declared-CRDT variant mismatch exactly as the single-vertex
2996                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2997                // written via batch import slips past write-time validation and
2998                // the OCC carve-out then masks the overwrite as a lost update.
2999                {
3000                    let schema = self.schema_manager.schema();
3001                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
3002                        for props in &properties_batch {
3003                            Self::enforce_crdt_variants(props_meta, props)?;
3004                        }
3005                    }
3006                }
3007
3008                // Batch fetch existing CRDT values: collect VIDs that need merging,
3009                // then query once via PropertyManager instead of per-vertex lookups.
3010                let schema = self.schema_manager.schema();
3011                let crdt_keys: Vec<String> = schema
3012                    .properties
3013                    .get(label.as_str())
3014                    .map(|props_meta| {
3015                        props_meta
3016                            .iter()
3017                            .filter(|(_, meta)| {
3018                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
3019                            })
3020                            .map(|(key, _)| key.clone())
3021                            .collect()
3022                    })
3023                    .unwrap_or_default();
3024
3025                if let Some(pm) = &self.property_manager {
3026                    let ctx = self.get_query_context(tx_l0).await;
3027                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
3028                        for key in &crdt_keys {
3029                            if props.contains_key(key) {
3030                                let existing =
3031                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
3032                                if !existing.is_null()
3033                                    && let Some(val) = props.get_mut(key)
3034                                {
3035                                    *val = pm.merge_crdt_values(&existing, val)?;
3036                                }
3037                            }
3038                        }
3039                    }
3040                }
3041            }
3042
3043            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
3044            let target_l0 = self.resolve_l0(tx_l0);
3045
3046            let properties_result = properties_batch.clone();
3047            {
3048                let mut l0_guard = target_l0.write();
3049                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
3050                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
3051                }
3052            }
3053
3054            // Update metrics (batch increment)
3055            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
3056            self.update_metrics();
3057
3058            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
3059        }
3060        .await;
3061
3062        let props = result?;
3063
3064        if start.elapsed().as_millis() > 100 {
3065            log::warn!(
3066                "Slow insert_vertices_batch ({} vertices): {}ms",
3067                vids.len(),
3068                start.elapsed().as_millis()
3069            );
3070        }
3071
3072        Ok(props)
3073    }
3074
3075    /// Delete a vertex by VID.
3076    ///
3077    /// When `labels` is provided, uses them directly to populate L0 for
3078    /// correct tombstone flushing. Otherwise discovers labels from L0
3079    /// buffers and storage (which can be slow for many vertices).
3080    ///
3081    /// # Errors
3082    ///
3083    /// Returns an error if write pressure stalls, label lookup fails, or
3084    /// the L0 delete operation fails.
3085    #[instrument(skip(self, labels), level = "trace")]
3086    pub async fn delete_vertex(
3087        &self,
3088        vid: Vid,
3089        labels: Option<Vec<String>>,
3090        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3091    ) -> Result<()> {
3092        let start = std::time::Instant::now();
3093        self.check_write_pressure().await?;
3094        self.check_transaction_memory(tx_l0)?;
3095        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3096
3097        // Before deleting, ensure we have the vertex's labels stored in L0 so
3098        // the tombstone can be flushed to the correct label datasets. Discover
3099        // them up front (this may await storage) WITHOUT pinning the buffer we
3100        // will eventually mutate — for non-tx writes the live `current` buffer
3101        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
3102        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
3103        // reads that tolerate a racing rotate.
3104        let has_labels = {
3105            let l0_guard = self.resolve_l0(tx_l0);
3106            let guard = l0_guard.read();
3107            guard.vertex_labels.contains_key(&vid)
3108        };
3109
3110        let backfill_labels = if has_labels {
3111            None
3112        } else if let Some(provided) = labels {
3113            // Caller provided labels — skip the lookup entirely
3114            Some(provided)
3115        } else {
3116            // Discover labels from pending flush L0s, then storage
3117            let mut found = None;
3118            for pending_l0 in self.l0_manager.get_pending_flush() {
3119                let pending_guard = pending_l0.read();
3120                if let Some(l) = pending_guard.get_vertex_labels(vid) {
3121                    found = Some(l.to_vec());
3122                    break;
3123                }
3124            }
3125            if found.is_none() {
3126                found = self.find_vertex_labels_in_storage(vid).await?;
3127            }
3128            found
3129        };
3130
3131        // Test-only seam (no-op without the `failpoints` feature): pause a
3132        // non-transactional delete AFTER the awaited label discovery but BEFORE
3133        // it re-resolves the live buffer and writes the tombstone. A concurrent
3134        // flush can rotate+complete a buffer in this window; the fix re-resolves
3135        // `get_current()` and mutates it under `flush_lock`, so the tombstone
3136        // always lands in the live buffer (Bug #4 — silent lost delete across
3137        // L0 rotation).
3138        fail::fail_point!("nontx::after-capture");
3139
3140        // Apply the label backfill and the tombstone together. For a non-tx
3141        // write, hold `flush_lock` across the (synchronous) re-resolve + write
3142        // so a concurrent flush rotate (which takes `flush_lock` via
3143        // `begin_flush`) cannot install a fresh `current` between our resolve
3144        // and our write. For a tx write `resolve_l0` returns the tx-private
3145        // buffer (never the rotating `current`), so no `flush_lock` is needed
3146        // — and taking it there would risk re-entrancy with the commit path.
3147        if tx_l0.is_none() {
3148            let _flush_lock_guard = self.flush_lock.lock().await;
3149            let l0 = self.l0_manager.get_current();
3150            let mut guard = l0.write();
3151            if let Some(found_labels) = backfill_labels {
3152                guard.vertex_labels.insert(vid, found_labels);
3153            }
3154            guard.delete_vertex(vid)?;
3155        } else {
3156            let l0 = self.resolve_l0(tx_l0);
3157            let mut guard = l0.write();
3158            if let Some(found_labels) = backfill_labels {
3159                guard.vertex_labels.insert(vid, found_labels);
3160            }
3161            guard.delete_vertex(vid)?;
3162        }
3163        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3164        self.update_metrics();
3165
3166        if tx_l0.is_none() {
3167            self.check_flush().await?;
3168        }
3169        if start.elapsed().as_millis() > 100 {
3170            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
3171        }
3172        Ok(())
3173    }
3174
3175    /// Find vertex labels from storage by querying the main vertices table.
3176    /// Returns the labels from the latest non-deleted version of the vertex.
3177    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
3178        use crate::backend::types::ScanRequest;
3179        use arrow_array::Array;
3180        use arrow_array::cast::AsArray;
3181
3182        let backend = self.storage.backend();
3183        let table_name = MainVertexDataset::table_name();
3184
3185        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
3186        if !backend.table_exists(table_name).await? {
3187            return Ok(None);
3188        }
3189
3190        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
3191        let filter = format!("_vid = {}", vid.as_u64());
3192        let batches = backend
3193            .scan(
3194                ScanRequest::all(table_name)
3195                    .with_filter(filter)
3196                    .with_columns(vec![
3197                        "_vid".to_string(),
3198                        "labels".to_string(),
3199                        "_version".to_string(),
3200                        "_deleted".to_string(),
3201                    ]),
3202            )
3203            .await
3204            .unwrap_or_default();
3205
3206        // Find the row with the highest version number
3207        let mut max_version: Option<u64> = None;
3208        let mut labels: Option<Vec<String>> = None;
3209        let mut is_deleted = false;
3210
3211        for batch in batches {
3212            if batch.num_rows() == 0 {
3213                continue;
3214            }
3215
3216            let version_array = batch
3217                .column_by_name("_version")
3218                .unwrap()
3219                .as_primitive::<arrow_array::types::UInt64Type>();
3220
3221            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
3222
3223            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
3224
3225            for row_idx in 0..batch.num_rows() {
3226                let version = version_array.value(row_idx);
3227
3228                if max_version.is_none_or(|mv| version > mv) {
3229                    is_deleted = deleted_array.value(row_idx);
3230
3231                    let labels_list = labels_array.value(row_idx);
3232                    let string_array = labels_list.as_string::<i32>();
3233                    let vertex_labels: Vec<String> = (0..string_array.len())
3234                        .filter(|&i| !string_array.is_null(i))
3235                        .map(|i| string_array.value(i).to_string())
3236                        .collect();
3237
3238                    max_version = Some(version);
3239                    labels = Some(vertex_labels);
3240                }
3241            }
3242        }
3243
3244        // If the latest version is deleted, return None
3245        if is_deleted { Ok(None) } else { Ok(labels) }
3246    }
3247
3248    #[expect(clippy::too_many_arguments)]
3249    #[instrument(skip(self, props, touched_keys), level = "trace")]
3250    pub async fn insert_edge_partial_full(
3251        &self,
3252        src_vid: Vid,
3253        dst_vid: Vid,
3254        edge_type: u32,
3255        eid: Eid,
3256        props: Properties,
3257        edge_type_name: Option<String>,
3258        touched_keys: HashSet<String>,
3259        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3260    ) -> Result<()> {
3261        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3262        if !self.config.partial_lance_writes {
3263            return self
3264                .insert_edge(
3265                    src_vid,
3266                    dst_vid,
3267                    edge_type,
3268                    eid,
3269                    props,
3270                    edge_type_name,
3271                    tx_l0,
3272                )
3273                .await;
3274        }
3275
3276        let start = std::time::Instant::now();
3277        self.check_write_pressure().await?;
3278        self.check_transaction_memory(tx_l0)?;
3279        let mut props = props;
3280        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3281
3282        let l0 = self.resolve_l0(tx_l0);
3283        l0.write().insert_edge_partial_full(
3284            src_vid,
3285            dst_vid,
3286            edge_type,
3287            eid,
3288            props,
3289            edge_type_name,
3290            touched_keys,
3291        )?;
3292
3293        if tx_l0.is_none() {
3294            let version = l0.read().current_version;
3295            self.adjacency_manager
3296                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3297        }
3298
3299        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3300        metrics::counter!("uni_partial_writes_total").increment(1);
3301        self.update_metrics();
3302        if tx_l0.is_none() {
3303            self.check_flush().await?;
3304        }
3305        if start.elapsed().as_millis() > 100 {
3306            log::warn!(
3307                "Slow insert_edge_partial_full: {}ms",
3308                start.elapsed().as_millis()
3309            );
3310        }
3311        Ok(())
3312    }
3313
3314    #[expect(clippy::too_many_arguments)]
3315    pub async fn insert_edge(
3316        &self,
3317        src_vid: Vid,
3318        dst_vid: Vid,
3319        edge_type: u32,
3320        eid: Eid,
3321        mut properties: Properties,
3322        edge_type_name: Option<String>,
3323        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3324    ) -> Result<()> {
3325        let start = std::time::Instant::now();
3326        self.check_write_pressure().await?;
3327        self.check_transaction_memory(tx_l0)?;
3328        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3329        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3330            .await?;
3331
3332        let l0 = self.resolve_l0(tx_l0);
3333        l0.write()
3334            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3335
3336        // Dual-write to AdjacencyManager overlay (survives flush).
3337        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3338        if tx_l0.is_none() {
3339            let version = l0.read().current_version;
3340            self.adjacency_manager
3341                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3342        }
3343
3344        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3345        self.update_metrics();
3346
3347        if tx_l0.is_none() {
3348            self.check_flush().await?;
3349        }
3350        if start.elapsed().as_millis() > 100 {
3351            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3352        }
3353        Ok(())
3354    }
3355
3356    #[instrument(skip(self), level = "trace")]
3357    pub async fn delete_edge(
3358        &self,
3359        eid: Eid,
3360        src_vid: Vid,
3361        dst_vid: Vid,
3362        edge_type: u32,
3363        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3364    ) -> Result<()> {
3365        let start = std::time::Instant::now();
3366        self.check_write_pressure().await?;
3367        self.check_transaction_memory(tx_l0)?;
3368        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3369        let l0 = self.resolve_l0(tx_l0);
3370
3371        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3372
3373        // Dual-write tombstone to AdjacencyManager overlay.
3374        if tx_l0.is_none() {
3375            let version = l0.read().current_version;
3376            self.adjacency_manager
3377                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3378        }
3379        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3380        self.update_metrics();
3381
3382        if tx_l0.is_none() {
3383            self.check_flush().await?;
3384        }
3385        if start.elapsed().as_millis() > 100 {
3386            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3387        }
3388        Ok(())
3389    }
3390
3391    /// Decide whether a flush should be triggered based on mutation count
3392    /// or elapsed time since the last flush.
3393    ///
3394    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3395    /// reuse the decision while bypassing the lock-acquiring entry point
3396    /// (it already holds `flush_lock`).
3397    fn should_flush(&self) -> bool {
3398        let count = self.l0_manager.get_current().read().mutation_count;
3399        if count == 0 {
3400            return false;
3401        }
3402        if count >= self.config.auto_flush_threshold {
3403            return true;
3404        }
3405        if let Some(interval) = self.config.auto_flush_interval
3406            && self.last_flush_time.lock().elapsed() >= interval
3407            && count >= self.config.auto_flush_min_mutations
3408        {
3409            return true;
3410        }
3411        false
3412    }
3413
3414    /// Check if flush should be triggered based on mutation count or time elapsed.
3415    /// This method is called after each write operation and can also be called
3416    /// by a background task for time-based flushing.
3417    pub async fn check_flush(&self) -> Result<()> {
3418        if self.should_flush() {
3419            self.flush_to_l1(None).await?;
3420        }
3421        Ok(())
3422    }
3423
3424    /// Process embeddings for a vertex using labels passed directly.
3425    /// Use this when labels haven't been stored to L0 yet.
3426    async fn process_embeddings_for_labels(
3427        &self,
3428        labels: &[String],
3429        properties: &mut Properties,
3430    ) -> Result<()> {
3431        let label_name = labels.first().map(|s| s.as_str());
3432        self.process_embeddings_impl(label_name, properties).await
3433    }
3434
3435    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3436    /// vertex has an embedding config that hasn't been satisfied by the
3437    /// caller-provided properties, enqueue the VID in
3438    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3439    /// skips `process_embeddings_for_labels` and the embedding is computed
3440    /// in a single batched call at flush time via
3441    /// `drain_pending_embeddings`.
3442    ///
3443    /// Returns `false` (caller falls back to today's per-row eager embed)
3444    /// if any of:
3445    ///  - the flag is off,
3446    ///  - no label has an embedding config,
3447    ///  - the user already provided the target property (matches the
3448    ///    existing skip-if-present semantics at writer.rs:2727).
3449    ///
3450    /// Trade-off: when deferral is active, in-tx reads of the embedding
3451    /// column return only what was already in storage (or nothing for
3452    /// brand-new vertices). Existing tests that RETURN n.embedding in
3453    /// the same tx as a SET on the source column must run with the flag
3454    /// off; opt in only when no such reads happen between write and
3455    /// commit.
3456    fn try_defer_embedding(
3457        &self,
3458        labels: &[String],
3459        properties: &Properties,
3460        vid: Vid,
3461        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3462    ) -> bool {
3463        if !self.config.defer_embeddings {
3464            return false;
3465        }
3466        let Some(label) = labels.first() else {
3467            return false;
3468        };
3469
3470        let schema = self.schema_manager.schema();
3471        let mut has_unsatisfied_cfg = false;
3472        for idx in &schema.indexes {
3473            let unsatisfied = match idx {
3474                IndexDefinition::Vector(v_cfg) => {
3475                    v_cfg.label == *label
3476                        && v_cfg.embedding_config.is_some()
3477                        && !properties.contains_key(&v_cfg.property)
3478                }
3479                IndexDefinition::Sparse(s_cfg) => {
3480                    s_cfg.label == *label
3481                        && s_cfg.embedding_config.is_some()
3482                        && !properties.contains_key(&s_cfg.property)
3483                }
3484                _ => false,
3485            };
3486            if unsatisfied {
3487                has_unsatisfied_cfg = true;
3488                break;
3489            }
3490        }
3491        if !has_unsatisfied_cfg {
3492            return false;
3493        }
3494
3495        let l0 = self.resolve_l0(tx_l0);
3496        let mut guard = l0.write();
3497        guard.pending_embeddings.insert(vid, label.clone());
3498        true
3499    }
3500
3501    /// Drain `pending_embeddings` from the rotated old-L0 right before
3502    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3503    /// `process_embeddings_for_batch` call per label, and writes the
3504    /// resulting embedding vectors into each VID's `vertex_properties`
3505    /// map. After this returns, the flush proceeds against an L0 that
3506    /// looks no different from one whose embeddings were generated
3507    /// per-row at insert.
3508    ///
3509    /// Idempotent: a VID whose embedding was already materialized
3510    /// (e.g., by on-demand read paths in a future Phase B revision) is
3511    /// detected via `properties.contains_key(target_prop)` inside
3512    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3513    /// the drain is safe.
3514    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3515        let by_label: HashMap<String, Vec<Vid>> = {
3516            let guard = old_l0_arc.read();
3517            if guard.pending_embeddings.is_empty() {
3518                return Ok(());
3519            }
3520            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3521            for (vid, label) in &guard.pending_embeddings {
3522                m.entry(label.clone()).or_default().push(*vid);
3523            }
3524            m
3525        };
3526
3527        for (label, vids) in by_label {
3528            let mut properties_batch: Vec<Properties> = {
3529                let guard = old_l0_arc.read();
3530                vids.iter()
3531                    .map(|vid| {
3532                        guard
3533                            .vertex_properties
3534                            .get(vid)
3535                            .cloned()
3536                            .unwrap_or_default()
3537                    })
3538                    .collect()
3539            };
3540
3541            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3542                .await?;
3543
3544            let mut guard = old_l0_arc.write();
3545            for (vid, props) in vids.iter().zip(properties_batch) {
3546                let target = guard.vertex_properties.entry(*vid).or_default();
3547                for (k, v) in props {
3548                    target.insert(k, v);
3549                }
3550                guard.pending_embeddings.remove(vid);
3551            }
3552        }
3553        Ok(())
3554    }
3555
3556    /// Materialise MUVERA FDE columns for the about-to-flush L0. Mirrors
3557    /// [`Self::drain_pending_embeddings`]: for each MUVERA index, compute the derived
3558    /// Fixed-Dimensional Encoding from each row's source multi-vector and inject it into
3559    /// that row's `vertex_properties` (so the normal column builder writes the
3560    /// `__fde_*` column with no hot-path change). For partial-write rows that touched the
3561    /// source column, the derived column is added to `vertex_partial_keys` so the partial
3562    /// MergeInsert batch carries the recomputed FDE (avoids staleness on `SET`).
3563    ///
3564    /// No-op when the schema has no MUVERA index. Unlike auto-embed, the FDE is a pure,
3565    /// deterministic, in-process transform — no runtime/embedding service needed.
3566    fn materialize_fde_columns(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3567        let schema = self.schema_manager.schema();
3568        let specs = crate::storage::muvera_index::fde_specs(&schema);
3569        if specs.is_empty() {
3570            return Ok(());
3571        }
3572        let mut guard = old_l0_arc.write();
3573        for spec in &specs {
3574            let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)
3575                .map_err(|e| anyhow!("MUVERA index '{}': {e}", spec.index_name))?;
3576            // VIDs of this label currently in L0 (collect first to avoid a borrow
3577            // conflict with the per-row mutation below).
3578            let vids: Vec<Vid> = guard
3579                .vertex_labels
3580                .iter()
3581                .filter(|(_, labels)| labels.contains(&spec.label))
3582                .map(|(vid, _)| *vid)
3583                .collect();
3584            for vid in vids {
3585                // Decode the source multi-vector tokens (borrow ends before the mutation).
3586                let tokens = match guard
3587                    .vertex_properties
3588                    .get(&vid)
3589                    .and_then(|p| p.get(&spec.source_prop))
3590                {
3591                    Some(v) => crate::storage::muvera_index::value_to_multivec(v),
3592                    None => continue, // source absent → leave the FDE column NULL
3593                };
3594                // A source token with the wrong dimension makes `encode_doc` error.
3595                // Skipping the row (leaving the FDE column NULL → ranks last under the
3596                // mandatory Dot metric; harmless) keeps one malformed document from
3597                // wedging *every* flush of this label, matching the normal multi-vector
3598                // column path, which null-fills a dimension mismatch rather than failing
3599                // the flush (issue #96).
3600                let fde = match encoder.encode_doc(&tokens) {
3601                    Ok(fde) => fde,
3602                    Err(e) => {
3603                        tracing::warn!(
3604                            index = %spec.index_name,
3605                            vid = ?vid,
3606                            error = %e,
3607                            "muvera.fde.skip_malformed: leaving FDE NULL for a source \
3608                             multi-vector that failed encoding"
3609                        );
3610                        continue;
3611                    }
3612                };
3613                if let Some(props) = guard.vertex_properties.get_mut(&vid) {
3614                    props.insert(spec.derived_col.clone(), Value::Vector(fde));
3615                }
3616                if let Some(touched) = guard.vertex_partial_keys.get_mut(&vid)
3617                    && touched.contains(&spec.source_prop)
3618                {
3619                    touched.insert(spec.derived_col.clone());
3620                }
3621            }
3622        }
3623        Ok(())
3624    }
3625
3626    /// Process embeddings for a batch of vertices efficiently.
3627    ///
3628    /// Groups vertices by embedding config and makes batched API calls to the
3629    /// embedding service instead of calling once per vertex.
3630    ///
3631    /// # Performance
3632    /// For N vertices with embedding config:
3633    /// - Old approach: N API calls to embedding service
3634    /// - New approach: 1 API call per embedding config (usually 1 total)
3635    async fn process_embeddings_for_batch(
3636        &self,
3637        labels: &[String],
3638        properties_batch: &mut [Properties],
3639    ) -> Result<()> {
3640        let Some(label) = labels.first().map(|s| s.as_str()) else {
3641            return Ok(());
3642        };
3643        let schema = self.schema_manager.schema();
3644
3645        // Group auto-embed targets by (alias, source). A group with both a dense Vector and a
3646        // multi-vector List<Vector> column is a single-pass hybrid source: one inference fills
3647        // both. Non-mixed groups use the dense / multi-vector embedder as before.
3648        let groups = collect_embed_groups(&schema, label);
3649        if groups.is_empty() {
3650            return Ok(());
3651        }
3652
3653        for (key, group) in groups {
3654            let alias = &key.0;
3655            let want_dense = !group.dense.is_empty();
3656            let want_multi = !group.multi.is_empty();
3657            let want_sparse = !group.sparse.is_empty();
3658
3659            // A row needs this group's inference if it has the source text and is still missing
3660            // at least one of the group's target columns (user-supplied values are preserved).
3661            let mut input_texts: Vec<String> = Vec::new();
3662            let mut needs: Vec<usize> = Vec::new();
3663            for (idx, properties) in properties_batch.iter().enumerate() {
3664                let all_present = group
3665                    .dense
3666                    .iter()
3667                    .chain(group.multi.iter())
3668                    .chain(group.sparse.iter())
3669                    .all(|t| properties.contains_key(t));
3670                if all_present {
3671                    continue;
3672                }
3673                let mut inputs = Vec::new();
3674                for src in &group.source_properties {
3675                    if let Some(val) = properties.get(src)
3676                        && let Some(s) = val.as_str()
3677                    {
3678                        inputs.push(s.to_string());
3679                    }
3680                }
3681                if inputs.is_empty() {
3682                    continue;
3683                }
3684                let text = inputs.join(" ");
3685                let text = match &group.document_prefix {
3686                    Some(prefix) => format!("{prefix}{text}"),
3687                    None => text,
3688                };
3689                input_texts.push(text);
3690                needs.push(idx);
3691            }
3692            if input_texts.is_empty() {
3693                continue;
3694            }
3695
3696            let runtime = self
3697                .xervo_runtime
3698                .get()
3699                .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3700            let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3701            let (dense, multi, sparse) = embed_group(
3702                runtime,
3703                alias,
3704                &input_refs,
3705                want_dense,
3706                want_multi,
3707                want_sparse,
3708            )
3709            .await?;
3710
3711            for (i, &row) in needs.iter().enumerate() {
3712                if let Some(vec) = dense.as_ref().and_then(|d| d.get(i)) {
3713                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3714                    for t in &group.dense {
3715                        if !properties_batch[row].contains_key(t) {
3716                            properties_batch[row].insert(t.clone(), Value::List(vals.clone()));
3717                        }
3718                    }
3719                }
3720                if let Some(tokens) = multi.as_ref().and_then(|m| m.get(i)) {
3721                    let mv = multivec_to_value(tokens);
3722                    for t in &group.multi {
3723                        if !properties_batch[row].contains_key(t) {
3724                            properties_batch[row].insert(t.clone(), mv.clone());
3725                        }
3726                    }
3727                }
3728                if let Some(pairs) = sparse.as_ref().and_then(|s| s.get(i)) {
3729                    let sv = sparse_pairs_to_value(pairs);
3730                    for t in &group.sparse {
3731                        if !properties_batch[row].contains_key(t) {
3732                            properties_batch[row].insert(t.clone(), sv.clone());
3733                        }
3734                    }
3735                }
3736            }
3737        }
3738
3739        Ok(())
3740    }
3741
3742    async fn process_embeddings_impl(
3743        &self,
3744        label_name: Option<&str>,
3745        properties: &mut Properties,
3746    ) -> Result<()> {
3747        let schema = self.schema_manager.schema();
3748
3749        let Some(label) = label_name else {
3750            return Ok(());
3751        };
3752
3753        // Same (alias, source) grouping as the deferred path: a mixed dense + multi-vector
3754        // group is a single-pass hybrid source (one inference fills both columns).
3755        let groups = collect_embed_groups(&schema, label);
3756        if groups.is_empty() {
3757            log::info!("No embedding config found for label {}", label);
3758            return Ok(());
3759        }
3760
3761        for (key, group) in groups {
3762            let alias = &key.0;
3763            // Skip if every target already present (user-supplied values win).
3764            if group
3765                .dense
3766                .iter()
3767                .chain(group.multi.iter())
3768                .chain(group.sparse.iter())
3769                .all(|t| properties.contains_key(t))
3770            {
3771                continue;
3772            }
3773
3774            let mut inputs = Vec::new();
3775            for src in &group.source_properties {
3776                if let Some(val) = properties.get(src)
3777                    && let Some(s) = val.as_str()
3778                {
3779                    inputs.push(s.to_string());
3780                }
3781            }
3782            if inputs.is_empty() {
3783                continue;
3784            }
3785            let text = inputs.join(" ");
3786            let text = match &group.document_prefix {
3787                Some(prefix) => format!("{prefix}{text}"),
3788                None => text,
3789            };
3790
3791            let runtime = self
3792                .xervo_runtime
3793                .get()
3794                .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3795            let want_dense = !group.dense.is_empty();
3796            let want_multi = !group.multi.is_empty();
3797            let want_sparse = !group.sparse.is_empty();
3798            let (dense, multi, sparse) = embed_group(
3799                runtime,
3800                alias,
3801                &[text.as_str()],
3802                want_dense,
3803                want_multi,
3804                want_sparse,
3805            )
3806            .await?;
3807
3808            if let Some(vec) = dense.as_ref().and_then(|d| d.first()) {
3809                let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3810                for t in &group.dense {
3811                    if !properties.contains_key(t) {
3812                        properties.insert(t.clone(), Value::List(vals.clone()));
3813                    }
3814                }
3815            }
3816            if let Some(tokens) = multi.as_ref().and_then(|m| m.first()) {
3817                let mv = multivec_to_value(tokens);
3818                for t in &group.multi {
3819                    if !properties.contains_key(t) {
3820                        properties.insert(t.clone(), mv.clone());
3821                    }
3822                }
3823            }
3824            if let Some(pairs) = sparse.as_ref().and_then(|s| s.first()) {
3825                let sv = sparse_pairs_to_value(pairs);
3826                for t in &group.sparse {
3827                    if !properties.contains_key(t) {
3828                        properties.insert(t.clone(), sv.clone());
3829                    }
3830                }
3831            }
3832        }
3833        Ok(())
3834    }
3835
3836    /// Flushes the current in-memory L0 buffer to L1 storage.
3837    ///
3838    /// # Lock Ordering
3839    ///
3840    /// To prevent deadlocks, locks must be acquired in the following order:
3841    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3842    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3843    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3844    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3845    /// 5. `Index` / `Storage` locks (during actual flush)
3846    ///
3847    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3848    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3849    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3850    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3851        // Drain any in-flight async flushes first. `flush_to_l1` is a
3852        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3853        // setup, shutdown paths) rely on it as "all writes are now
3854        // durably in Lance". Without the drain, an async stream from
3855        // a recent commit might still be writing to Lance when
3856        // `flush_to_l1` returns, leaving a window where forks branch
3857        // off pre-write Lance state and lose data.
3858        if let Some(coord) = self.flush_coordinator.as_ref() {
3859            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3860        }
3861        let _flush_lock_guard = self.flush_lock.lock().await;
3862        self.flush_inline_under_lock(name).await
3863    }
3864
3865    /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3866    ///
3867    /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3868    /// flush, and then — still holding the lock — reads the allocator
3869    /// high-water marks and each existing candidate dataset's Lance
3870    /// version. Capturing under the held lock is what makes the fork point
3871    /// atomic: no concurrent commit can advance the allocator and no
3872    /// concurrent flush can advance a dataset tip between the flush and the
3873    /// reads. See [`ForkPoint`].
3874    ///
3875    /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3876    /// names with no `.lance` directory on disk are skipped (returned map
3877    /// has no entry for them), matching the fork branch loop's existence
3878    /// check.
3879    ///
3880    /// # Errors
3881    /// Propagates flush failures from `flush_inline_under_lock` and any
3882    /// per-dataset version read failure from `lance_branch::current_version`.
3883    ///
3884    /// # Deadlocks
3885    /// Must not be called by a task already holding `flush_lock` (e.g.
3886    /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3887    /// Fork creation never holds the lock, so the sole call site is safe.
3888    pub async fn flush_and_capture_fork_point(
3889        &self,
3890        candidate_dataset_names: &[String],
3891    ) -> Result<ForkPoint> {
3892        if let Some(coord) = self.flush_coordinator.as_ref() {
3893            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3894        }
3895        let _flush_lock_guard = self.flush_lock.lock().await;
3896        self.flush_inline_under_lock(None).await?;
3897
3898        // Still under `flush_lock`: capture the allocator HWM, the MVCC
3899        // version HWM, and every existing dataset's Lance version so
3900        // nothing can interleave.
3901        let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3902        // The parent's current L0 version is the largest `_version` any
3903        // inherited row can carry (flushed or in-memory). A fork bootstraps
3904        // its version floor to this so a fork tx read still sees inherited
3905        // rows. Cheap read lock; no buffer clone.
3906        let version_hwm = self.l0_manager.get_current().read().current_version;
3907
3908        let base = self.storage.base_uri();
3909        let mut dataset_versions = BTreeMap::new();
3910        for name in candidate_dataset_names {
3911            let uri = join_lance_uri(base, name);
3912            if !lance_path_exists(&uri) {
3913                continue;
3914            }
3915            let version = crate::backend::lance_branch::current_version(&uri).await?;
3916            dataset_versions.insert(name.clone(), version);
3917        }
3918
3919        Ok(ForkPoint {
3920            vid_hwm,
3921            eid_hwm,
3922            dataset_versions,
3923            version_hwm,
3924        })
3925    }
3926
3927    /// Async-flush entry point: rotate under `flush_lock`, release the
3928    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3929    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3930    /// that resolves when finalize completes.
3931    ///
3932    /// Errors if `config.async_flush_enabled = false` (the coordinator
3933    /// is `None` in that case — see `flush_coordinator` field doc).
3934    pub async fn flush_to_l1_async(
3935        self: &Arc<Self>,
3936        name: Option<String>,
3937    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3938        let coord = self
3939            .flush_coordinator
3940            .as_ref()
3941            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3942            .clone();
3943        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3944        //    introduce a permit-while-holding-flush-lock convoy.
3945        let permit = coord.acquire_permit().await?;
3946        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3947        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3948        //    rotate (the `?` below) must consume neither: the finalizer
3949        //    advances in strictly consecutive seq order and only decrements
3950        //    pending on finalize, so a leaked seq/pending would wedge it
3951        //    forever. The seq is allocated under `flush_lock`, immediately
3952        //    after the rotate and before the guard drops, so concurrent rotates
3953        //    keep seq order == rotation order, and the seq is unused until
3954        //    submit. On the `?` error path the permit drops, freeing the slot.
3955        let (
3956            RotateOutput {
3957                old_l0_arc,
3958                wal_lsn,
3959                current_version,
3960                flush_in_progress_guard,
3961            },
3962            seq,
3963        ) = {
3964            let _flush_lock_guard = self.flush_lock.lock().await;
3965            let rotate_out = self.flush_l0_rotate().await?;
3966            let seq = coord.next_rotate_seq();
3967            coord.note_pending();
3968            (rotate_out, seq)
3969        };
3970        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3971        //    cached_manifest snapshot at this moment.
3972        let parent_manifest = self.cached_manifest.lock().clone();
3973        let rotated = RotatedFlush {
3974            seq,
3975            old_l0_arc: old_l0_arc.clone(),
3976            wal_lsn,
3977            current_version,
3978            name: name.clone(),
3979            parent_manifest,
3980            permit,
3981            flush_in_progress_guard,
3982        };
3983        // 4. Spawn the stream phase via the coordinator. The closure
3984        //    captures Arc<Writer> transiently — drops when stream
3985        //    completes (bounded, ~50-500 ms).
3986        let writer = self.clone();
3987        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3988            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3989            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3990                new_manifest: outcome.manifest,
3991                snapshot_id: outcome.snapshot_id,
3992            })
3993        });
3994        Ok(ticket)
3995    }
3996
3997    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3998    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3999    /// its place), and hand off the WAL to the new L0.
4000    ///
4001    /// Runs in microseconds. Must be called under `flush_lock` (the caller
4002    /// is responsible). The returned [`RotateOutput`] carries everything
4003    /// the subsequent stream + finalize phases need; in particular the
4004    /// [`FlushInProgressGuard`] is bound to the return value so it stays
4005    /// alive for the full flush lifetime — including any future async
4006    /// path where stream runs on a spawned task.
4007    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
4008        // Acquire the in-progress counter BEFORE any heavy work. The
4009        // guard lives on RotateOutput; dropping RotateOutput drops the
4010        // guard, so the counter goes back to zero exactly when the flush
4011        // is fully done.
4012        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
4013
4014        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
4015        // current L0 is still active and mutations are retained in
4016        // memory until restart/retry.
4017        let wal_for_truncate = {
4018            let current_l0 = self.l0_manager.get_current();
4019            let l0_guard = current_l0.read();
4020            l0_guard.wal.clone()
4021        };
4022        // Test-only seam (no-op without the `failpoints` feature): inject a
4023        // WAL-flush failure here to drive the "failed async rotate wedges the
4024        // finalizer" regression (Bug #3). When configured to "return" it makes
4025        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
4026        fail::fail_point!("flush::rotate-fail", |_| {
4027            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
4028        });
4029        let wal_lsn = if let Some(ref w) = wal_for_truncate {
4030            w.flush().await?
4031        } else {
4032            0
4033        };
4034
4035        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
4036        // pending_flush until complete_flush is called by finalize.
4037        let old_l0_arc = self.l0_manager.begin_flush(0, None);
4038        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
4039
4040        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
4041        // and current_version to the new L0.
4042        let current_version;
4043        {
4044            let mut old_l0_guard = old_l0_arc.write();
4045            current_version = old_l0_guard.current_version;
4046            old_l0_guard.wal_lsn_at_flush = wal_lsn;
4047            let wal = old_l0_guard.wal.take();
4048            let new_l0_arc = self.l0_manager.get_current();
4049            let mut new_l0_guard = new_l0_arc.write();
4050            new_l0_guard.wal = wal;
4051            new_l0_guard.current_version = current_version;
4052            // The new active buffer starts accumulating strictly above the
4053            // rotation point: everything <= `wal_lsn` is now owned by the old
4054            // (being-flushed) buffer or earlier. This start watermark is the
4055            // floor that keeps WAL truncation / checkpoint publication from
4056            // discarding this buffer's data if its eventual flush fails.
4057            new_l0_guard.wal_lsn_at_start = wal_lsn;
4058        }
4059
4060        Ok(RotateOutput {
4061            old_l0_arc,
4062            wal_lsn,
4063            current_version,
4064            flush_in_progress_guard,
4065        })
4066    }
4067
4068    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
4069    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
4070    /// pending_flush by Phase B); writes append-only Lance datasets; does
4071    /// NOT call save_snapshot / set_latest_snapshot — those are
4072    /// finalize's job, so the manifest doesn't get published until the
4073    /// next phase.
4074    ///
4075    /// Today takes `&self`; in a follow-up commit this becomes a
4076    /// static `Send + 'static` function over `SharedFlushCtx` so it can
4077    /// run on a spawned task while concurrent commits proceed.
4078    async fn flush_stream_l1(
4079        &self,
4080        old_l0_arc: Arc<RwLock<L0Buffer>>,
4081        wal_lsn: u64,
4082        current_version: u64,
4083        name: Option<String>,
4084    ) -> Result<FlushOutcome> {
4085        // Test-only seam (no-op without the `failpoints` feature): the rotate
4086        // (begin_flush) already moved the to-be-flushed buffer onto
4087        // pending_flush and installed a fresh empty current buffer, but the
4088        // rotated rows are NOT yet durable in Lance. Pausing here holds that
4089        // window open to drive the unique-constraint-hole regression
4090        // (Bug #9 Mechanism A).
4091        fail::fail_point!("flush::after-rotate-before-lance");
4092
4093        // Phase B: materialize any deferred embeddings before column
4094        // extraction. No-op when `defer_embeddings` is off (the set will
4095        // be empty). On-demand reads of the embedding column are a TODO
4096        // for a future revision (see UniConfig::defer_embeddings docs).
4097        self.drain_pending_embeddings(&old_l0_arc).await?;
4098
4099        // Materialise MUVERA FDE columns from each row's source multi-vector (pure/sync;
4100        // no-op without a MUVERA index). Runs after embeddings so a row can be both
4101        // auto-embedded and FDE-encoded.
4102        self.materialize_fde_columns(&old_l0_arc)?;
4103
4104        let schema = self.schema_manager.schema();
4105        // Collect-phase accumulators; the read lock on the old L0 is taken in the block below.
4106        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
4107        // (Vid, labels, properties, deleted, version)
4108        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
4109        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
4110        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
4111        // (vid, full L0 properties map, version, set of keys to update).
4112        // Only the keys in the HashSet are emitted to the partial source;
4113        // the full props map is retained so the per-row column extractor
4114        // can read each touched key's value.
4115        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
4116        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
4117        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
4118        // partial source with just `_vid`, `_deleted=true`, `_version`,
4119        // `_updated_at`. Skips the wide-row Append payload that adds
4120        // nothing on a soft-delete.
4121        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
4122        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
4123        // Collect vertex timestamps from L0 for flushing to storage
4124        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
4125        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
4126        // Track tombstones missing labels for storage query fallback
4127        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
4128
4129        {
4130            let old_l0 = old_l0_arc.read();
4131
4132            // 1. Collect all edges and tombstones from L0
4133            for edge in old_l0.graph.edges() {
4134                let properties = old_l0
4135                    .edge_properties
4136                    .get(&edge.eid)
4137                    .cloned()
4138                    .unwrap_or_default();
4139                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
4140
4141                // Get timestamps from L0 buffer (populated during insert)
4142                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
4143                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
4144
4145                entries_by_type
4146                    .entry(edge.edge_type)
4147                    .or_default()
4148                    .push(L1Entry {
4149                        src_vid: edge.src_vid,
4150                        dst_vid: edge.dst_vid,
4151                        eid: edge.eid,
4152                        op: Op::Insert,
4153                        version,
4154                        properties,
4155                        created_at,
4156                        updated_at,
4157                    });
4158            }
4159
4160            // From tombstones
4161            for tombstone in old_l0.tombstones.values() {
4162                let version = old_l0
4163                    .edge_versions
4164                    .get(&tombstone.eid)
4165                    .copied()
4166                    .unwrap_or(0);
4167                // Get timestamps - for deletes, updated_at reflects deletion time
4168                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
4169                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
4170
4171                entries_by_type
4172                    .entry(tombstone.edge_type)
4173                    .or_default()
4174                    .push(L1Entry {
4175                        src_vid: tombstone.src_vid,
4176                        dst_vid: tombstone.dst_vid,
4177                        eid: tombstone.eid,
4178                        op: Op::Delete,
4179                        version,
4180                        properties: HashMap::new(),
4181                        created_at,
4182                        updated_at,
4183                    });
4184            }
4185
4186            // 1b. Collect vertices by label (using vertex_labels from L0)
4187            //
4188            // Helper: fan-out a single vertex entry into per-label buckets.
4189            // Each per-label table row carries the full label set so multi-label
4190            // info is preserved after flush.
4191            let push_vertex_to_labels =
4192                |vid: Vid,
4193                 all_labels: &[String],
4194                 props: Properties,
4195                 deleted: bool,
4196                 version: u64,
4197                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
4198                    for label in all_labels {
4199                        if let Some(label_id) = schema.label_id_by_name(label) {
4200                            out.entry(label_id).or_default().push((
4201                                vid,
4202                                all_labels.to_vec(),
4203                                props.clone(),
4204                                deleted,
4205                                version,
4206                            ));
4207                        }
4208                    }
4209                };
4210
4211            for (vid, props) in &old_l0.vertex_properties {
4212                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4213                // Collect timestamps for this vertex
4214                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
4215                    vertex_created_at.insert(*vid, ts);
4216                }
4217                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
4218                    vertex_updated_at.insert(*vid, ts);
4219                }
4220                if let Some(labels) = old_l0.vertex_labels.get(vid) {
4221                    // Partial-write routing: when this VID was last
4222                    // touched via `insert_vertex_partial` AND the
4223                    // partial_lance_writes flag is on, send only the
4224                    // touched columns to a MergeInsert batch. Otherwise
4225                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
4226                    // — or flag off) use the existing full-row Append.
4227                    let is_partial = self.config.partial_lance_writes
4228                        && old_l0.vertex_partial_keys.contains_key(vid);
4229                    if is_partial {
4230                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
4231                            for label in labels {
4232                                if let Some(label_id) = schema.label_id_by_name(label) {
4233                                    partial_by_label.entry(label_id).or_default().push((
4234                                        *vid,
4235                                        props.clone(),
4236                                        version,
4237                                        touched.clone(),
4238                                    ));
4239                                }
4240                            }
4241                        }
4242                    } else {
4243                        push_vertex_to_labels(
4244                            *vid,
4245                            labels,
4246                            props.clone(),
4247                            false,
4248                            version,
4249                            &mut vertices_by_label,
4250                        );
4251                    }
4252                }
4253            }
4254            for &vid in &old_l0.vertex_tombstones {
4255                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4256                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
4257                    vertex_updated_at.insert(vid, ts);
4258                }
4259                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
4260                    // Round-12 §B: tombstones flush via Lance MergeInsert
4261                    // (just `_vid`, `_deleted=true`, `_version`,
4262                    // `_updated_at`) — skipping the wide-row Append.
4263                    // Unconditional (no `partial_lance_writes` gating);
4264                    // tombstone Append carries no useful payload.
4265                    for label in labels {
4266                        if let Some(label_id) = schema.label_id_by_name(label) {
4267                            tombstones_by_label
4268                                .entry(label_id)
4269                                .or_default()
4270                                .push((vid, version));
4271                        }
4272                    }
4273                } else {
4274                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
4275                    orphaned_tombstones.push((vid, version));
4276                }
4277            }
4278        } // Drop read lock
4279
4280        // Resolve orphaned tombstones (missing labels) from storage
4281        if !orphaned_tombstones.is_empty() {
4282            tracing::warn!(
4283                count = orphaned_tombstones.len(),
4284                "Tombstones missing labels in L0, querying storage as fallback"
4285            );
4286            for (vid, version) in orphaned_tombstones {
4287                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
4288                    && !labels.is_empty()
4289                {
4290                    for label in &labels {
4291                        if let Some(label_id) = schema.label_id_by_name(label) {
4292                            // Round-12 §B: route through partial tombstone too.
4293                            tombstones_by_label
4294                                .entry(label_id)
4295                                .or_default()
4296                                .push((vid, version));
4297                        }
4298                    }
4299                }
4300            }
4301        }
4302
4303        // 1. Load previous snapshot from cache, or fall back to storage.
4304        //
4305        // Use clone() not take(): for the async path, multiple
4306        // concurrent streams may run; if we take() here, a sibling
4307        // stream sees cached_manifest = None and seeds from
4308        // load_latest_snapshot (stale), losing the chain. clone()
4309        // preserves the parent. Finalize writes back the new manifest
4310        // unconditionally.
4311        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
4312            cached
4313        } else {
4314            self.storage
4315                .snapshot_manager()
4316                .load_latest_snapshot()
4317                .await?
4318                .unwrap_or_else(|| {
4319                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
4320                })
4321        };
4322
4323        // Update snapshot metadata
4324        // Save parent snapshot ID before generating new one (for lineage tracking)
4325        let parent_id = manifest.snapshot_id.clone();
4326        manifest.parent_snapshot = Some(parent_id);
4327        manifest.snapshot_id = Uuid::new_v4().to_string();
4328        manifest.name = name;
4329        manifest.created_at = Utc::now();
4330        manifest.version_high_water_mark = current_version;
4331        // Cap the published WAL checkpoint at the floor of any OTHER pending
4332        // flush. A still-pending flush (notably one that FAILED and left its
4333        // buffer in `pending_flush`) holds committed WAL entries above its start
4334        // that are NOT in this snapshot; recovery replays from this mark, so
4335        // claiming durability past that floor would skip them (lost commit). A
4336        // normal flush with no other pending buffer keeps `wal_lsn` unchanged.
4337        manifest.wal_high_water_mark = self
4338            .l0_manager
4339            .min_pending_wal_lsn_start(&old_l0_arc)
4340            .map_or(wal_lsn, |floor| floor.min(wal_lsn));
4341        let snapshot_id = manifest.snapshot_id.clone();
4342
4343        tracing::Span::current().record("snapshot_id", &snapshot_id);
4344
4345        // 2. Write main unified tables FIRST (before deltas).
4346        //    Ensures the dual-write invariant: by the time an EID appears in a
4347        //    delta table, it already exists in main_edges. This prevents the
4348        //    compaction debug_assert from firing when compaction interleaves
4349        //    with flush at async yield points.
4350        //
4351        // 2.1 Main edges table
4352        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
4353            let _old_l0 = old_l0_arc.read();
4354            let mut main_edges: Vec<(
4355                uni_common::core::id::Eid,
4356                Vid,
4357                Vid,
4358                String,
4359                Properties,
4360                bool,
4361                u64,
4362            )> = Vec::new();
4363            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4364            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4365
4366            for (&edge_type_id, entries) in entries_by_type.iter() {
4367                for entry in entries {
4368                    let edge_type_name = self
4369                        .storage
4370                        .schema_manager()
4371                        .edge_type_name_by_id_unified(edge_type_id)
4372                        .unwrap_or_else(|| "unknown".to_string());
4373
4374                    let deleted = matches!(entry.op, Op::Delete);
4375                    main_edges.push((
4376                        entry.eid,
4377                        entry.src_vid,
4378                        entry.dst_vid,
4379                        edge_type_name,
4380                        entry.properties.clone(),
4381                        deleted,
4382                        entry.version,
4383                    ));
4384
4385                    if let Some(ts) = entry.created_at {
4386                        edge_created_at_map.insert(entry.eid, ts);
4387                    }
4388                    if let Some(ts) = entry.updated_at {
4389                        edge_updated_at_map.insert(entry.eid, ts);
4390                    }
4391                }
4392            }
4393
4394            (main_edges, edge_created_at_map, edge_updated_at_map)
4395        };
4396
4397        if !main_edges.is_empty() {
4398            let main_edge_batch = MainEdgeDataset::build_record_batch(
4399                &main_edges,
4400                Some(&edge_created_at_map),
4401                Some(&edge_updated_at_map),
4402            )?;
4403            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4404            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4405        }
4406
4407        // 2.2 Main vertices table
4408        let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4409            let old_l0 = old_l0_arc.read();
4410            let mut vertices = Vec::new();
4411
4412            // Live vertices: full-row Append on the main table (the
4413            // props_json blob is required for global ID lookups). For
4414            // partial-row VIDs (vertex_partial_keys non-empty), the
4415            // main table still needs the full props for the
4416            // ext_id-uniqueness path; we keep the Append here. The
4417            // per-label Lance write IS partial via MergeInsert.
4418            for (vid, props) in &old_l0.vertex_properties {
4419                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4420                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4421                vertices.push((*vid, labels, props.clone(), false, version));
4422            }
4423
4424            // Tombstones: collected into `main_vertex_tombstones` for
4425            // the MergeInsert path below; skipping the wide-row Append.
4426            for &vid in &old_l0.vertex_tombstones {
4427                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4428                main_vertex_tombstones.push((vid, version));
4429            }
4430
4431            vertices
4432        };
4433
4434        // M8: durable label-only mutations across flush windows.
4435        //
4436        // `SET n:Label` / `REMOVE n:Label` mark the vid in
4437        // `vertex_label_overwrites` and update `vertex_labels`, but for a
4438        // vid flushed in a PRIOR window they never re-add it to
4439        // `vertex_properties`. The loops above key off `vertex_properties`,
4440        // so such a relabel would be silently lost: absent from the main
4441        // table, the per-label datasets, and the VidLabelsIndex (and so
4442        // `rebuild_vid_labels_index` reads stale labels after a restart).
4443        // The same-window create+relabel case already works because the
4444        // create put the vid in `vertex_properties`.
4445        //
4446        // Re-derive each overwrite-only vid by fetching its persisted props
4447        // and labels, then route it into `main_vertices` (main table +
4448        // index), the new per-label datasets, and a tombstone in any
4449        // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4450        // per-label table directly, so the old-label tombstone is required.
4451        let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4452            let old_l0 = old_l0_arc.read();
4453            old_l0
4454                .vertex_label_overwrites
4455                .iter()
4456                .filter(|vid| {
4457                    !old_l0.vertex_properties.contains_key(*vid)
4458                        && !old_l0.vertex_tombstones.contains(*vid)
4459                })
4460                .map(|vid| {
4461                    let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4462                    let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4463                    (*vid, labels, version)
4464                })
4465                .collect()
4466        };
4467        for (vid, new_labels, version) in overwrite_only {
4468            // Persisted props of the prior-window row — required so the
4469            // re-Appended main row does not blank the vertex's properties.
4470            let Some(props) = MainVertexDataset::find_props_by_vid(
4471                self.storage.backend(),
4472                vid,
4473                self.storage.version_high_water_mark(),
4474            )
4475            .await?
4476            else {
4477                tracing::warn!(
4478                    vid = vid.as_u64(),
4479                    "label-only mutation for a vid with no persisted main row; skipping flush \
4480                     of its relabel"
4481                );
4482                continue;
4483            };
4484            // Labels the vid carried BEFORE this relabel; the storage read
4485            // reflects pre-flush state. Any label no longer present must be
4486            // tombstoned in its per-label dataset.
4487            let old_labels = self
4488                .find_vertex_labels_in_storage(vid)
4489                .await?
4490                .unwrap_or_default();
4491
4492            main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4493            for label in &new_labels {
4494                if let Some(label_id) = schema.label_id_by_name(label) {
4495                    vertices_by_label.entry(label_id).or_default().push((
4496                        vid,
4497                        new_labels.clone(),
4498                        props.clone(),
4499                        false,
4500                        version,
4501                    ));
4502                }
4503            }
4504            for label in &old_labels {
4505                if !new_labels.contains(label)
4506                    && let Some(label_id) = schema.label_id_by_name(label)
4507                {
4508                    tombstones_by_label
4509                        .entry(label_id)
4510                        .or_default()
4511                        .push((vid, version));
4512                }
4513            }
4514        }
4515
4516        if !main_vertices.is_empty() {
4517            let main_vertex_batch = MainVertexDataset::build_record_batch(
4518                &main_vertices,
4519                Some(&vertex_created_at),
4520                Some(&vertex_updated_at),
4521            )?;
4522            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4523        }
4524        // Round-12 §B: tombstones via MergeInsert on the main vertices
4525        // table. Independent of `vertex_properties` length.
4526        if !main_vertex_tombstones.is_empty() {
4527            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4528                &main_vertex_tombstones,
4529                Some(&vertex_updated_at),
4530            )?;
4531            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4532                .await?;
4533        }
4534        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4535            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4536        }
4537
4538        // Keep the VidLabelsIndex current for every flushed vertex. This is the
4539        // single place that sees all vertices: the per-label fan-out below skips
4540        // undeclared (schemaless) labels, so updating the index there would miss
4541        // them. Traversal-time label predicates read this index to resolve
4542        // labels for vertices that live only in Lance — notably on a fork, whose
4543        // data is flushed to Lance before branching. (GitHub #99)
4544        for (vid, labels, _props, _deleted, _version) in &main_vertices {
4545            self.storage.update_vid_labels_index(*vid, labels.clone());
4546        }
4547        for (vid, _version) in &main_vertex_tombstones {
4548            self.storage.remove_from_vid_labels_index(*vid);
4549        }
4550
4551        // 3. For each edge type, write FWD and BWD delta runs
4552        for (&edge_type_id, entries) in entries_by_type.iter() {
4553            // Get edge type name from unified lookup (handles both schema'd and schemaless)
4554            let edge_type_name = self
4555                .storage
4556                .schema_manager()
4557                .edge_type_name_by_id_unified(edge_type_id)
4558                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4559
4560            // FWD Run (sorted by src_vid)
4561            // Round-12 §A: split entries into full-row Append and
4562            // partial MergeInsert routes based on `edge_partial_keys`.
4563            // Edges in `edge_partial_keys` were last written via
4564            // `insert_edge_partial_full`; the per-edge-type delta
4565            // tables receive only the touched schema columns plus
4566            // (when any overflow key was touched) the regenerated
4567            // `overflow_json` blob. Untouched columns retain their
4568            // previous-version value via Lance MergeInsert.
4569            let partial_eids: std::collections::HashSet<Eid> = {
4570                let old_l0 = old_l0_arc.read();
4571                entries
4572                    .iter()
4573                    .filter(|e| {
4574                        self.config.partial_lance_writes
4575                            && old_l0.edge_partial_keys.contains_key(&e.eid)
4576                    })
4577                    .map(|e| e.eid)
4578                    .collect()
4579            };
4580            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4581                let old_l0 = old_l0_arc.read();
4582                partial_eids
4583                    .iter()
4584                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4585                    .collect()
4586            };
4587            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4588                .clone()
4589                .into_iter()
4590                .partition(|e| !partial_eids.contains(&e.eid));
4591
4592            let backend = self.storage.backend();
4593
4594            // FWD run (sorted by src_vid)
4595            let mut fwd_full = full_entries.clone();
4596            fwd_full.sort_by_key(|e| e.src_vid);
4597            let mut fwd_partial = partial_entries.clone();
4598            fwd_partial.sort_by_key(|e| e.src_vid);
4599            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4600            if !fwd_full.is_empty() {
4601                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4602                fwd_ds.write_run(backend, fwd_batch).await?;
4603            }
4604            if !fwd_partial.is_empty() {
4605                let touched_union: std::collections::HashSet<String> = fwd_partial
4606                    .iter()
4607                    .flat_map(|e| {
4608                        touched_union_by_eid
4609                            .get(&e.eid)
4610                            .cloned()
4611                            .unwrap_or_default()
4612                            .into_iter()
4613                    })
4614                    .collect();
4615                let fwd_partial_batch =
4616                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4617                fwd_ds
4618                    .merge_insert_partial_run(backend, fwd_partial_batch)
4619                    .await?;
4620            }
4621            fwd_ds.ensure_eid_index(backend).await?;
4622
4623            // BWD Run (sorted by dst_vid)
4624            let mut bwd_full = full_entries.clone();
4625            bwd_full.sort_by_key(|e| e.dst_vid);
4626            let mut bwd_partial = partial_entries.clone();
4627            bwd_partial.sort_by_key(|e| e.dst_vid);
4628            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4629            if !bwd_full.is_empty() {
4630                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4631                bwd_ds.write_run(backend, bwd_batch).await?;
4632            }
4633            if !bwd_partial.is_empty() {
4634                let touched_union: std::collections::HashSet<String> = bwd_partial
4635                    .iter()
4636                    .flat_map(|e| {
4637                        touched_union_by_eid
4638                            .get(&e.eid)
4639                            .cloned()
4640                            .unwrap_or_default()
4641                            .into_iter()
4642                    })
4643                    .collect();
4644                let bwd_partial_batch =
4645                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4646                bwd_ds
4647                    .merge_insert_partial_run(backend, bwd_partial_batch)
4648                    .await?;
4649            }
4650            bwd_ds.ensure_eid_index(backend).await?;
4651
4652            // Update Manifest
4653            let current_snap =
4654                manifest
4655                    .edges
4656                    .entry(edge_type_name.to_string())
4657                    .or_insert(EdgeSnapshot {
4658                        version: 0,
4659                        count: 0,
4660                        lance_version: 0,
4661                    });
4662            current_snap.version += 1;
4663            current_snap.count += entries.len() as u64;
4664            // LanceDB tables don't expose Lance version directly
4665            current_snap.lance_version = 0;
4666
4667            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4668            // already has these edges via dual-write in insert_edge/delete_edge.
4669        }
4670
4671        // 4. Per-label vertex table writes
4672        // Iterate all labels that have full-row, partial-write, OR tombstone
4673        // data pending. A label may appear in only one of the three maps
4674        // (e.g., all updates on this label were partial-only).
4675        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4676            .keys()
4677            .chain(partial_by_label.keys())
4678            .chain(tombstones_by_label.keys())
4679            .copied()
4680            .collect();
4681        for label_id in all_label_ids {
4682            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4683            let label_name = schema
4684                .label_name_by_id(label_id)
4685                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4686
4687            let ds = self.storage.vertex_dataset(label_name)?;
4688
4689            // Collect inverted index updates before consuming vertices
4690            // Maps: cfg.property -> (added, removed)
4691            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4692            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4693
4694            for idx in &schema.indexes {
4695                if let IndexDefinition::Inverted(cfg) = idx
4696                    && cfg.label == label_name
4697                {
4698                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4699                    let mut removed: HashSet<Vid> = HashSet::new();
4700
4701                    for (vid, _labels, props, deleted, _version) in &vertices {
4702                        if *deleted {
4703                            removed.insert(*vid);
4704                        } else if let Some(prop_value) = props.get(&cfg.property) {
4705                            // Extract terms from the property value (List<String>)
4706                            if let Some(arr) = prop_value.as_array() {
4707                                let terms: Vec<String> = arr
4708                                    .iter()
4709                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4710                                    .collect();
4711                                if !terms.is_empty() {
4712                                    added.insert(*vid, terms);
4713                                }
4714                            }
4715                        }
4716                    }
4717                    // Round-12 §B: tombstones no longer in `vertices`;
4718                    // pull them from `tombstones_by_label` for inverted
4719                    // index removal.
4720                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4721                        for (vid, _) in tomb_rows {
4722                            removed.insert(*vid);
4723                        }
4724                    }
4725
4726                    if !added.is_empty() || !removed.is_empty() {
4727                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4728                    }
4729                }
4730            }
4731
4732            // Collect sparse-vector index updates before consuming vertices.
4733            // Maps: cfg.property -> (added [(term_id, weight)] per vid, removed).
4734            type SparseUpdateMap = HashMap<String, (HashMap<Vid, Vec<(u32, f32)>>, HashSet<Vid>)>;
4735            let mut sparse_updates: SparseUpdateMap = HashMap::new();
4736
4737            for idx in &schema.indexes {
4738                if let IndexDefinition::Sparse(cfg) = idx
4739                    && cfg.label == label_name
4740                {
4741                    let mut added: HashMap<Vid, Vec<(u32, f32)>> = HashMap::new();
4742                    let mut removed: HashSet<Vid> = HashSet::new();
4743
4744                    for (vid, _labels, props, deleted, _version) in &vertices {
4745                        if *deleted {
4746                            removed.insert(*vid);
4747                        } else if let Some(uni_common::Value::SparseVector { indices, values }) =
4748                            props.get(&cfg.property)
4749                        {
4750                            let pairs: Vec<(u32, f32)> = indices
4751                                .iter()
4752                                .copied()
4753                                .zip(values.iter().copied())
4754                                .collect();
4755                            added.insert(*vid, pairs);
4756                            // An in-place SET re-flushes an already-indexed vid. The
4757                            // sparse postings are a `Vec` with no per-vid dedup, so unless
4758                            // the vid is also marked removed, `apply_incremental_updates`
4759                            // appends the new postings *alongside* the stale ones — leaking
4760                            // duplicates that grow unboundedly on hot-updated docs and
4761                            // double-count the advisory `query_topk` score (issue #95).
4762                            // Mark every updated vid removed so its prior postings are
4763                            // purged before the new ones are appended (remove-then-add).
4764                            removed.insert(*vid);
4765                        }
4766                    }
4767                    // Tombstones are not in `vertices`; pull from tombstones_by_label.
4768                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4769                        for (vid, _) in tomb_rows {
4770                            removed.insert(*vid);
4771                        }
4772                    }
4773
4774                    if !added.is_empty() || !removed.is_empty() {
4775                        sparse_updates.insert(cfg.property.clone(), (added, removed));
4776                    }
4777                }
4778            }
4779
4780            let mut v_data = Vec::new();
4781            let mut d_data = Vec::new();
4782            let mut ver_data = Vec::new();
4783            for (vid, labels, props, deleted, version) in vertices {
4784                v_data.push((vid, labels, props));
4785                d_data.push(deleted);
4786                ver_data.push(version);
4787            }
4788
4789            let backend = self.storage.backend();
4790
4791            // Skip the full-row Append entirely if this label only has
4792            // partial-write rows pending.
4793            if !v_data.is_empty() {
4794                let batch = ds.build_record_batch_with_timestamps(
4795                    &v_data,
4796                    &d_data,
4797                    &ver_data,
4798                    &schema,
4799                    Some(&vertex_created_at),
4800                    Some(&vertex_updated_at),
4801                )?;
4802                ds.write_batch(backend, batch, &schema).await?;
4803            }
4804
4805            // Partial-column batch (Lance MergeInsert path). The flag
4806            // gates whether the routing classified any VIDs as partial;
4807            // outside the flag this collection is always empty so the
4808            // call below is a cheap no-op.
4809            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4810                && !partial_rows.is_empty()
4811            {
4812                let touched_union: std::collections::HashSet<String> = partial_rows
4813                    .iter()
4814                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4815                    .collect();
4816                let pairs: Vec<(Vid, Properties)> = partial_rows
4817                    .iter()
4818                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4819                    .collect();
4820                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4821                let partial_batch = ds.build_partial_record_batch(
4822                    &pairs,
4823                    &versions,
4824                    &touched_union,
4825                    &schema,
4826                    Some(&vertex_updated_at),
4827                )?;
4828                if partial_batch.num_rows() > 0 {
4829                    ds.merge_insert_batch(backend, partial_batch).await?;
4830                }
4831            }
4832
4833            // Tombstone batch (Round-12 §B): always MergeInsert with
4834            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4835            // No partial_lance_writes gating — tombstones never carry
4836            // useful property payload to write. Captured tombstone vids
4837            // also drive `remove_from_vid_labels_index` below.
4838            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4839            if !tombstone_rows.is_empty() {
4840                let tomb_batch =
4841                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4842                if tomb_batch.num_rows() > 0 {
4843                    ds.merge_insert_batch(backend, tomb_batch).await?;
4844                }
4845            }
4846
4847            ds.ensure_default_indexes(backend).await?;
4848
4849            // VidLabelsIndex maintenance is centralized at the main-vertex
4850            // flush above (it sees both schema'd and schemaless vertices).
4851
4852            // Update Manifest
4853            let current_snap =
4854                manifest
4855                    .vertices
4856                    .entry(label_name.to_string())
4857                    .or_insert(LabelSnapshot {
4858                        version: 0,
4859                        count: 0,
4860                        lance_version: 0,
4861                    });
4862            current_snap.version += 1;
4863            current_snap.count += v_data.len() as u64;
4864            // LanceDB tables don't expose Lance version directly
4865            current_snap.lance_version = 0;
4866
4867            // Invalidate table cache to ensure next read picks up new version
4868            self.storage.invalidate_table_cache(label_name);
4869
4870            // Apply inverted index updates incrementally
4871            #[cfg(feature = "lance-backend")]
4872            for idx in &schema.indexes {
4873                if let IndexDefinition::Inverted(cfg) = idx
4874                    && cfg.label == label_name
4875                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4876                {
4877                    self.storage
4878                        .index_manager()
4879                        .update_inverted_index_incremental(cfg, added, removed)
4880                        .await?;
4881                }
4882            }
4883
4884            // Apply sparse-vector index updates incrementally
4885            #[cfg(feature = "lance-backend")]
4886            for idx in &schema.indexes {
4887                if let IndexDefinition::Sparse(cfg) = idx
4888                    && cfg.label == label_name
4889                    && let Some((added, removed)) = sparse_updates.get(&cfg.property)
4890                {
4891                    self.storage
4892                        .index_manager()
4893                        .update_sparse_vector_index_incremental(cfg, added, removed)
4894                        .await?;
4895                }
4896            }
4897
4898            // Update UID index with new vertex mappings
4899            // Collect (UniId, Vid) mappings from non-deleted vertices
4900            #[cfg(feature = "lance-backend")]
4901            {
4902                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4903                for (vid, _labels, props) in &v_data {
4904                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4905                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4906                        label_name, ext_id, props,
4907                    );
4908                    uid_mappings.push((uid, *vid));
4909                }
4910
4911                if !uid_mappings.is_empty()
4912                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4913                {
4914                    // Stamp mappings with this flush's MVCC version so a later
4915                    // re-create of the same UID deterministically outranks the
4916                    // stale mapping (review C3).
4917                    uid_index
4918                        .write_mapping_versioned(&uid_mappings, current_version)
4919                        .await?;
4920                }
4921            }
4922        }
4923        Ok(FlushOutcome {
4924            manifest,
4925            snapshot_id,
4926        })
4927    }
4928
4929    /// Composition entry that assumes the caller already holds `flush_lock`.
4930    /// Runs rotate + stream + finalize_locked in sequence. Used by
4931    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4932    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4933    /// holds the lock from the commit critical section).
4934    #[instrument(
4935        skip(self),
4936        fields(snapshot_id, mutations_count, size_bytes),
4937        level = "info"
4938    )]
4939    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4940        let start = std::time::Instant::now();
4941
4942        let (initial_size, initial_count) = {
4943            let l0_arc = self.l0_manager.get_current();
4944            let l0 = l0_arc.read();
4945            (l0.estimated_size, l0.mutation_count)
4946        };
4947        tracing::Span::current().record("size_bytes", initial_size);
4948        tracing::Span::current().record("mutations_count", initial_count);
4949
4950        debug!("Starting L0 flush to L1");
4951
4952        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4953        // FlushInProgressGuard lives on RotateOutput and stays alive for
4954        // the full flush — including the finalize_locked call below.
4955        let RotateOutput {
4956            old_l0_arc,
4957            wal_lsn,
4958            current_version,
4959            flush_in_progress_guard: _flush_guard,
4960        } = self.flush_l0_rotate().await?;
4961
4962        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4963        // G (Lance writes). Builds the manifest but does NOT publish it.
4964        let FlushOutcome {
4965            manifest,
4966            snapshot_id,
4967        } = self
4968            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4969            .await?;
4970
4971        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4972        // property cache clear, last_flush_time, metrics, l1_runs++,
4973        // compaction trigger, index-rebuild scheduling, fork tick.
4974        self.flush_finalize_locked(
4975            old_l0_arc,
4976            wal_lsn,
4977            manifest,
4978            snapshot_id,
4979            initial_size,
4980            initial_count,
4981            start,
4982        )
4983        .await
4984    }
4985
4986    /// Phases H..S of the flush: publish the manifest and run all
4987    /// post-publish bookkeeping. Assumes the caller already holds
4988    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4989    /// lock-acquiring variant used by the async finalize path.
4990    #[allow(clippy::too_many_arguments)]
4991    async fn flush_finalize_locked(
4992        &self,
4993        old_l0_arc: Arc<RwLock<L0Buffer>>,
4994        wal_lsn: u64,
4995        manifest: SnapshotManifest,
4996        snapshot_id: String,
4997        initial_size: usize,
4998        initial_count: usize,
4999        start: std::time::Instant,
5000    ) -> Result<String> {
5001        Self::flush_finalize_body(
5002            &self.shared_ctx(),
5003            old_l0_arc,
5004            wal_lsn,
5005            manifest,
5006            snapshot_id,
5007            initial_size,
5008            initial_count,
5009            start,
5010        )
5011        .await
5012    }
5013
5014    /// Phases H..S of the flush, lock-acquiring variant. Used by the
5015    /// async-flush finalizer task (running on a spawned tokio task),
5016    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
5017    /// `flush_lock` to serialize the publish boundary, then runs the
5018    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
5019    #[allow(clippy::too_many_arguments)]
5020    pub(crate) async fn flush_finalize_now(
5021        shared: SharedFlushCtx,
5022        old_l0_arc: Arc<RwLock<L0Buffer>>,
5023        wal_lsn: u64,
5024        manifest: SnapshotManifest,
5025        snapshot_id: String,
5026        initial_size: usize,
5027        initial_count: usize,
5028        start: std::time::Instant,
5029    ) -> Result<String> {
5030        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
5031        Self::flush_finalize_body(
5032            &shared,
5033            old_l0_arc,
5034            wal_lsn,
5035            manifest,
5036            snapshot_id,
5037            initial_size,
5038            initial_count,
5039            start,
5040        )
5041        .await
5042    }
5043
5044    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
5045    /// Static over `SharedFlushCtx`; the caller is responsible for
5046    /// holding `flush_lock`.
5047    #[allow(clippy::too_many_arguments)]
5048    async fn flush_finalize_body(
5049        shared: &SharedFlushCtx,
5050        old_l0_arc: Arc<RwLock<L0Buffer>>,
5051        wal_lsn: u64,
5052        mut manifest: SnapshotManifest,
5053        snapshot_id: String,
5054        initial_size: usize,
5055        initial_count: usize,
5056        start: std::time::Instant,
5057    ) -> Result<String> {
5058        // Parent-snapshot fixup. The stream phase built `manifest` with
5059        // parent_snapshot set from cached_manifest at stream time. If
5060        // OTHER flushes (sync or async) have finalized since then,
5061        // cached_manifest has advanced. Re-link this manifest to the
5062        // current cached chain so we don't orphan their data when we
5063        // overwrite cached_manifest below.
5064        let current_parent_id = shared
5065            .cached_manifest
5066            .lock()
5067            .as_ref()
5068            .map(|m| m.snapshot_id.clone());
5069        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
5070            manifest.parent_snapshot = current_parent_id;
5071            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
5072        }
5073
5074        // H. Publish manifest (body first, then pointer — recovery is
5075        // idempotent if we crash between the two).
5076        // A fork writer must publish to a fork-scoped namespace, never the
5077        // global `catalog/latest` that the primary reopen reads (review C1).
5078        debug_assert_eq!(
5079            shared.fork_id.is_some(),
5080            shared.storage.snapshot_manager().is_fork_scoped(),
5081            "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
5082        );
5083        shared
5084            .storage
5085            .snapshot_manager()
5086            .save_snapshot(&manifest)
5087            .await?;
5088        shared
5089            .storage
5090            .snapshot_manager()
5091            .set_latest_snapshot(&manifest.snapshot_id)
5092            .await?;
5093
5094        // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
5095        // wrote the manifest body and the `catalog/latest` pointer through the
5096        // object store, which does NOT fsync. WAL truncation (K, below) removes
5097        // the only other durable copy of this flush's data, so a crash after K
5098        // but before the OS flushed those writes would lose the snapshot —
5099        // recovery could not resolve `latest`. Make them durable now (local-fs
5100        // only; remote stores provide their own durability on `put`).
5101        crate::snapshot::manager::fsync_snapshot_pointer(
5102            shared.storage.local_fs_root().as_deref(),
5103            shared.fork_id.as_ref(),
5104            &manifest.snapshot_id,
5105        )
5106        .map_err(|e| {
5107            anyhow!(
5108                "fsync snapshot {} before WAL truncate: {}",
5109                manifest.snapshot_id,
5110                e
5111            )
5112        })?;
5113
5114        // I. Cache manifest for next flush to avoid re-reading from object store.
5115        *shared.cached_manifest.lock() = Some(manifest.clone());
5116
5117        // L. Invalidate the property cache BEFORE removing the flushed buffer
5118        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
5119        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
5120        // first closes the non-monotonic-read window: once the buffer leaves
5121        // the L0 chain at J a freshly-written value would otherwise miss the
5122        // chain and fall through to a stale cache entry. By the time finalize
5123        // runs the streamed rows are already durable in L1, so a post-clear
5124        // read falls through to fresh storage instead. The finalizer holds
5125        // `flush_lock` throughout, so reordering L ahead of J is safe.
5126        if let Some(ref pm) = shared.property_manager {
5127            pm.clear_cache().await;
5128        }
5129
5130        // J. Complete flush: remove old L0 from pending_flush. MUST happen
5131        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
5132        shared.l0_manager.complete_flush(&old_l0_arc);
5133
5134        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
5135        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
5136        // WAL truncation (K). The property cache is already cleared (L moved
5137        // ahead of J above), so a read in this window falls through to fresh
5138        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
5139        // read after flush finalize).
5140        fail::fail_point!("flush::after-complete-before-cache-clear");
5141
5142        // K. Truncate WAL up to the safe LSN. The floor is the START watermark
5143        // of any OTHER pending flush (this flush's own buffer was removed from
5144        // pending by `complete_flush` in J, so it is excluded): a pending — e.g.
5145        // failed — flush's committed entries live above its start and are not yet
5146        // in L1, so truncating to its high watermark would delete its own data
5147        // (the lost-commit-on-graceful-close bug).
5148        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
5149        if let Some(w) = wal_handle {
5150            let safe_lsn = shared
5151                .l0_manager
5152                .min_pending_wal_lsn_start(&old_l0_arc)
5153                .map_or(wal_lsn, |floor| floor.min(wal_lsn));
5154            w.truncate_before(safe_lsn).await?;
5155        }
5156
5157        // M. Reset last flush time for time-based auto-flush.
5158        *shared.last_flush_time.lock() = std::time::Instant::now();
5159
5160        info!(
5161            snapshot_id,
5162            mutations_count = initial_count,
5163            size_bytes = initial_size,
5164            "L0 flush to L1 completed successfully"
5165        );
5166        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
5167        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
5168        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
5169
5170        // P. Increment flush generation counter for write throttling.
5171        {
5172            let mut status = uni_common::sync::acquire_mutex(
5173                &shared.storage.compaction_status,
5174                "compaction_status",
5175            )?;
5176            status.l1_runs += 1;
5177        }
5178
5179        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
5180        let am = shared.adjacency_manager.clone();
5181        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
5182            let previous_still_running = {
5183                let guard = shared.compaction_handle.read();
5184                guard.as_ref().is_some_and(|h| !h.is_finished())
5185            };
5186            if previous_still_running {
5187                info!("Skipping compaction: previous compaction still in progress");
5188            } else {
5189                let handle = tokio::spawn(async move {
5190                    am.compact();
5191                });
5192                *shared.compaction_handle.write() = Some(handle);
5193            }
5194        }
5195
5196        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
5197        if shared.auto_rebuild_enabled
5198            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
5199        {
5200            Self::schedule_index_rebuilds_if_needed_static(
5201                &manifest,
5202                rebuild_mgr.clone(),
5203                shared.schema_manager.clone(),
5204                shared.index_rebuild_config.clone(),
5205            );
5206        }
5207
5208        // S. Emit fork-fragment observability after a successful forked flush.
5209        Self::tick_fork_fragment_observability_static(
5210            shared.fork_id,
5211            shared.fork_flush_count.clone(),
5212            shared.fork_fragment_warn_fired.clone(),
5213            shared.fork_fragment_warn_threshold,
5214        );
5215
5216        Ok(snapshot_id)
5217    }
5218
5219    /// Increment fork-flush bookkeeping and fire the fragment warn
5220    /// once if the threshold is crossed.
5221    ///
5222    /// Each flush typically appends ~1 fragment per touched dataset on
5223    /// the fork's branches; without compaction (deferred to Phase 5)
5224    /// long-lived heavy-write forks degrade. The flush count is a
5225    /// proxy for actual fragment growth — reading
5226    /// `Dataset::manifest().fragments.len()` per dataset would add a
5227    /// per-flush object-store roundtrip on the hot commit path, which
5228    /// is too costly for a purely observational guard rail.
5229    ///
5230    /// No-op for primary writers (`fork_id == None`).
5231    #[allow(dead_code)] // called by tests; production path uses _static
5232    pub(crate) fn tick_fork_fragment_observability(&self) {
5233        Self::tick_fork_fragment_observability_static(
5234            self.fork_id,
5235            self.fork_flush_count.clone(),
5236            self.fork_fragment_warn_fired.clone(),
5237            self.config.fork_fragment_warn_threshold,
5238        );
5239    }
5240
5241    /// Static variant of [`Writer::tick_fork_fragment_observability`].
5242    /// Used by the async-flush finalize path, where we hold a
5243    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
5244    pub(crate) fn tick_fork_fragment_observability_static(
5245        fork_id: Option<ForkId>,
5246        fork_flush_count: Arc<AtomicU64>,
5247        fork_fragment_warn_fired: Arc<AtomicBool>,
5248        warn_threshold: usize,
5249    ) {
5250        let Some(fork_id) = fork_id else { return };
5251        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
5252        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
5253        let fork_label = fork_id.to_string();
5254        metrics::gauge!(
5255            "uni_fork_l1_flushes",
5256            "fork" => fork_label.clone(),
5257        )
5258        .set(new_count as f64);
5259        let threshold = warn_threshold as u64;
5260        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
5261            && threshold > 0
5262            && new_count >= threshold
5263        {
5264            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
5265            tracing::warn!(
5266                fork = %fork_label,
5267                flush_count = new_count,
5268                threshold,
5269                "fork has exceeded the L1 flush-count threshold; \
5270                 fork compaction is deferred to Phase 5 — consider \
5271                 drop+recreate or promotion to bound fragment growth"
5272            );
5273        }
5274    }
5275
5276    /// Check rebuild thresholds and schedule background index rebuilds for
5277    /// labels that exceed growth or age limits. Marks affected indexes as
5278    /// `Stale` and spawns an async task to schedule the rebuild.
5279    #[allow(dead_code)] // production path uses _static; kept as the
5280    // documented instance entry point.
5281    fn schedule_index_rebuilds_if_needed(
5282        &self,
5283        manifest: &SnapshotManifest,
5284        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5285    ) {
5286        Self::schedule_index_rebuilds_if_needed_static(
5287            manifest,
5288            rebuild_mgr,
5289            self.schema_manager.clone(),
5290            self.config.index_rebuild.clone(),
5291        );
5292    }
5293
5294    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
5295    /// Used by the async-flush finalize path, where we hold the
5296    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
5297    pub(crate) fn schedule_index_rebuilds_if_needed_static(
5298        manifest: &SnapshotManifest,
5299        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5300        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
5301        index_rebuild_config: uni_common::config::IndexRebuildConfig,
5302    ) {
5303        let checker =
5304            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
5305        let schema = schema_manager.schema();
5306        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
5307
5308        if labels.is_empty() {
5309            return;
5310        }
5311
5312        // Mark affected indexes as Stale
5313        for label in &labels {
5314            for idx in &schema.indexes {
5315                if idx.label() == label {
5316                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
5317                        m.status = uni_common::core::schema::IndexStatus::Stale;
5318                    });
5319                }
5320            }
5321        }
5322
5323        tokio::spawn(async move {
5324            if let Err(e) = rebuild_mgr.schedule(labels).await {
5325                tracing::warn!("Failed to schedule index rebuild: {e}");
5326            }
5327        });
5328    }
5329}
5330
5331/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
5332/// its single-task finalizer loop. Unit struct on purpose: it must NOT
5333/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
5334/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
5335/// for finalize travels in via `SharedFlushCtx`.
5336pub(crate) struct WriterFinalizer;
5337
5338impl FinalizeFn for WriterFinalizer {
5339    fn finalize<'a>(
5340        &'a self,
5341        rotated: RotatedFlush,
5342        outcome: AsyncFlushOutcome,
5343        shared: SharedFlushCtx,
5344    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
5345        Box::pin(async move {
5346            // Read initial_size / initial_count from the rotated L0 so
5347            // we don't have to plumb them through the coordinator
5348            // submission. The buffer is still alive in pending_flush
5349            // until `complete_flush` (J) below pops it.
5350            let (initial_size, initial_count) = {
5351                let l0 = rotated.old_l0_arc.read();
5352                (l0.estimated_size, l0.mutation_count)
5353            };
5354            let result = Writer::flush_finalize_now(
5355                shared,
5356                rotated.old_l0_arc.clone(),
5357                rotated.wal_lsn,
5358                outcome.new_manifest,
5359                outcome.snapshot_id,
5360                initial_size,
5361                initial_count,
5362                std::time::Instant::now(),
5363            )
5364            .await;
5365            // `rotated` (permit + flush_in_progress_guard) drops here.
5366            drop(rotated.permit);
5367            result
5368        })
5369    }
5370
5371    fn finalize_failure<'a>(
5372        &'a self,
5373        rotated: RotatedFlush,
5374        err: anyhow::Error,
5375        _shared: SharedFlushCtx,
5376    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
5377        Box::pin(async move {
5378            tracing::warn!(
5379                error = %err,
5380                seq = rotated.seq,
5381                "async flush stream failed; old L0 remains in pending_flush, \
5382                 WAL retains its data, recovery via WAL replay on restart"
5383            );
5384            metrics::counter!("uni_flush_failures_total").increment(1);
5385            // Permit + guard drop here so back-pressure releases even on
5386            // failure.
5387            drop(rotated.permit);
5388            err
5389        })
5390    }
5391}
5392
5393#[cfg(test)]
5394mod tests {
5395    use super::*;
5396    use tempfile::tempdir;
5397
5398    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
5399    /// This verifies fix for issue #137 (transaction commit atomicity).
5400    #[tokio::test]
5401    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
5402        use crate::runtime::wal::WriteAheadLog;
5403        use crate::storage::manager::StorageManager;
5404        use object_store::local::LocalFileSystem;
5405        use object_store::path::Path as ObjectStorePath;
5406        use uni_common::core::schema::SchemaManager;
5407
5408        let dir = tempdir()?;
5409        let path = dir.path().to_str().unwrap();
5410        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5411        let schema_path = ObjectStorePath::from("schema.json");
5412
5413        let schema_manager =
5414            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5415        let _label_id = schema_manager.add_label("Test")?;
5416        schema_manager.save().await?;
5417
5418        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5419
5420        // Create WAL for main L0
5421        let wal_path = ObjectStorePath::from("wal");
5422        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5423
5424        let writer = Writer::new_with_config(
5425            storage.clone(),
5426            schema_manager.clone(),
5427            1,
5428            UniConfig::default(),
5429            Some(wal),
5430            None,
5431        )
5432        .await?;
5433
5434        // Begin transaction — create a transaction L0
5435        let tx_l0 = writer.create_transaction_l0();
5436
5437        // Insert data in transaction
5438        let vid_a = writer.next_vid().await?;
5439        let vid_b = writer.next_vid().await?;
5440
5441        let mut props = std::collections::HashMap::new();
5442        props.insert("test".to_string(), Value::String("data".to_string()));
5443
5444        writer
5445            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
5446            .await?;
5447        writer
5448            .insert_vertex_with_labels(
5449                vid_b,
5450                std::collections::HashMap::new(),
5451                &["Test".to_string()],
5452                Some(&tx_l0),
5453            )
5454            .await?;
5455
5456        let eid = writer.next_eid(1).await?;
5457        writer
5458            .insert_edge(
5459                vid_a,
5460                vid_b,
5461                1,
5462                eid,
5463                std::collections::HashMap::new(),
5464                None,
5465                Some(&tx_l0),
5466            )
5467            .await?;
5468
5469        // Get WAL before commit
5470        let l0 = writer.l0_manager.get_current();
5471        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
5472        let mutations_before = wal.replay().await?;
5473        let count_before = mutations_before.len();
5474
5475        // Commit transaction - this should write to WAL first
5476        let writer = Arc::new(writer);
5477        writer.commit_transaction_l0(tx_l0).await?;
5478
5479        // Verify WAL has the new mutations
5480        let mutations_after = wal.replay().await?;
5481        assert!(
5482            mutations_after.len() > count_before,
5483            "WAL should contain transaction mutations after commit"
5484        );
5485
5486        // Verify mutations are in correct order: vertices first, then edges
5487        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
5488
5489        let mut saw_vertex_a = false;
5490        let mut saw_vertex_b = false;
5491        let mut saw_edge = false;
5492
5493        for mutation in &new_mutations {
5494            match mutation {
5495                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
5496                    if *vid == vid_a {
5497                        saw_vertex_a = true;
5498                    }
5499                    if *vid == vid_b {
5500                        saw_vertex_b = true;
5501                    }
5502                    // Vertices should come before edges
5503                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
5504                }
5505                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
5506                    if *e == eid {
5507                        saw_edge = true;
5508                    }
5509                    // Edges should come after vertices
5510                    assert!(
5511                        saw_vertex_a && saw_vertex_b,
5512                        "Edge should be logged after both vertices"
5513                    );
5514                }
5515                _ => {}
5516            }
5517        }
5518
5519        assert!(saw_vertex_a, "Vertex A should be in WAL");
5520        assert!(saw_vertex_b, "Vertex B should be in WAL");
5521        assert!(saw_edge, "Edge should be in WAL");
5522
5523        // Verify data is also in main L0
5524        let l0_read = l0.read();
5525        assert!(
5526            l0_read.vertex_properties.contains_key(&vid_a),
5527            "Vertex A should be in main L0"
5528        );
5529        assert!(
5530            l0_read.vertex_properties.contains_key(&vid_b),
5531            "Vertex B should be in main L0"
5532        );
5533        assert!(
5534            l0_read.edge_endpoints.contains_key(&eid),
5535            "Edge should be in main L0"
5536        );
5537
5538        Ok(())
5539    }
5540
5541    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
5542    #[tokio::test]
5543    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
5544        use crate::runtime::wal::WriteAheadLog;
5545        use crate::storage::manager::StorageManager;
5546        use object_store::local::LocalFileSystem;
5547        use object_store::path::Path as ObjectStorePath;
5548        use uni_common::core::schema::SchemaManager;
5549
5550        let dir = tempdir()?;
5551        let path = dir.path().to_str().unwrap();
5552        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5553        let schema_path = ObjectStorePath::from("schema.json");
5554
5555        let schema_manager =
5556            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5557        let _label_id = schema_manager.add_label("Test")?;
5558        let _baseline_label_id = schema_manager.add_label("Baseline")?;
5559        let _txdata_label_id = schema_manager.add_label("TxData")?;
5560        schema_manager.save().await?;
5561
5562        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5563
5564        // Create WAL for main L0
5565        let wal_path = ObjectStorePath::from("wal");
5566        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5567
5568        let writer = Writer::new_with_config(
5569            storage.clone(),
5570            schema_manager.clone(),
5571            1,
5572            UniConfig::default(),
5573            Some(wal),
5574            None,
5575        )
5576        .await?;
5577
5578        // Insert baseline data (outside transaction)
5579        let baseline_vid = writer.next_vid().await?;
5580        writer
5581            .insert_vertex_with_labels(
5582                baseline_vid,
5583                [("baseline".to_string(), Value::Bool(true))]
5584                    .into_iter()
5585                    .collect(),
5586                &["Baseline".to_string()],
5587                None,
5588            )
5589            .await?;
5590
5591        // Begin transaction — create a transaction L0
5592        let tx_l0 = writer.create_transaction_l0();
5593
5594        // Insert data in transaction
5595        let tx_vid = writer.next_vid().await?;
5596        writer
5597            .insert_vertex_with_labels(
5598                tx_vid,
5599                [("tx_data".to_string(), Value::Bool(true))]
5600                    .into_iter()
5601                    .collect(),
5602                &["TxData".to_string()],
5603                Some(&tx_l0),
5604            )
5605            .await?;
5606
5607        // Capture main L0 state before rollback
5608        let l0 = writer.l0_manager.get_current();
5609        let vertex_count_before = l0.read().vertex_properties.len();
5610
5611        // Rollback transaction (simulating what would happen after WAL flush failure)
5612        drop(tx_l0);
5613
5614        // Verify main L0 is unchanged
5615        let vertex_count_after = l0.read().vertex_properties.len();
5616        assert_eq!(
5617            vertex_count_before, vertex_count_after,
5618            "Main L0 should not change after rollback"
5619        );
5620
5621        // Baseline should still be present
5622        assert!(
5623            l0.read().vertex_properties.contains_key(&baseline_vid),
5624            "Baseline data should remain"
5625        );
5626
5627        // Transaction data should NOT be in main L0
5628        assert!(
5629            !l0.read().vertex_properties.contains_key(&tx_vid),
5630            "Transaction data should not be in main L0 after rollback"
5631        );
5632
5633        Ok(())
5634    }
5635
5636    /// Test that batch insert with shared labels does not clone labels per vertex.
5637    /// This verifies fix for issue #161 (redundant label cloning).
5638    #[tokio::test]
5639    async fn test_batch_insert_shared_labels() -> Result<()> {
5640        use crate::storage::manager::StorageManager;
5641        use object_store::local::LocalFileSystem;
5642        use object_store::path::Path as ObjectStorePath;
5643        use uni_common::core::schema::SchemaManager;
5644
5645        let dir = tempdir()?;
5646        let path = dir.path().to_str().unwrap();
5647        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5648        let schema_path = ObjectStorePath::from("schema.json");
5649
5650        let schema_manager =
5651            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5652        let _label_id = schema_manager.add_label("Person")?;
5653        schema_manager.save().await?;
5654
5655        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5656
5657        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5658
5659        // Shared labels - should not be cloned per vertex
5660        let labels = &["Person".to_string()];
5661
5662        // Insert batch of vertices with same labels
5663        let mut vids = Vec::new();
5664        for i in 0..100 {
5665            let vid = writer.next_vid().await?;
5666            let mut props = std::collections::HashMap::new();
5667            props.insert("id".to_string(), Value::Int(i));
5668            writer
5669                .insert_vertex_with_labels(vid, props, labels, None)
5670                .await?;
5671            vids.push(vid);
5672        }
5673
5674        // Verify all vertices have the correct labels
5675        let l0 = writer.l0_manager.get_current();
5676        for vid in vids {
5677            let l0_guard = l0.read();
5678            let vertex_labels = l0_guard.vertex_labels.get(&vid);
5679            assert!(vertex_labels.is_some(), "Vertex should have labels");
5680            assert_eq!(
5681                vertex_labels.unwrap(),
5682                &vec!["Person".to_string()],
5683                "Labels should match"
5684            );
5685        }
5686
5687        Ok(())
5688    }
5689
5690    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5691    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5692    #[tokio::test]
5693    async fn test_estimated_size_tracks_mutations() -> Result<()> {
5694        use crate::storage::manager::StorageManager;
5695        use object_store::local::LocalFileSystem;
5696        use object_store::path::Path as ObjectStorePath;
5697        use uni_common::core::schema::SchemaManager;
5698
5699        let dir = tempdir()?;
5700        let path = dir.path().to_str().unwrap();
5701        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5702        let schema_path = ObjectStorePath::from("schema.json");
5703
5704        let schema_manager =
5705            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5706        let _label_id = schema_manager.add_label("Test")?;
5707        schema_manager.save().await?;
5708
5709        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5710
5711        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5712
5713        let l0 = writer.l0_manager.get_current();
5714
5715        // Initial state should be empty
5716        let initial_estimated = l0.read().estimated_size;
5717        let initial_actual = l0.read().size_bytes();
5718        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5719        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5720
5721        // Insert vertices with properties
5722        let mut vids = Vec::new();
5723        for i in 0..10 {
5724            let vid = writer.next_vid().await?;
5725            let mut props = std::collections::HashMap::new();
5726            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5727            props.insert("index".to_string(), Value::Int(i));
5728            writer
5729                .insert_vertex_with_labels(vid, props, &[], None)
5730                .await?;
5731            vids.push(vid);
5732        }
5733
5734        // Verify estimated_size grew
5735        let after_vertices_estimated = l0.read().estimated_size;
5736        let after_vertices_actual = l0.read().size_bytes();
5737        assert!(
5738            after_vertices_estimated > 0,
5739            "estimated_size should grow after insertions"
5740        );
5741
5742        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5743        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5744        assert!(
5745            (0.5..=2.0).contains(&ratio),
5746            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5747            after_vertices_estimated,
5748            after_vertices_actual,
5749            ratio
5750        );
5751
5752        // Insert edges with a simple edge type
5753        let edge_type = 1u32;
5754        for i in 0..9 {
5755            let eid = writer.next_eid(edge_type).await?;
5756            writer
5757                .insert_edge(
5758                    vids[i],
5759                    vids[i + 1],
5760                    edge_type,
5761                    eid,
5762                    std::collections::HashMap::new(),
5763                    Some("NEXT".to_string()),
5764                    None,
5765                )
5766                .await?;
5767        }
5768
5769        // Verify estimated_size grew further
5770        let after_edges_estimated = l0.read().estimated_size;
5771        let after_edges_actual = l0.read().size_bytes();
5772        assert!(
5773            after_edges_estimated > after_vertices_estimated,
5774            "estimated_size should grow after edge insertions"
5775        );
5776
5777        // Verify still within reasonable bounds
5778        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5779        assert!(
5780            (0.5..=2.0).contains(&ratio),
5781            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5782            after_edges_estimated,
5783            after_edges_actual,
5784            ratio
5785        );
5786
5787        Ok(())
5788    }
5789
5790    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5791    #[tokio::test]
5792    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5793        use crate::runtime::wal::WriteAheadLog;
5794        use crate::storage::manager::StorageManager;
5795        use object_store::local::LocalFileSystem;
5796        use object_store::path::Path as ObjectStorePath;
5797        use uni_common::core::schema::SchemaManager;
5798
5799        let dir = tempdir()?;
5800        let path = dir.path().to_str().unwrap();
5801        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5802        let schema_path = ObjectStorePath::from("schema.json");
5803
5804        let schema_manager =
5805            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5806        schema_manager.save().await?;
5807
5808        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5809
5810        let wal_path = ObjectStorePath::from("wal");
5811        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5812
5813        let writer = Writer::new_with_config(
5814            storage.clone(),
5815            schema_manager.clone(),
5816            1,
5817            UniConfig::default(),
5818            Some(wal.clone()),
5819            None,
5820        )
5821        .await?;
5822
5823        // Flush with no mutations — should succeed cleanly
5824        let lsn = writer.flush_wal().await?;
5825        // LSN should be 0 or 1 (no real mutations flushed)
5826        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5827
5828        Ok(())
5829    }
5830
5831    /// Test that transaction data does not leak into main L0 without commit.
5832    #[tokio::test]
5833    async fn test_transaction_isolation_without_commit() -> Result<()> {
5834        use crate::runtime::wal::WriteAheadLog;
5835        use crate::storage::manager::StorageManager;
5836        use object_store::local::LocalFileSystem;
5837        use object_store::path::Path as ObjectStorePath;
5838        use uni_common::core::schema::SchemaManager;
5839
5840        let dir = tempdir()?;
5841        let path = dir.path().to_str().unwrap();
5842        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5843        let schema_path = ObjectStorePath::from("schema.json");
5844
5845        let schema_manager =
5846            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5847        let _label_id = schema_manager.add_label("Person")?;
5848        schema_manager.save().await?;
5849
5850        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5851
5852        let wal_path = ObjectStorePath::from("wal");
5853        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5854
5855        let writer = Writer::new_with_config(
5856            storage.clone(),
5857            schema_manager.clone(),
5858            1,
5859            UniConfig::default(),
5860            Some(wal),
5861            None,
5862        )
5863        .await?;
5864
5865        // Create transaction L0
5866        let tx_l0 = writer.create_transaction_l0();
5867
5868        // Insert vertex into transaction L0
5869        let vid = writer.next_vid().await?;
5870        writer
5871            .insert_vertex_with_labels(
5872                vid,
5873                [("name".to_string(), Value::String("Ghost".to_string()))]
5874                    .into_iter()
5875                    .collect(),
5876                &["Person".to_string()],
5877                Some(&tx_l0),
5878            )
5879            .await?;
5880
5881        // Verify data is in transaction L0
5882        assert!(
5883            tx_l0.read().vertex_properties.contains_key(&vid),
5884            "Transaction L0 should contain the vertex"
5885        );
5886
5887        // Verify data is NOT in main L0
5888        let main_l0 = writer.l0_manager.get_current();
5889        assert!(
5890            !main_l0.read().vertex_properties.contains_key(&vid),
5891            "Main L0 should NOT contain uncommitted transaction data"
5892        );
5893
5894        // Drop transaction without committing — data should be lost
5895        drop(tx_l0);
5896
5897        // Main L0 still should not have it
5898        assert!(
5899            !main_l0.read().vertex_properties.contains_key(&vid),
5900            "Main L0 should remain clean after dropped transaction"
5901        );
5902
5903        Ok(())
5904    }
5905
5906    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5907    /// the flush count crosses the configured threshold and stays
5908    /// silent on subsequent flushes for the lifetime of the writer.
5909    /// Primary writers (`fork_id == None`) never fire it.
5910    ///
5911    /// Tested directly against `tick_fork_fragment_observability` so
5912    /// the contract is locked in independently of the broader
5913    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5914    /// on Day 10's on-the-fly schema overlay growth).
5915    #[tokio::test]
5916    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5917        use crate::storage::manager::StorageManager;
5918        use object_store::local::LocalFileSystem;
5919        use object_store::path::Path as ObjectStorePath;
5920        use uni_common::core::fork::ForkId;
5921        use uni_common::core::schema::SchemaManager;
5922
5923        let dir = tempdir()?;
5924        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5925        let schema_path = ObjectStorePath::from("schema.json");
5926        let schema_manager =
5927            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5928        let storage = Arc::new(
5929            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5930        );
5931
5932        let config = UniConfig {
5933            fork_fragment_warn_threshold: 3,
5934            ..Default::default()
5935        };
5936        let mut writer =
5937            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5938
5939        // Primary path: never fires.
5940        for _ in 0..10 {
5941            writer.tick_fork_fragment_observability();
5942        }
5943        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5944        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5945
5946        // Fork path: tag and tick. Below threshold → no fire.
5947        writer.fork_id = Some(ForkId::new());
5948        writer.tick_fork_fragment_observability();
5949        writer.tick_fork_fragment_observability();
5950        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5951        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5952
5953        // Crossing threshold → fires once.
5954        writer.tick_fork_fragment_observability();
5955        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5956        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5957
5958        // Subsequent ticks bump the gauge but do not re-fire.
5959        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5960        for _ in 0..5 {
5961            writer.tick_fork_fragment_observability();
5962        }
5963        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5964        assert_eq!(
5965            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5966            fired_after
5967        );
5968
5969        Ok(())
5970    }
5971
5972    /// The hot-path mutators must not write to any `Writer` struct field.
5973    /// Phase 2 of the refactor
5974    /// gave them `&self` receivers, which the compiler enforces against
5975    /// direct `self.x = y` assignment — but interior-mutable writes
5976    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5977    /// every potentially-writable field, calls each hot-path mutator, and
5978    /// asserts no field changed.
5979    ///
5980    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5981    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5982    /// are intentionally out of scope here.
5983    #[tokio::test]
5984    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5985        use crate::storage::manager::StorageManager;
5986        use object_store::local::LocalFileSystem;
5987        use object_store::path::Path as ObjectStorePath;
5988        use uni_common::core::schema::SchemaManager;
5989
5990        let dir = tempdir()?;
5991        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5992        let schema_path = ObjectStorePath::from("schema.json");
5993        let schema_manager =
5994            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5995        schema_manager.add_label("Person")?;
5996        schema_manager.save().await?;
5997        let storage = Arc::new(
5998            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5999        );
6000
6001        let writer =
6002            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
6003                .await?;
6004
6005        /// Captures every `Writer` field that *could* be written by a
6006        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
6007        /// construction field). Arc'd substructures (`l0_manager`,
6008        /// `storage`, etc.) are intentionally not checked — they are
6009        /// re-pointed only at construction.
6010        #[derive(Debug, PartialEq)]
6011        struct Snapshot {
6012            last_flush_time: std::time::Instant,
6013            cached_manifest_some: bool,
6014            fork_flush_count: u64,
6015            fork_fragment_warn_fired: bool,
6016            xervo_runtime_some: bool,
6017            index_rebuild_manager_some: bool,
6018            fork_id: Option<ForkId>,
6019        }
6020
6021        fn snap(w: &Writer) -> Snapshot {
6022            Snapshot {
6023                last_flush_time: *w.last_flush_time.lock(),
6024                cached_manifest_some: w.cached_manifest.lock().is_some(),
6025                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
6026                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
6027                xervo_runtime_some: w.xervo_runtime.get().is_some(),
6028                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
6029                fork_id: w.fork_id,
6030            }
6031        }
6032
6033        // 1. insert_vertex_with_labels
6034        let before = snap(&writer);
6035        let vid = writer.next_vid().await?;
6036        writer
6037            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
6038            .await?;
6039        assert_eq!(
6040            snap(&writer),
6041            before,
6042            "insert_vertex_with_labels mutated a Writer field"
6043        );
6044
6045        // 2. insert_vertices_batch
6046        let before = snap(&writer);
6047        let vids = writer.allocate_vids(2).await?;
6048        writer
6049            .insert_vertices_batch(
6050                vids,
6051                vec![Properties::new(), Properties::new()],
6052                vec!["Person".into()],
6053                None,
6054            )
6055            .await?;
6056        assert_eq!(
6057            snap(&writer),
6058            before,
6059            "insert_vertices_batch mutated a Writer field"
6060        );
6061
6062        // 3. delete_vertex
6063        let before = snap(&writer);
6064        writer.delete_vertex(vid, None, None).await?;
6065        assert_eq!(
6066            snap(&writer),
6067            before,
6068            "delete_vertex mutated a Writer field"
6069        );
6070
6071        // (insert_edge / delete_edge are skipped here: their fixture cost is
6072        // disproportionate to the audit's marginal value, and the same
6073        // structural argument plus the compiler-enforced `&self` covers them.)
6074
6075        Ok(())
6076    }
6077}