Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
36/// Whether `property` on `label` is a multi-vector (`List<Vector>`) column — the
37/// late-interaction (ColBERT) shape that auto-embeds per-token via the multi-vector model
38/// (issue #104). A plain `Vector` column uses the dense single-vector model.
39fn is_multivector_property(
40    schema: &uni_common::core::schema::Schema,
41    label: &str,
42    property: &str,
43) -> bool {
44    schema
45        .properties
46        .get(label)
47        .and_then(|p| p.get(property))
48        .is_some_and(|m| {
49            matches!(&m.r#type, uni_common::DataType::List(inner)
50                if matches!(**inner, uni_common::DataType::Vector { .. }))
51        })
52}
53
54/// Convert per-token embedding vectors into the stored `List<Vector>` representation:
55/// a `Value::List` whose elements are each a `Value::List<Float>` (one token vector).
56fn multivec_to_value(tokens: &[Vec<f32>]) -> Value {
57    Value::List(
58        tokens
59            .iter()
60            .map(|tok| Value::List(tok.iter().map(|f| Value::Float(*f as f64)).collect()))
61            .collect(),
62    )
63}
64
/// A coalesced auto-embed group: every target column that shares one model
/// `alias` and one list of `source_properties`.
///
/// Targets are split by column kind so that a group containing both kinds can
/// be filled by a single hybrid inference instead of two separate ones.
struct EmbedGroupSpec {
    /// Source properties shared by every target in the group.
    source_properties: Vec<String>,
    /// Document prefix copied from the embedding config of the first index
    /// that created this group.
    document_prefix: Option<String>,
    /// Target columns that are not multi-vector (plain `Vector` columns).
    dense: Vec<String>,
    /// Target columns declared as `List<Vector>` (multi-vector).
    multi: Vec<String>,
}
74
75/// Run ONE embedding inference for a coalesced group of auto-embed targets sharing an alias +
76/// source text, returning `(dense_per_text, multi_vector_per_text)` (each `Some` iff that head
77/// was requested). When both heads are needed this uses the hybrid model
78/// (`hybrid_embedder`), so a multi-functional model (e.g. BGE-M3) produces the dense + ColBERT
79/// heads from a SINGLE forward pass; otherwise it uses the single-head dense / multi-vector
80/// embedder (today's behavior for non-mixed groups).
81async fn embed_group(
82    runtime: &ModelRuntime,
83    alias: &str,
84    texts: &[&str],
85    want_dense: bool,
86    want_multi: bool,
87) -> Result<(Option<Vec<Vec<f32>>>, Option<Vec<Vec<Vec<f32>>>>)> {
88    use uni_xervo::traits::hybrid::HeadSet;
89    if want_dense && want_multi {
90        // Single-pass hybrid: one model, one inference, both heads.
91        let embedder = runtime.hybrid_embedder(alias).await?;
92        let res = embedder
93            .embed(texts, HeadSet::DENSE | HeadSet::MULTI_VECTOR)
94            .await?;
95        let dense = res.dense.ok_or_else(|| {
96            anyhow!("hybrid model '{alias}' returned no dense head for a Vector target")
97        })?;
98        let multi = res.multi_vector.ok_or_else(|| {
99            anyhow!(
100                "hybrid model '{alias}' returned no multi-vector head for a List<Vector> target"
101            )
102        })?;
103        Ok((Some(dense), Some(multi)))
104    } else if want_multi {
105        let embedder = runtime.multi_vector_embedder(alias).await?;
106        Ok((None, Some(embedder.embed(texts).await?.vectors)))
107    } else {
108        let embedder = runtime.embedding(alias).await?;
109        Ok((Some(embedder.embed(texts).await?.vectors), None))
110    }
111}
112
113/// Group a label's auto-embed configs by `(alias, source_properties)`, classifying each target
114/// column as dense vs multi-vector. A group with both kinds is a single-pass hybrid source.
115fn collect_embed_groups(
116    schema: &uni_common::core::schema::Schema,
117    label: &str,
118) -> std::collections::BTreeMap<(String, Vec<String>), EmbedGroupSpec> {
119    use std::collections::BTreeMap;
120    let mut groups: BTreeMap<(String, Vec<String>), EmbedGroupSpec> = BTreeMap::new();
121    for idx in &schema.indexes {
122        if let IndexDefinition::Vector(v_config) = idx
123            && v_config.label == label
124            && let Some(emb) = &v_config.embedding_config
125        {
126            let g = groups
127                .entry((emb.alias.clone(), emb.source_properties.clone()))
128                .or_insert_with(|| EmbedGroupSpec {
129                    source_properties: emb.source_properties.clone(),
130                    document_prefix: emb.document_prefix.clone(),
131                    dense: Vec::new(),
132                    multi: Vec::new(),
133                });
134            if is_multivector_property(schema, label, &v_config.property) {
135                g.multi.push(v_config.property.clone());
136            } else {
137                g.dense.push(v_config.property.clone());
138            }
139        }
140    }
141    groups
142}
143
/// Tunables for the writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit. Defaults to `10_000`.
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column MergeInsert path used by SET-only
    /// flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched property
    ///   keys in `L0Buffer::vertex_partial_keys`, and the flush sends those
    ///   VIDs through Lance `MergeInsertBuilder` with a subset-of-schema
    ///   source. Unchanged columns (including wide ones such as embeddings)
    ///   are neither read nor rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the read-modify-write
    ///   `insert_vertex_with_labels` path, which stays bit-for-bit equivalent
    ///   with prior releases.
    ///
    /// Defaults to `false` for the first release; the plan is to flip it to
    /// `true` once telemetry on the issue #72 ingest workload confirms the win.
    ///
    /// The soundness probe lives at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Conservative defaults: a 10 000-mutation limit and partial Lance writes
    /// switched off.
    fn default() -> Self {
        let max_mutations = 10_000;
        let partial_lance_writes = false;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
174
/// Snapshot of the parent's state taken atomically at a fork point, while
/// `flush_lock` is held.
///
/// It records the allocator high-water marks and the Lance main-branch version
/// of every dataset that existed when the parent's L0 was flushed.
/// [`Writer::flush_and_capture_fork_point`] reads all of these without
/// releasing `flush_lock`, so no concurrent commit or flush can move the
/// allocator or a dataset tip between the flush and the reads. A fork built
/// from these values can therefore neither collide VIDs with the parent nor
/// inherit rows committed after the fork point.
#[derive(Clone, Debug, Default)]
pub struct ForkPoint {
    /// The vertex id the parent would hand out next at the fork point.
    pub vid_hwm: u64,
    /// The edge id the parent would hand out next at the fork point.
    pub eid_hwm: u64,
    /// Map from `dataset_name` to its Lance main-branch version at the fork
    /// point.
    ///
    /// Keys follow the fork branch loop's dataset naming (`vertices`, `edges`,
    /// `vertices_{label}`, `deltas_{type}_{dir}`, `adjacency_{type}_{dir}`).
    /// A dataset without a `.lance` directory on disk at the fork point is
    /// simply absent from the map.
    pub dataset_versions: BTreeMap<String, u64>,
    /// The parent's MVCC version high-water mark at the fork point, i.e. the
    /// largest `_version` an inherited row can carry.
    ///
    /// A fork seeds its own version counter from this floor: inherited
    /// (base_paths) rows stay visible to a fork transaction's
    /// `_version <= pin` read, while the fork's own writes receive versions
    /// above it.
    pub version_hwm: u64,
}
204
205/// RAII latch on [`StorageManager::flush_in_progress`].
206///
207/// Sets the flag to `true` on construction (via CAS) and back to `false` on
208/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
209/// stuck. Returns `None` if a flush is already in progress, providing
210/// forward-compatible exclusion once the outer writer-RwLock is removed in
211/// Phase 4 of the concurrent-writer refactor.
212// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
213// can hold it on RotatedFlush without a writer.rs back-import cycle.
214pub use crate::storage::manager::FlushInProgressGuard;
215
216/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
217/// captured WAL LSN, current_version, and the in-progress guard whose
218/// lifetime spans the full flush (including the future async stream
219/// phase that runs on a spawned task).
220struct RotateOutput {
221    old_l0_arc: Arc<RwLock<L0Buffer>>,
222    wal_lsn: u64,
223    current_version: u64,
224    flush_in_progress_guard: FlushInProgressGuard,
225}
226
227/// Project a property map to a subset selected by `keys`. Used to
228/// run `touched_needs_full_read` against just the SET-touched keys
229/// when the caller passes a fully-merged `props` map.
230fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
231    let mut out = Properties::new();
232    for k in keys {
233        if let Some(v) = props.get(k) {
234            out.insert(k.clone(), v.clone());
235        }
236    }
237    out
238}
239
/// Builds the `.lance` URI of `dataset` under the storage base URI `base`.
///
/// Exactly one `/` separates the two parts: a separator is inserted only when
/// `base` does not already end in one. This mirrors the fork branch loop's
/// `join_uri`, so the versions captured by
/// [`Writer::flush_and_capture_fork_point`] are keyed by the very same
/// datasets the fork later branches.
fn join_lance_uri(base: &str, dataset: &str) -> String {
    let separator = if base.ends_with('/') { "" } else { "/" };
    format!("{base}{separator}{dataset}.lance")
}
252
/// Cheap check for whether a dataset's `.lance` directory is present.
///
/// This is a local-filesystem heuristic: any URI containing a `://` scheme
/// separator is treated as remote and reported as present, leaving the real
/// check to `current_version`. Everything else is probed as a local path.
/// Mirrors the fork branch loop's `path_exists`.
fn lance_path_exists(uri: &str) -> bool {
    uri.contains("://") || std::path::Path::new(uri).exists()
}
264
265/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
266/// published) snapshot manifest and its id. Finalize is responsible
267/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
268/// update.
269struct FlushOutcome {
270    manifest: SnapshotManifest,
271    snapshot_id: String,
272}
273
274pub struct Writer {
275    pub l0_manager: Arc<L0Manager>,
276    pub storage: Arc<StorageManager>,
277    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
278    pub allocator: Arc<IdAllocator>,
279    pub config: UniConfig,
280    /// Optional embedding runtime. `OnceLock` so the initializer can run
281    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
282    /// (Phase 4 of concurrent_writer.md). Read through
283    /// [`Writer::xervo_runtime`] — the field itself is private to keep
284    /// callers oblivious to the OnceLock representation.
285    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
286    /// Property manager for cache invalidation after flush
287    pub property_manager: Option<Arc<PropertyManager>>,
288    /// Adjacency manager for dual-write (edges survive flush).
289    adjacency_manager: Arc<AdjacencyManager>,
290    /// Timestamp of last flush or creation. Interior-mutable so that
291    /// `&self` callers can update it; uncontended in practice because all
292    /// writes happen inside the single-flusher critical section.
293    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
294    /// async-flush coordinator passes to spawned stream/finalize tasks.
295    last_flush_time: Arc<PlMutex<std::time::Instant>>,
296    /// Background compaction task handle (prevents concurrent compaction races)
297    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
298    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
299    /// `OnceLock` for the same reason as `xervo_runtime`.
300    /// Wrapped in `Arc` so the async-flush finalize path can read it
301    /// from a spawned task via `SharedFlushCtx`.
302    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
303    /// Cached snapshot manifest from the last flush. Avoids re-reading from
304    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
305    /// `&self` access; uncontended because all access is inside the
306    /// single-flusher critical section.
307    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
308    /// Identifier of the fork this writer serves, if any. `None` for
309    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
310    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
311    /// the fragment-count guard rail (Phase 2 Day 12).
312    pub fork_id: Option<ForkId>,
313    /// Number of `flush_to_l1` calls since this writer was constructed.
314    /// Used as a proxy for L1 fragment growth on the fork's branches:
315    /// each flush typically appends ~1 fragment per touched dataset, so
316    /// the count tracks the order of magnitude of fragment accumulation.
317    /// Reading the actual `Dataset::manifest().fragments.len()` per
318    /// flush would add a per-dataset object-store roundtrip on the hot
319    /// commit path; the proxy keeps the guard rail purely observational
320    /// (Phase 5 introduces fork compaction proper). Only meaningful when
321    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
322    fork_flush_count: Arc<AtomicU64>,
323    /// Whether the fork-fragment warning has already fired at the
324    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
325    /// sufficient — observational only.
326    fork_fragment_warn_fired: Arc<AtomicBool>,
327    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
328    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
329    /// across its WAL-append + L0-merge window. Replaces the outer
330    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
331    /// Arc-wrapped so async-flush coordinator's finalize path can
332    /// re-acquire it from a spawned task via SharedFlushCtx.
333    flush_lock: Arc<tokio::sync::Mutex<()>>,
334    /// Coordinator for async-flush pipeline. Owns the back-pressure
335    /// semaphore, rotate-order sequence, single-finalizer task, and
336    /// pending-flush counter. Always present even when async flush is
337    /// disabled — the sync `flush_to_l1` path uses it for the future
338    /// `FlushInProgressGuard`/permit ownership model.
339    /// Coordinator is `None` when `async_flush_enabled = false`. The
340    /// coordinator's finalizer task captures `SharedFlushCtx` which
341    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
342    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
343    /// the holder count never drops. Constructing it only when the
344    /// feature is actually on avoids that side-effect for all
345    /// existing sync-flush paths. When async-flush graduates from
346    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
347    /// the drain explicitly.
348    #[allow(dead_code)] // first production use lands in Commit 6/7
349    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
350    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
351    /// per successful commit under `flush_lock`; a transaction captures the
352    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
353    ///
354    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
355    ///
356    /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
357    /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
358    commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
359    /// Bounded log of recently-committed write-sets for OCC conflict detection.
360    /// Read and updated only under `flush_lock`.
361    ///
362    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
363    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
364    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
365    /// canonical (label, key-props) bytes. A transaction holds the lock from
366    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
367    /// on the same key (avoiding optimistic abort-retry on hot keys).
368    ///
369    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
370    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
371}
372
/// How many recent commits the OCC conflict-detection registry retains (4096).
///
/// Sized so that running past the retained window — which forces a
/// conservative abort — is rare in practice. Each retained entry is only a
/// small set of touched ids.
const OCC_REGISTRY_CAPACITY: usize = 1 << 12;
377
378impl Writer {
379    pub async fn new(
380        storage: Arc<StorageManager>,
381        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
382        start_version: u64,
383    ) -> Result<Self> {
384        Self::new_with_config(
385            storage,
386            schema_manager,
387            start_version,
388            UniConfig::default(),
389            None,
390            None,
391        )
392        .await
393    }
394
395    pub async fn new_with_config(
396        storage: Arc<StorageManager>,
397        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
398        start_version: u64,
399        config: UniConfig,
400        wal: Option<Arc<WriteAheadLog>>,
401        allocator: Option<Arc<IdAllocator>>,
402    ) -> Result<Self> {
403        let allocator = if let Some(a) = allocator {
404            a
405        } else {
406            let store = storage.store();
407            let path = object_store::path::Path::from("id_allocator.json");
408            Arc::new(IdAllocator::new(store, path, 1000).await?)
409        };
410
411        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
412
413        let property_manager = Some(Arc::new(PropertyManager::new(
414            storage.clone(),
415            schema_manager.clone(),
416            1000,
417        )));
418
419        let adjacency_manager = storage.adjacency_manager();
420
421        // Hoist the Arc'd fields so we can both stash them on Writer and
422        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
423        // captures. Single-source-of-truth for each piece of mutable
424        // shared state.
425        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
426        let cached_manifest = Arc::new(PlMutex::new(None));
427        let fork_flush_count = Arc::new(AtomicU64::new(0));
428        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
429        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
430        let compaction_handle = Arc::new(RwLock::new(None));
431        let index_rebuild_manager: Arc<
432            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
433        > = Arc::new(OnceLock::new());
434
435        let flush_coordinator = if config.async_flush_enabled {
436            let shared = SharedFlushCtx {
437                storage: storage.clone(),
438                l0_manager: l0_manager.clone(),
439                adjacency_manager: adjacency_manager.clone(),
440                property_manager: property_manager.clone(),
441                schema_manager: schema_manager.clone(),
442                cached_manifest: cached_manifest.clone(),
443                last_flush_time: last_flush_time.clone(),
444                fork_id: None,
445                fork_flush_count: fork_flush_count.clone(),
446                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
447                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
448                flush_lock: flush_lock.clone(),
449                index_rebuild_manager: index_rebuild_manager.clone(),
450                compaction_handle: compaction_handle.clone(),
451                compaction_config: config.compaction.clone(),
452                index_rebuild_config: config.index_rebuild.clone(),
453                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
454            };
455            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
456            Some(Arc::new(FlushCoordinator::new(
457                config.max_pending_flushes,
458                shared,
459                finalize_fn,
460            )))
461        } else {
462            None
463        };
464
465        let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
466        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
467            OCC_REGISTRY_CAPACITY,
468        )));
469        let for_update_locks = Arc::new(dashmap::DashMap::new());
470
471        Ok(Self {
472            l0_manager,
473            storage,
474            schema_manager,
475            allocator,
476            config,
477            xervo_runtime: OnceLock::new(),
478            property_manager,
479            adjacency_manager,
480            last_flush_time,
481            compaction_handle,
482            index_rebuild_manager,
483            cached_manifest,
484            fork_id: None,
485            fork_flush_count,
486            fork_fragment_warn_fired,
487            flush_lock,
488            flush_coordinator,
489            commit_sequence,
490            committed_writes,
491            for_update_locks,
492        })
493    }
494
495    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
496    /// creating it on first use. The caller `.lock_owned().await`s the returned
497    /// mutex and holds the guard for the transaction's lifetime.
498    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
499        self.for_update_locks
500            .entry(key.to_vec())
501            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
502            .clone()
503    }
504
505    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
506    /// holds anymore, so the map does not grow without bound across the keyspace.
507    ///
508    /// Called when a transaction ends, **after** its guards have been dropped.
509    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
510    /// the same lock `row_lock_handle` takes to clone an entry — so the check
511    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
512    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
513    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
514    /// after we remove). Either way no two transactions ever lock different
515    /// `Mutex` instances for the same key.
516    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
517        for key in keys {
518            self.for_update_locks
519                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
520        }
521    }
522
523    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
524    /// tests that the map does not leak entries across transactions (G5).
525    pub fn for_update_lock_count(&self) -> usize {
526        self.for_update_locks.len()
527    }
528
529    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
530    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
531    /// baseline advances to lock-acquisition time (read-latest under the lock).
532    pub fn current_commit_sequence(&self) -> u64 {
533        self.commit_sequence
534            .load(crate::runtime::sync::Ordering::Relaxed)
535    }
536
537    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
538    /// Used by the async-flush stream/finalize paths to pass into spawned
539    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
540    /// with `flush_coordinator -> FinalizeFn -> Writer`).
541    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
542        SharedFlushCtx {
543            storage: self.storage.clone(),
544            l0_manager: self.l0_manager.clone(),
545            adjacency_manager: self.adjacency_manager.clone(),
546            property_manager: self.property_manager.clone(),
547            schema_manager: self.schema_manager.clone(),
548            cached_manifest: self.cached_manifest.clone(),
549            last_flush_time: self.last_flush_time.clone(),
550            fork_id: self.fork_id,
551            fork_flush_count: self.fork_flush_count.clone(),
552            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
553            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
554            flush_lock: self.flush_lock.clone(),
555            index_rebuild_manager: self.index_rebuild_manager.clone(),
556            compaction_handle: self.compaction_handle.clone(),
557            compaction_config: self.config.compaction.clone(),
558            index_rebuild_config: self.config.index_rebuild.clone(),
559            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
560        }
561    }
562
563    /// Borrow the flush coordinator if async flush is enabled.
564    /// Returns `None` when `config.async_flush_enabled = false`.
565    /// External callers (`drop_fork`) use this to drain pending streams.
566    pub fn flush_coordinator(
567        &self,
568    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
569        self.flush_coordinator.as_ref()
570    }
571
572    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
573    ///
574    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
575    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
576    pub fn set_index_rebuild_manager(
577        &self,
578        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
579    ) -> Result<()> {
580        self.index_rebuild_manager
581            .set(manager)
582            .map_err(|_| anyhow!("index_rebuild_manager already set"))
583    }
584
585    /// Replay WAL mutations into the current L0 buffer.
586    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
587        let l0 = self.l0_manager.get_current();
588        let wal = l0.read().wal.clone();
589
590        if let Some(wal) = wal {
591            wal.initialize().await?;
592            let mutations = wal.replay_since(wal_high_water_mark).await?;
593            let count = mutations.len();
594
595            if count > 0 {
596                log::info!(
597                    "Replaying {} mutations from WAL (LSN > {})",
598                    count,
599                    wal_high_water_mark
600                );
601                let mut l0_guard = l0.write();
602                l0_guard.replay_mutations(mutations)?;
603                // Rebuild the UNIQUE constraint index over the recovered rows
604                // (Bug #9 Mechanism B). `replay_mutations` restores
605                // vertices/properties/labels but never repopulates
606                // `constraint_index` (its only other caller is the live insert
607                // path). Without this, a unique key that lives only in the WAL
608                // (committed but not yet flushed to Lance) is invisible to
609                // `check_unique_constraint_multi` after recovery and a
610                // duplicate of it could be created.
611                self.rebuild_constraint_index(&mut l0_guard);
612            }
613
614            Ok(count)
615        } else {
616            Ok(0)
617        }
618    }
619
620    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
621    ///
622    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
623    /// constraint whose target label the vertex carries and whose member
624    /// properties are all present, inserts the same constraint key the live
625    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
626    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
627    /// write lock; the schema is already loaded on the `Writer`.
628    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
629        let schema = self.schema_manager.schema();
630        // Collect entries first to avoid borrowing `vertex_properties`
631        // immutably while mutating `constraint_index` through the same guard.
632        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
633        for (&vid, props) in &l0_guard.vertex_properties {
634            if l0_guard.vertex_tombstones.contains(&vid) {
635                continue;
636            }
637            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
638                continue;
639            };
640            for label in labels {
641                for constraint in &schema.constraints {
642                    if !constraint.enabled {
643                        continue;
644                    }
645                    let ConstraintTarget::Label(l) = &constraint.target else {
646                        continue;
647                    };
648                    if l != label {
649                        continue;
650                    }
651                    let ConstraintType::Unique {
652                        properties: unique_props,
653                    } = &constraint.constraint_type
654                    else {
655                        continue;
656                    };
657                    let mut key_values = Vec::new();
658                    let mut all_present = true;
659                    for prop in unique_props {
660                        if let Some(val) = props.get(prop) {
661                            key_values.push((prop.clone(), val.clone()));
662                        } else {
663                            all_present = false;
664                            break;
665                        }
666                    }
667                    if all_present {
668                        keys.push((serialize_constraint_key(label, &key_values), vid));
669                    }
670                }
671            }
672        }
673        for (key, vid) in keys {
674            l0_guard.insert_constraint_key(key, vid);
675        }
676    }
677
    /// Allocates the next VID (pure auto-increment).
    ///
    /// Thin async wrapper: the call is forwarded unchanged to
    /// `self.allocator.allocate_vid()`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the allocator reports; nothing is added or
    /// handled at this layer.
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
682
    /// Allocates multiple VIDs at once for bulk operations.
    /// This is more efficient than calling next_vid() in a loop.
    ///
    /// `count` is passed straight through to
    /// `self.allocator.allocate_vids(count)`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the allocator reports; nothing is added or
    /// handled at this layer.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
688
    /// Allocates the next EID (pure auto-increment).
    ///
    /// `_type_id` is accepted but not used: it is not forwarded to
    /// `self.allocator.allocate_eid()`, so the edge type has no influence on
    /// the call made here.
    ///
    /// # Errors
    ///
    /// Returns whatever error the allocator reports; nothing is added or
    /// handled at this layer.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
693
    /// Allocates multiple EIDs at once for bulk operations.
    /// This is more efficient than calling next_eid() in a loop.
    ///
    /// `count` is passed straight through to
    /// `self.allocator.allocate_eids(count)`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the allocator reports; nothing is added or
    /// handled at this layer.
    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
        self.allocator.allocate_eids(count).await
    }
699
700    /// Install the embedding runtime exactly once. Receiver is `&self` so it
701    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
702    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
703        self.xervo_runtime
704            .set(runtime)
705            .map_err(|_| anyhow!("xervo_runtime already set"))
706    }
707
708    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
709        self.xervo_runtime.get().cloned()
710    }
711
712    /// Create a new empty L0 buffer for transaction-scoped mutations.
713    ///
714    /// Only reads the current version — no exclusive lock required on Writer.
715    /// The returned buffer has no WAL reference; mutations are logged at
716    /// commit time via [`Self::commit_transaction_l0`].
717    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
718        let current_version = self.l0_manager.get_current().read().current_version;
719        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
720        let buf = L0Buffer::new(current_version, None);
721        // SSI: stamp the OCC read sequence at begin so commit can detect any
722        // transaction that committed since. Gated on the runtime `ssi_enabled`
723        // toggle — when off, `occ_read_set` stays `None` and every downstream
724        // read-set recording / commit validation self-gates to a no-op.
725        let buf = if self.config.ssi_enabled {
726            let mut buf = buf;
727            buf.occ_read_seq = self
728                .commit_sequence
729                .load(crate::runtime::sync::Ordering::Relaxed);
730            // The read path records observed ids here for SSI antidependency
731            // detection; commit consults it.
732            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
733                crate::runtime::l0::OccReadSet::default(),
734            )));
735            buf
736        } else {
737            buf
738        };
739        Arc::new(RwLock::new(buf))
740    }
741
742    /// Resolve the target L0 buffer for a mutation.
743    ///
744    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
745    /// When `None`, it targets the global L0 from the manager.
746    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
747        tx_l0
748            .cloned()
749            .unwrap_or_else(|| self.l0_manager.get_current())
750    }
751
752    fn update_metrics(&self) {
753        let l0 = self.l0_manager.get_current();
754        let size = l0.read().estimated_size;
755        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
756    }
757
758    /// Overlay-aware issue-#77 edge-endpoint validation.
759    ///
760    /// The current buffer alone does not hold all committed-but-unflushed
761    /// tombstones — a flush rotation moves them onto `pending_flush` until
762    /// the Lance write completes. A vertex is effectively deleted iff,
763    /// walking newest-first (tx → current → pending newest→oldest), the
764    /// first buffer that knows the vid says "tombstoned" (an insert clears
765    /// the tombstone within a buffer, so props/tombstone are mutually
766    /// exclusive per buffer).
767    ///
768    /// Must run under `flush_lock` so the overlay cannot change before the
769    /// merge, and BEFORE the durable WAL flush (see the call site).
770    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
771        let chain = self.l0_manager.get_pending_flush();
772        let current = self.l0_manager.get_current();
773        let effectively_deleted = |vid: &Vid| -> bool {
774            if tx_l0.vertex_properties.contains_key(vid) {
775                return false;
776            }
777            if tx_l0.vertex_tombstones.contains(vid) {
778                return true;
779            }
780            {
781                let cur = current.read();
782                if cur.vertex_properties.contains_key(vid) {
783                    return false;
784                }
785                if cur.vertex_tombstones.contains(vid) {
786                    return true;
787                }
788            }
789            for frozen in chain.iter().rev() {
790                let g = frozen.read();
791                if g.vertex_properties.contains_key(vid) {
792                    return false;
793                }
794                if g.vertex_tombstones.contains(vid) {
795                    return true;
796                }
797            }
798            false
799        };
800        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
801            if tx_l0.tombstones.contains_key(eid) {
802                continue; // a deletion, not an insertion — never resurrects a vertex
803            }
804            if effectively_deleted(src_vid) {
805                anyhow::bail!(
806                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
807                    eid,
808                    src_vid
809                );
810            }
811            if effectively_deleted(dst_vid) {
812                anyhow::bail!(
813                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
814                    eid,
815                    dst_vid
816                );
817            }
818        }
819        Ok(())
820    }
821
822    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
823    /// value for each CRDT property the transaction writes, so the commit
824    /// merge MERGES against the committed CRDT state instead of shadowing it
825    /// (the carve-out lets concurrent CRDT writers commit on the assumption
826    /// that the merge sees the committed value — true only while that value
827    /// lives in current, not mid-flush on `pending_flush`). No-op when
828    /// nothing is pending or the property already exists in current. Vertex
829    /// properties only, mirroring the carve-out itself.
830    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
831        let chain = self.l0_manager.get_pending_flush();
832        if chain.is_empty() {
833            return;
834        }
835        for (vid, props) in &tx_l0.vertex_properties {
836            Self::seed_crdt_props(&chain, *vid, props, main_l0);
837        }
838    }
839
840    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
841    /// Also used by the non-transactional vertex write path, which CRDT-merges
842    /// into the current buffer directly and has the same shadowing hazard
843    /// during a flush window.
844    fn seed_crdt_props(
845        chain: &[Arc<RwLock<L0Buffer>>],
846        vid: Vid,
847        props: &Properties,
848        target: &mut L0Buffer,
849    ) {
850        for (key, value) in props {
851            if crate::runtime::l0::try_as_crdt(value).is_none() {
852                continue;
853            }
854            if target
855                .vertex_properties
856                .get(&vid)
857                .is_some_and(|p| p.contains_key(key))
858            {
859                continue;
860            }
861            // Newest generation first: the first hit is the live state.
862            for frozen in chain.iter().rev() {
863                let g = frozen.read();
864                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
865                    if crate::runtime::l0::try_as_crdt(v).is_some() {
866                        target
867                            .vertex_properties
868                            .entry(vid)
869                            .or_default()
870                            .insert(key.clone(), v.clone());
871                    }
872                    break;
873                }
874            }
875        }
876    }
877
    /// Commit an externally-owned transaction's private L0 buffer into main L0.
    ///
    /// Writes mutations to WAL, flushes, merges into main L0, and replays
    /// edges into the AdjacencyManager.
    ///
    /// Returns `(wal_lsn, flush_pending)`. `wal_lsn` is the WAL LSN of the
    /// commit (0 when no WAL is configured). When `flush_pending == true`, the
    /// post-commit `should_flush()` predicate fired but no flush ran — the
    /// caller is expected to spawn a background `flush_to_l1`. This is the
    /// shape used when `UniConfig::async_flush_enabled` is set, so commits
    /// don't block on L1-streaming I/O.
890    pub async fn commit_transaction_l0(
891        self: &Arc<Self>,
892        tx_l0_arc: Arc<RwLock<L0Buffer>>,
893    ) -> Result<(u64, bool)> {
894        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
895        // Two concurrent commits serialize here; in Phase 3 the outer
896        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
897        // acquisition is uncontended. Phase 4 drops the outer lock and
898        // this becomes the load-bearing serialization point.
899        let _flush_lock_guard = self.flush_lock.lock().await;
900
901        // Crash-recovery seam: simulate process death immediately after winning
902        // the commit serialization point but before any durable work. No-op
903        // unless built with `--features failpoints`. (See ssi_resilience tests.)
904        fail::fail_point!("commit::after-flush-lock");
905
906        // SSI: optimistic conflict detection. This MUST run before any WAL
907        // write — `flush_wal()` below is the durable commit point and the WAL
908        // has no abort marker, so aborting after it would resurrect this
909        // transaction on crash recovery. The write-set is reused for
910        // registration after a successful merge.
911        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
912        // and `occ_write_set` is `None`, so the post-merge registration below
913        // is skipped — reproducing last-writer-wins exactly.
914        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
915            let tx_l0 = tx_l0_arc.read();
916            let read_seq = tx_l0.occ_read_seq;
917            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
918            if !write_set.is_empty() {
919                // Telemetry: one validation per non-empty (writing) commit. The
920                // ratio of conflicts to validations is the headline abort rate.
921                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
922                // Read-set is consulted only for writing transactions, so a
923                // read-only commit (empty write-set) runs at snapshot isolation.
924                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
925                if let Some(conflict) =
926                    self.committed_writes
927                        .lock()
928                        .check(read_seq, &write_set, read_guard.as_deref())
929                {
930                    use crate::runtime::occ::Conflict;
931                    match &conflict {
932                        Conflict::WriteWrite { .. } => metrics::counter!(
933                            "uni_ssi_serialization_conflicts_total",
934                            "kind" => "write_write",
935                        )
936                        .increment(1),
937                        Conflict::ReadWrite { .. } => metrics::counter!(
938                            "uni_ssi_serialization_conflicts_total",
939                            "kind" => "read_write",
940                        )
941                        .increment(1),
942                        Conflict::HistoryTruncated { .. } => {
943                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
944                        }
945                    }
946                    return Err(anyhow::Error::new(
947                        uni_common::UniError::SerializationConflict {
948                            message: conflict.to_string(),
949                        },
950                    ));
951                }
952            }
953
954            // Validate against the committed-but-unflushed overlay under
955            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
956            // soundness. The current buffer alone does not hold all committed
957            // state — a flush rotation moves it onto `pending_flush` until the
958            // Lance write completes (the Bug #9A window, here at the
959            // commit-time layer) — so every check walks [current, pending…].
960            {
961                let pending = self.l0_manager.get_pending_flush();
962                let main_l0 = self.l0_manager.get_current();
963                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
964                    std::iter::once(main_l0).chain(pending).collect();
965
966                // SSI / serializable MERGE: abort if a concurrent transaction has
967                // already committed a row with one of this transaction's unique
968                // keys. Commits serialize here, so this closes the race window
969                // left by the per-insert check. (Empty index → no iterations.)
970                for (key, vid) in &tx_l0.constraint_index {
971                    if overlay
972                        .iter()
973                        .any(|b| b.read().has_constraint_key(key, *vid))
974                    {
975                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
976                        return Err(anyhow::Error::new(
977                            uni_common::UniError::ConstraintConflict {
978                                message: "unique key already committed by a concurrent \
979                                          transaction"
980                                    .to_string(),
981                            },
982                        ));
983                    }
984                }
985
986                // Implicit MERGE phantom guard: a `MERGE` that *created* a node
987                // registered its (label, key-props) here even with no declared
988                // UNIQUE constraint. If a concurrent transaction already committed
989                // the same MERGE key, abort retriably so the two converge to one
990                // node on retry (the loser's MATCH then finds the committed row).
991                // Only MERGE-creates register keys, so a plain CREATE of the same
992                // properties never lands here. (Empty index → no iterations.)
993                for (key, vid) in &tx_l0.merge_guard_index {
994                    if overlay
995                        .iter()
996                        .any(|b| b.read().has_merge_guard_key(key, *vid))
997                    {
998                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
999                        return Err(anyhow::Error::new(
1000                            uni_common::UniError::ConstraintConflict {
1001                                message: "MERGE key already committed by a concurrent \
1002                                          transaction"
1003                                    .to_string(),
1004                            },
1005                        ));
1006                    }
1007                }
1008
1009                // Same race window for global ext_id uniqueness: the per-insert
1010                // check ran against an older main L0; re-probe the committed
1011                // index here, where commits serialize.
1012                for (ext_id, vid) in &tx_l0.extid_index {
1013                    let taken = overlay.iter().any(|b| {
1014                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
1015                    });
1016                    if taken {
1017                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
1018                        return Err(anyhow::Error::new(
1019                            uni_common::UniError::ConstraintConflict {
1020                                message: format!(
1021                                    "ext_id '{ext_id}' already committed by a concurrent \
1022                                     transaction"
1023                                ),
1024                            },
1025                        ));
1026                    }
1027                }
1028
1029                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
1030                // write-set assuming its merge commutes. If the overlay holds a
1031                // *different* CRDT variant for the same property, the merge would
1032                // silently overwrite it — abort instead of losing the update.
1033                // (Checked against every overlay buffer: conservative if an old
1034                // generation held a different variant that a newer commit already
1035                // replaced, but an abort+retry is always sound.)
1036                for buf in &overlay {
1037                    if let Some(conflict) =
1038                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
1039                    {
1040                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
1041                        return Err(anyhow::Error::new(
1042                            uni_common::UniError::SerializationConflict {
1043                                message: conflict.to_string(),
1044                            },
1045                        ));
1046                    }
1047                }
1048            }
1049            Some(write_set)
1050        } else {
1051            None
1052        };
1053
1054        // Issue #77: an edge whose endpoint is effectively deleted makes the
1055        // merge below bail. That bail MUST happen before the durable WAL flush —
1056        // after it the transaction is committed-but-unmerged (a ghost commit),
1057        // and WAL replay re-hits the same bail, making the database unopenable.
1058        // SSI validation was deliberately placed before the flush for exactly
1059        // this reason; the endpoint check belongs here too. Runs unconditionally
1060        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
1061        // tombstone state cannot change between here and the merge. A
1062        // tombstone may live in a flush-rotated pending buffer rather than
1063        // the current buffer, so the check walks the overlay newest-first.
1064        {
1065            let tx_l0 = tx_l0_arc.read();
1066            self.validate_edge_endpoints_overlay(&tx_l0)?;
1067        }
1068
1069        // Crash-recovery seam: SSI validation has passed; the transaction is
1070        // about to become durable. A crash here must leave NO trace (validation
1071        // happens before the WAL is touched). No-op unless `failpoints`.
1072        fail::fail_point!("commit::after-validate");
1073
1074        // 1. Write transaction mutations to WAL BEFORE merging into main L0
1075        // This ensures durability before visibility.
1076        {
1077            let tx_l0 = tx_l0_arc.read();
1078            let main_l0_arc = self.l0_manager.get_current();
1079            let main_l0 = main_l0_arc.read();
1080
1081            // If WAL exists, write mutations to it for durability
1082            if let Some(wal) = main_l0.wal.as_ref() {
1083                // Order: vertices first, then edges (to ensure src/dst exist on replay)
1084
1085                // Vertex insertions
1086                for (vid, properties) in &tx_l0.vertex_properties {
1087                    if !tx_l0.vertex_tombstones.contains(vid) {
1088                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1089                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
1090                            vid: *vid,
1091                            properties: properties.clone(),
1092                            labels,
1093                        })?;
1094                    }
1095                }
1096
1097                // Vertex deletions
1098                for vid in &tx_l0.vertex_tombstones {
1099                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1100                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
1101                }
1102
1103                // Label-only mutations (SET n:Label / REMOVE n:Label). After
1104                // vertex inserts (so the vertex exists on replay), before edges,
1105                // and skipping vertices deleted in this same commit.
1106                for vid in &tx_l0.vertex_label_overwrites {
1107                    if tx_l0.vertex_tombstones.contains(vid) {
1108                        continue;
1109                    }
1110                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1111                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
1112                        vid: *vid,
1113                        labels,
1114                    })?;
1115                }
1116
1117                // Crash-recovery seam: vertices appended, edges not yet. Tests
1118                // assert that a crash here (before `flush_wal`) recovers NOTHING
1119                // — the durable commit point is the flush below, not append.
1120                fail::fail_point!("commit::mid-wal");
1121
1122                // Edge insertions and deletions from edge_endpoints
1123                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
1124                    if tx_l0.tombstones.contains_key(eid) {
1125                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1126                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1127                            eid: *eid,
1128                            src_vid: *src_vid,
1129                            dst_vid: *dst_vid,
1130                            edge_type: *edge_type,
1131                            version,
1132                        })?;
1133                    } else {
1134                        let properties =
1135                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
1136                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1137                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
1138                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
1139                            src_vid: *src_vid,
1140                            dst_vid: *dst_vid,
1141                            edge_type: *edge_type,
1142                            eid: *eid,
1143                            version,
1144                            properties,
1145                            edge_type_name,
1146                        })?;
1147                    }
1148                }
1149
1150                // Tombstones for edges that only exist in the global L0 (not in
1151                // this transaction's edge_endpoints).  Without this, deletes of
1152                // pre-existing edges would be silently lost.
1153                for (eid, tombstone) in &tx_l0.tombstones {
1154                    if !tx_l0.edge_endpoints.contains_key(eid) {
1155                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1156                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1157                            eid: *eid,
1158                            src_vid: tombstone.src_vid,
1159                            dst_vid: tombstone.dst_vid,
1160                            edge_type: tombstone.edge_type,
1161                            version,
1162                        })?;
1163                    }
1164                }
1165            }
1166        }
1167
1168        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1169        let wal_lsn = self.flush_wal().await?;
1170
1171        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1172        // A crash here must RECOVER the transaction on replay (it is committed),
1173        // even though it was never made visible in-process. No-op unless `failpoints`.
1174        fail::fail_point!("commit::after-wal-flush");
1175
1176        // Component C1: if an outstanding snapshot pins the current generation,
1177        // clone it aside (lazy copy-on-write) before merging, so the pinning
1178        // transaction's reads stay isolated from this commit. No-op — and zero
1179        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1180        // so this cannot race a flush rotate or another commit's merge; the merge
1181        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1182        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1183        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1184        // always false when SSI is off and this is a zero-cost no-op.
1185        if self.l0_manager.is_current_pinned() {
1186            self.l0_manager.freeze_current_for_snapshot();
1187            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1188        }
1189
1190        // 3. Merge into main L0 and make visible
1191        {
1192            // Write-lock the tx buffer: `merge_take` moves its property maps
1193            // into main L0 instead of cloning them. The commit consumes the
1194            // transaction, so the drained maps are never observed afterwards;
1195            // everything read below (endpoints, versions, tombstones) is left
1196            // intact.
1197            let mut tx_l0 = tx_l0_arc.write();
1198            let main_l0_arc = self.l0_manager.get_current();
1199            let mut main_l0 = main_l0_arc.write();
1200            // A CRDT property's committed state may live only in a
1201            // flush-rotated pending buffer (the post-rotation current is
1202            // empty until the Lance write completes — the Bug #9A window).
1203            // `merge_crdt_properties` merges against the CURRENT buffer's
1204            // value — without seeding, the tx's CRDT state would SHADOW the
1205            // pending buffer's at read time (newest buffer wins per property)
1206            // and concurrent increments would be lost. Seed the newest
1207            // overlay value for each CRDT property the tx writes that current
1208            // lacks, so the merge below merges instead of replaces.
1209            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1210            main_l0.merge_take(&mut tx_l0)?;
1211
1212            // Replay transaction edges into the AdjacencyManager overlay
1213            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1214                let edge_version = tx_l0
1215                    .edge_versions
1216                    .get(eid)
1217                    .copied()
1218                    .unwrap_or(main_l0.current_version);
1219                if tx_l0.tombstones.contains_key(eid) {
1220                    self.adjacency_manager
1221                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1222                } else {
1223                    self.adjacency_manager
1224                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
1225                }
1226            }
1227
1228            // Replay tombstones for edges that only exist in the global L0
1229            // (not in this transaction's edge_endpoints).
1230            for (eid, tombstone) in &tx_l0.tombstones {
1231                if !tx_l0.edge_endpoints.contains_key(eid) {
1232                    let edge_version = tx_l0
1233                        .edge_versions
1234                        .get(eid)
1235                        .copied()
1236                        .unwrap_or(main_l0.current_version);
1237                    self.adjacency_manager.add_tombstone(
1238                        *eid,
1239                        tombstone.src_vid,
1240                        tombstone.dst_vid,
1241                        tombstone.edge_type,
1242                        edge_version,
1243                    );
1244                }
1245            }
1246        }
1247
1248        // Crash-recovery seam: durable AND merged, but the in-memory commit
1249        // registry has not recorded this write-set yet. A crash here is
1250        // indistinguishable from one at `after-wal-flush` on reopen (the
1251        // registry is in-memory and rebuilt empty); the tx still recovers.
1252        fail::fail_point!("commit::after-merge");
1253
1254        // SSI: register this commit's write-set under a fresh commit sequence so
1255        // later transactions detect conflicts against it. Still under
1256        // `flush_lock`, before the async-flush branch can drop the guard.
1257        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1258        if let Some(write_set) = occ_write_set
1259            && !write_set.is_empty()
1260        {
1261            // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
1262            // so production and the loom/shuttle models exercise identical logic.
1263            self.committed_writes
1264                .lock()
1265                .commit(&self.commit_sequence, write_set);
1266        }
1267
1268        self.update_metrics();
1269
1270        // 4. Best-effort post-commit auto-flush.
1271        //
1272        // Two paths:
1273        // - async_flush_enabled = false (default): inline under our
1274        //   existing flush_lock guard via flush_inline_under_lock.
1275        // - async_flush_enabled = true: rotate inline, drop flush_lock,
1276        //   then submit the stream phase to the coordinator. Gated on
1277        //   `pending_flush_count() < max_pending_flushes` so we don't
1278        //   stack up rotations beyond the configured pipeline depth.
1279        //   `try_acquire_permit` is non-blocking: if we lose the race
1280        //   for the last permit, we just skip this trigger (the next
1281        //   commit retries).
1282        let mut flush_pending = false;
1283        if self.should_flush() {
1284            if self.config.async_flush_enabled
1285                && let Some(coord) = self.flush_coordinator.as_ref()
1286                && coord.pending_flush_count() < self.config.max_pending_flushes
1287            {
1288                match coord.try_acquire_permit() {
1289                    Some(permit) => {
1290                        match self.flush_l0_rotate().await {
1291                            Ok(rotate_out) => {
1292                                // Allocate the rotate seq and bump pending ONLY
1293                                // after the rotate succeeds (Bug #3). A failed
1294                                // rotate must consume neither: the finalizer
1295                                // advances strictly in consecutive seq order and
1296                                // only decrements pending on finalize, so a
1297                                // leaked seq/pending from a failed rotate would
1298                                // wedge the finalizer forever and climb pending
1299                                // toward `max_pending_flushes`. The seq is still
1300                                // allocated under `flush_lock` (immediately after
1301                                // the rotate, before the guard drops below), so
1302                                // concurrent rotates keep seq order == rotation
1303                                // order, and the seq is not used until submit.
1304                                let seq = coord.next_rotate_seq();
1305                                coord.note_pending();
1306                                // Release flush_lock BEFORE the spawn so concurrent
1307                                // commits can proceed while the stream runs.
1308                                drop(_flush_lock_guard);
1309                                let parent_manifest = self.cached_manifest.lock().clone();
1310                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1311                                    seq,
1312                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
1313                                    wal_lsn: rotate_out.wal_lsn,
1314                                    current_version: rotate_out.current_version,
1315                                    name: None,
1316                                    parent_manifest,
1317                                    permit,
1318                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1319                                };
1320                                let writer = self.clone();
1321                                let _ticket = coord.submit_for_stream(
1322                                    rotated,
1323                                    move |old_l0, wal, ver, n| async move {
1324                                        let outcome =
1325                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1326                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
1327                                            new_manifest: outcome.manifest,
1328                                            snapshot_id: outcome.snapshot_id,
1329                                        })
1330                                    },
1331                                );
1332                                flush_pending = true;
1333                                // Early return — flush_lock already dropped.
1334                                return Ok((wal_lsn, flush_pending));
1335                            }
1336                            Err(e) => {
1337                                tracing::warn!("Async rotate failed (non-critical): {}", e);
1338                                // No seq was allocated and pending was not
1339                                // bumped (both moved into the Ok arm for Bug
1340                                // #3), so the finalizer is not wedged. The
1341                                // permit drops here, freeing the slot.
1342                            }
1343                        }
1344                    }
1345                    None => {
1346                        // Race: someone else grabbed the last permit. Skip;
1347                        // next commit will retry should_flush().
1348                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1349                    }
1350                }
1351            } else if let Err(e) = self.flush_inline_under_lock(None).await {
1352                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1353            }
1354        }
1355
1356        Ok((wal_lsn, flush_pending))
1357    }
1358
1359    /// Flush the WAL buffer to durable storage.
1360    ///
1361    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1362    pub async fn flush_wal(&self) -> Result<u64> {
1363        let l0 = self.l0_manager.get_current();
1364        let wal = l0.read().wal.clone();
1365
1366        match wal {
1367            Some(wal) => Ok(wal.flush().await?),
1368            None => Ok(0),
1369        }
1370    }
1371
1372    /// Record property removals in the active L0 mutation stats.
1373    ///
1374    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1375    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1376        if count == 0 {
1377            return;
1378        }
1379        let l0 = self.resolve_l0(tx_l0);
1380        l0.write().mutation_stats.properties_removed += count;
1381    }
1382
1383    /// Validates vertex constraints for the given properties.
1384    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1385    async fn validate_vertex_constraints_for_label(
1386        &self,
1387        vid: Vid,
1388        properties: &Properties,
1389        label: &str,
1390        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1391    ) -> Result<()> {
1392        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1393            .await
1394    }
1395
1396    /// Partial-update sibling: validates only constraints touching keys
1397    /// present in `properties` (the touched set). NOT NULL is checked
1398    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1399    /// skipped when any referenced key is absent (the caller is
1400    /// expected to have routed to the full-row path in that case via
1401    /// `touched_needs_full_read`).
1402    async fn validate_vertex_constraints_for_label_partial(
1403        &self,
1404        vid: Vid,
1405        properties: &Properties,
1406        label: &str,
1407        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1408    ) -> Result<()> {
1409        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1410            .await
1411    }
1412
1413    async fn validate_vertex_constraints_for_label_impl(
1414        &self,
1415        vid: Vid,
1416        properties: &Properties,
1417        label: &str,
1418        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1419        partial: bool,
1420    ) -> Result<()> {
1421        let schema = self.schema_manager.schema();
1422
1423        {
1424            // 1. Check NOT NULL constraints (from Property definitions).
1425            //    Under partial-update mode, skip properties NOT in
1426            //    `properties` — they retain their previous (already-
1427            //    validated) value.
1428            if let Some(props_meta) = schema.properties.get(label) {
1429                for (prop_name, meta) in props_meta {
1430                    if !meta.nullable {
1431                        let present = properties.get(prop_name);
1432                        if partial && present.is_none() {
1433                            continue;
1434                        }
1435                        if present.is_none_or(|v| v.is_null()) {
1436                            log::warn!(
1437                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1438                                prop_name,
1439                                label
1440                            );
1441                            return Err(anyhow!(
1442                                "Constraint violation: Property '{}' cannot be null",
1443                                prop_name
1444                            ));
1445                        }
1446                    }
1447                }
1448            }
1449
1450            // 2. Check Explicit Constraints (Unique, Check, etc.)
1451            for constraint in &schema.constraints {
1452                if !constraint.enabled {
1453                    continue;
1454                }
1455                match &constraint.target {
1456                    ConstraintTarget::Label(l) if l == label => {}
1457                    _ => continue,
1458                }
1459
1460                match &constraint.constraint_type {
1461                    ConstraintType::Unique {
1462                        properties: unique_props,
1463                    } => {
1464                        // Support single and multi-property unique constraints
1465                        if !unique_props.is_empty() {
1466                            let mut key_values = Vec::new();
1467                            let mut missing = false;
1468                            for prop in unique_props {
1469                                if let Some(val) = properties.get(prop) {
1470                                    key_values.push((prop.clone(), val.clone()));
1471                                } else {
1472                                    missing = true; // Can't enforce if property missing (partial update?)
1473                                    // For INSERT, missing means null?
1474                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1475                                    // For now, only check if ALL keys are present
1476                                }
1477                            }
1478
1479                            if !missing {
1480                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1481                                    .await?;
1482                            }
1483                        }
1484                    }
1485                    ConstraintType::Exists { property } => {
1486                        if properties.get(property).is_none_or(|v| v.is_null()) {
1487                            log::warn!(
1488                                "Constraint violation: Property '{}' must exist for label '{}'",
1489                                property,
1490                                label
1491                            );
1492                            return Err(anyhow!(
1493                                "Constraint violation: Property '{}' must exist",
1494                                property
1495                            ));
1496                        }
1497                    }
1498                    ConstraintType::Check { expression } => {
1499                        if !self.evaluate_check_constraint(expression, properties)? {
1500                            return Err(anyhow!(
1501                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1502                                constraint.name,
1503                                expression
1504                            ));
1505                        }
1506                    }
1507                    _ => {
1508                        return Err(anyhow!("Unsupported constraint type"));
1509                    }
1510                }
1511            }
1512        }
1513        Ok(())
1514    }
1515
1516    /// Validates vertex constraints for a vertex with the given labels.
1517    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1518    /// Unknown labels (not in schema) are skipped.
1519    async fn validate_vertex_constraints(
1520        &self,
1521        vid: Vid,
1522        properties: &Properties,
1523        labels: &[String],
1524        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1525    ) -> Result<()> {
1526        let schema = self.schema_manager.schema();
1527
1528        // Validate constraints only for known labels
1529        for label in labels {
1530            // Skip unknown labels (schemaless support)
1531            if schema.get_label_case_insensitive(label).is_none() {
1532                continue;
1533            }
1534            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1535                .await?;
1536        }
1537
1538        // Check global ext_id uniqueness if ext_id is provided
1539        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1540            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1541        }
1542
1543        Ok(())
1544    }
1545
1546    /// Partial sibling of `validate_vertex_constraints` — validates only
1547    /// constraints touching keys present in `properties`. Used by
1548    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1549    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1550    async fn validate_vertex_constraints_partial(
1551        &self,
1552        vid: Vid,
1553        touched: &Properties,
1554        labels: &[String],
1555        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1556    ) -> Result<()> {
1557        let schema = self.schema_manager.schema();
1558        for label in labels {
1559            if schema.get_label_case_insensitive(label).is_none() {
1560                continue;
1561            }
1562            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1563                .await?;
1564        }
1565        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1566            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1567        }
1568        Ok(())
1569    }
1570
1571    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1572    ///
1573    /// Used to build a constraint key index from L0 buffers for batch validation.
1574    fn collect_constraint_keys_from_properties<'a>(
1575        properties_iter: impl Iterator<Item = &'a Properties>,
1576        label: &str,
1577        constraints: &[uni_common::core::schema::Constraint],
1578        existing_keys: &mut HashMap<String, HashSet<String>>,
1579        existing_extids: &mut HashSet<String>,
1580    ) {
1581        for props in properties_iter {
1582            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1583                existing_extids.insert(ext_id.to_string());
1584            }
1585
1586            for constraint in constraints {
1587                if !constraint.enabled {
1588                    continue;
1589                }
1590                if let ConstraintTarget::Label(l) = &constraint.target {
1591                    if l != label {
1592                        continue;
1593                    }
1594                } else {
1595                    continue;
1596                }
1597
1598                if let ConstraintType::Unique {
1599                    properties: unique_props,
1600                } = &constraint.constraint_type
1601                {
1602                    let mut key_parts = Vec::new();
1603                    let mut all_present = true;
1604                    for prop in unique_props {
1605                        if let Some(val) = props.get(prop) {
1606                            key_parts.push(format!("{}:{}", prop, val));
1607                        } else {
1608                            all_present = false;
1609                            break;
1610                        }
1611                    }
1612                    if all_present {
1613                        let key = key_parts.join("|");
1614                        existing_keys
1615                            .entry(constraint.name.clone())
1616                            .or_default()
1617                            .insert(key);
1618                    }
1619                }
1620            }
1621        }
1622    }
1623
1624    /// Validates constraints for a batch of vertices efficiently.
1625    ///
1626    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1627    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1628    ///
1629    /// # Arguments
1630    /// * `vids` - VIDs of vertices being inserted
1631    /// * `properties_batch` - Properties for each vertex
1632    /// * `label` - Label for all vertices (assumes single label for now)
1633    ///
1634    /// # Performance
1635    /// For N vertices with unique constraints:
1636    /// - Old approach: O(N²) - scan L0 buffer N times
1637    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1638    async fn validate_vertex_batch_constraints(
1639        &self,
1640        vids: &[Vid],
1641        properties_batch: &[Properties],
1642        label: &str,
1643        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1644    ) -> Result<()> {
1645        if vids.len() != properties_batch.len() {
1646            return Err(anyhow!("VID/properties length mismatch"));
1647        }
1648
1649        let schema = self.schema_manager.schema();
1650
1651        // 1. Validate NOT NULL constraints for each vertex
1652        if let Some(props_meta) = schema.properties.get(label) {
1653            for (idx, properties) in properties_batch.iter().enumerate() {
1654                for (prop_name, meta) in props_meta {
1655                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1656                        return Err(anyhow!(
1657                            "Constraint violation at index {}: Property '{}' cannot be null",
1658                            idx,
1659                            prop_name
1660                        ));
1661                    }
1662                }
1663            }
1664        }
1665
1666        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1667        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1668        let mut existing_extids: HashSet<String> = HashSet::new();
1669
1670        // Scan current L0 buffer
1671        {
1672            let l0 = self.l0_manager.get_current();
1673            let l0_guard = l0.read();
1674            Self::collect_constraint_keys_from_properties(
1675                l0_guard.vertex_properties.values(),
1676                label,
1677                &schema.constraints,
1678                &mut existing_keys,
1679                &mut existing_extids,
1680            );
1681        }
1682
1683        // Scan transaction L0 if present
1684        if let Some(tx_l0) = tx_l0 {
1685            let tx_l0_guard = tx_l0.read();
1686            Self::collect_constraint_keys_from_properties(
1687                tx_l0_guard.vertex_properties.values(),
1688                label,
1689                &schema.constraints,
1690                &mut existing_keys,
1691                &mut existing_extids,
1692            );
1693        }
1694
1695        // 3. Check batch vertices against index AND check for duplicates within batch
1696        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1697        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1698
1699        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1700            // Check ext_id uniqueness
1701            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1702                if existing_extids.contains(ext_id) {
1703                    return Err(anyhow!(
1704                        "Constraint violation at index {}: ext_id '{}' already exists",
1705                        idx,
1706                        ext_id
1707                    ));
1708                }
1709                if let Some(first_idx) = batch_extids.get(ext_id) {
1710                    return Err(anyhow!(
1711                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1712                        ext_id,
1713                        first_idx,
1714                        idx
1715                    ));
1716                }
1717                // Also check the main vertices table — the L0 scans above
1718                // miss vertices already flushed to L1, so without this a
1719                // batch insert (e.g. a fork promote onto primary) silently
1720                // twins a duplicate ext_id instead of erroring. Mirrors the
1721                // single-vertex `check_extid_globally_unique`.
1722                if let Ok(Some(found_vid)) =
1723                    MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1724                {
1725                    return Err(anyhow!(
1726                        "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1727                        idx,
1728                        ext_id,
1729                        found_vid
1730                    ));
1731                }
1732                batch_extids.insert(ext_id.to_string(), idx);
1733            }
1734
1735            // Check unique constraints
1736            for constraint in &schema.constraints {
1737                if !constraint.enabled {
1738                    continue;
1739                }
1740                if let ConstraintTarget::Label(l) = &constraint.target {
1741                    if l != label {
1742                        continue;
1743                    }
1744                } else {
1745                    continue;
1746                }
1747
1748                match &constraint.constraint_type {
1749                    ConstraintType::Unique {
1750                        properties: unique_props,
1751                    } => {
1752                        let mut key_parts = Vec::new();
1753                        let mut all_present = true;
1754                        for prop in unique_props {
1755                            if let Some(val) = properties.get(prop) {
1756                                key_parts.push(format!("{}:{}", prop, val));
1757                            } else {
1758                                all_present = false;
1759                                break;
1760                            }
1761                        }
1762
1763                        if all_present {
1764                            let key = key_parts.join("|");
1765
1766                            // Check against existing L0 keys
1767                            if let Some(keys) = existing_keys.get(&constraint.name)
1768                                && keys.contains(&key)
1769                            {
1770                                return Err(anyhow!(
1771                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1772                                    idx,
1773                                    label,
1774                                    constraint.name
1775                                ));
1776                            }
1777
1778                            // Check for duplicates within batch
1779                            let batch_constraint_keys =
1780                                batch_keys.entry(constraint.name.clone()).or_default();
1781                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1782                                return Err(anyhow!(
1783                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1784                                    key,
1785                                    first_idx,
1786                                    idx
1787                                ));
1788                            }
1789                            batch_constraint_keys.insert(key, idx);
1790                        }
1791                    }
1792                    ConstraintType::Exists { property }
1793                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1794                    {
1795                        return Err(anyhow!(
1796                            "Constraint violation at index {}: Property '{}' must exist",
1797                            idx,
1798                            property
1799                        ));
1800                    }
1801                    ConstraintType::Check { expression }
1802                        if !self.evaluate_check_constraint(expression, properties)? =>
1803                    {
1804                        return Err(anyhow!(
1805                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1806                            idx,
1807                            constraint.name
1808                        ));
1809                    }
1810                    _ => {}
1811                }
1812            }
1813        }
1814
1815        // 4. Check storage for unique constraints (can batch this into a single query)
1816        for constraint in &schema.constraints {
1817            if !constraint.enabled {
1818                continue;
1819            }
1820            if let ConstraintTarget::Label(l) = &constraint.target {
1821                if l != label {
1822                    continue;
1823                }
1824            } else {
1825                continue;
1826            }
1827
1828            if let ConstraintType::Unique {
1829                properties: unique_props,
1830            } = &constraint.constraint_type
1831            {
1832                // Build compound OR filter for all batch vertices
1833                let mut or_filters = Vec::new();
1834                for properties in properties_batch.iter() {
1835                    let mut and_parts = Vec::new();
1836                    let mut all_present = true;
1837                    for prop in unique_props {
1838                        if let Some(val) = properties.get(prop) {
1839                            let val_str = match val {
1840                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1841                                Value::Int(n) => n.to_string(),
1842                                Value::Float(f) => f.to_string(),
1843                                Value::Bool(b) => b.to_string(),
1844                                _ => {
1845                                    all_present = false;
1846                                    break;
1847                                }
1848                            };
1849                            and_parts.push(format!("{} = {}", prop, val_str));
1850                        } else {
1851                            all_present = false;
1852                            break;
1853                        }
1854                    }
1855                    if all_present {
1856                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1857                    }
1858                }
1859
1860                #[cfg(feature = "lance-backend")]
1861                if !or_filters.is_empty() {
1862                    let vid_list: Vec<String> =
1863                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1864                    let filter = format!(
1865                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1866                        or_filters.join(" OR "),
1867                        vid_list.join(", ")
1868                    );
1869
1870                    if let Ok(ds) = self.storage.vertex_dataset(label)
1871                        && let Ok(lance_ds) = ds.open_raw().await
1872                    {
1873                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1874                        if count > 0 {
1875                            return Err(anyhow!(
1876                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1877                                label,
1878                                constraint.name
1879                            ));
1880                        }
1881                    }
1882                }
1883            }
1884        }
1885
1886        Ok(())
1887    }
1888
1889    /// Checks that ext_id is globally unique across all vertices.
1890    ///
1891    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1892    /// to ensure no other vertex uses this ext_id.
1893    ///
1894    /// # Errors
1895    ///
1896    /// Returns error if another vertex with the same ext_id exists.
1897    async fn check_extid_globally_unique(
1898        &self,
1899        ext_id: &str,
1900        current_vid: Vid,
1901        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1902    ) -> Result<()> {
1903        // Check L0 buffers: current, transaction, and pending flush
1904        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1905            let mut buffers = vec![self.l0_manager.get_current()];
1906            if let Some(tx_l0) = tx_l0 {
1907                buffers.push(tx_l0.clone());
1908            }
1909            buffers.extend(self.l0_manager.get_pending_flush());
1910            buffers
1911        };
1912
1913        for l0 in &l0_buffers_to_check {
1914            // O(1) per buffer via the maintained `extid_index` (the previous
1915            // full `vertex_properties` scan made constrained ingest O(n²)).
1916            if let Some(&vid) = l0.read().extid_index.get(ext_id)
1917                && vid != current_vid
1918            {
1919                return Err(anyhow!(
1920                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1921                    ext_id,
1922                    vid
1923                ));
1924            }
1925        }
1926
1927        // Check main vertices table (if it exists)
1928        // Pass None for global uniqueness check (not snapshot-isolated)
1929        let backend = self.storage.backend();
1930        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1931            && found_vid != current_vid
1932        {
1933            return Err(anyhow!(
1934                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1935                ext_id,
1936                found_vid
1937            ));
1938        }
1939
1940        Ok(())
1941    }
1942
1943    /// Helper to get vertex labels from L0 buffer.
1944    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1945        let l0 = self.l0_manager.get_current();
1946        let l0_guard = l0.read();
1947        // Check if vertex is tombstoned (deleted) - if so, return None
1948        if l0_guard.vertex_tombstones.contains(&vid) {
1949            return None;
1950        }
1951        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1952    }
1953
1954    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1955    /// This is the proper way to read vertex labels after a flush, as it checks both
1956    /// in-memory buffers and persisted storage.
1957    pub async fn get_vertex_labels(
1958        &self,
1959        vid: Vid,
1960        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1961    ) -> Option<Vec<String>> {
1962        // 1. Check current L0
1963        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1964            return Some(labels);
1965        }
1966
1967        // 2. Check transaction L0 if present
1968        if let Some(tx_l0) = tx_l0 {
1969            let guard = tx_l0.read();
1970            if guard.vertex_tombstones.contains(&vid) {
1971                return None;
1972            }
1973            if let Some(labels) = guard.get_vertex_labels(vid) {
1974                return Some(labels.to_vec());
1975            }
1976        }
1977
1978        // 3. Check pending flush L0s
1979        for pending_l0 in self.l0_manager.get_pending_flush() {
1980            let guard = pending_l0.read();
1981            if guard.vertex_tombstones.contains(&vid) {
1982                return None;
1983            }
1984            if let Some(labels) = guard.get_vertex_labels(vid) {
1985                return Some(labels.to_vec());
1986            }
1987        }
1988
1989        // 4. Check storage
1990        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1991    }
1992
1993    /// Helper to get edge type from L0 buffer.
1994    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1995        let l0 = self.l0_manager.get_current();
1996        let l0_guard = l0.read();
1997        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1998    }
1999
2000    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
2001    /// Falls back to the transaction L0 if available.
2002    pub fn get_edge_type_id_from_l0(
2003        &self,
2004        eid: Eid,
2005        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2006    ) -> Option<u32> {
2007        // Check transaction L0 first
2008        if let Some(tx_l0) = tx_l0 {
2009            let guard = tx_l0.read();
2010            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
2011                return Some(etype);
2012            }
2013        }
2014        // Fall back to main L0
2015        let l0 = self.l0_manager.get_current();
2016        let l0_guard = l0.read();
2017        l0_guard
2018            .get_edge_endpoint_full(eid)
2019            .map(|(_, _, etype)| etype)
2020    }
2021
2022    /// Set the type name for an edge (used for schemaless edge types).
2023    /// This is called during CREATE for edge types not found in the schema.
2024    pub fn set_edge_type(
2025        &self,
2026        eid: Eid,
2027        type_name: String,
2028        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2029    ) {
2030        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
2031    }
2032
2033    /// Evaluate a simple CHECK constraint expression.
2034    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
2035    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
2036        let parts: Vec<&str> = expression.split_whitespace().collect();
2037        if parts.len() != 3 {
2038            // For now, only support "prop op val"
2039            // Fallback to true if too complex to avoid breaking, but warn
2040            log::warn!(
2041                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
2042                expression
2043            );
2044            return Ok(true);
2045        }
2046
2047        let prop_part = parts[0].trim_start_matches('(');
2048        // Handle "variable.property" format - take the part after the dot
2049        let prop_name = if let Some(idx) = prop_part.find('.') {
2050            &prop_part[idx + 1..]
2051        } else {
2052            prop_part
2053        };
2054
2055        let op = parts[1];
2056        let val_str = parts[2].trim_end_matches(')');
2057
2058        let prop_val = match properties.get(prop_name) {
2059            Some(v) => v,
2060            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
2061        };
2062
2063        // Parse value string (handle quotes for strings)
2064        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
2065            || (val_str.starts_with('"') && val_str.ends_with('"'))
2066        {
2067            Value::String(val_str[1..val_str.len() - 1].to_string())
2068        } else if let Ok(n) = val_str.parse::<i64>() {
2069            Value::Int(n)
2070        } else if let Ok(n) = val_str.parse::<f64>() {
2071            Value::Float(n)
2072        } else if let Ok(b) = val_str.parse::<bool>() {
2073            Value::Bool(b)
2074        } else {
2075            // Check for internal format wrappers if they somehow leaked through
2076            if val_str.starts_with("Number(") && val_str.ends_with(')') {
2077                let n_str = &val_str[7..val_str.len() - 1];
2078                if let Ok(n) = n_str.parse::<i64>() {
2079                    Value::Int(n)
2080                } else if let Ok(n) = n_str.parse::<f64>() {
2081                    Value::Float(n)
2082                } else {
2083                    Value::String(val_str.to_string())
2084                }
2085            } else {
2086                Value::String(val_str.to_string())
2087            }
2088        };
2089
2090        match op {
2091            "=" | "==" => Ok(prop_val == &target_val),
2092            "!=" | "<>" => Ok(prop_val != &target_val),
2093            ">" => self
2094                .compare_values(prop_val, &target_val)
2095                .map(|o| o.is_gt()),
2096            "<" => self
2097                .compare_values(prop_val, &target_val)
2098                .map(|o| o.is_lt()),
2099            ">=" => self
2100                .compare_values(prop_val, &target_val)
2101                .map(|o| o.is_ge()),
2102            "<=" => self
2103                .compare_values(prop_val, &target_val)
2104                .map(|o| o.is_le()),
2105            _ => {
2106                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
2107                Ok(true)
2108            }
2109        }
2110    }
2111
2112    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2113        use std::cmp::Ordering;
2114
2115        fn cmp_f64(x: f64, y: f64) -> Ordering {
2116            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2117        }
2118
2119        match (a, b) {
2120            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2121            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2122            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2123            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2124            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2125            _ => Err(anyhow!(
2126                "Cannot compare incompatible types: {:?} vs {:?}",
2127                a,
2128                b
2129            )),
2130        }
2131    }
2132
2133    async fn check_unique_constraint_multi(
2134        &self,
2135        label: &str,
2136        key_values: &[(String, Value)],
2137        current_vid: Vid,
2138        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2139    ) -> Result<()> {
2140        // Serialize constraint key once for O(1) lookups
2141        let key = serialize_constraint_key(label, key_values);
2142
2143        // 1. Check L0 (in-memory) using O(1) constraint index
2144        {
2145            let l0 = self.l0_manager.get_current();
2146            let l0_guard = l0.read();
2147            if l0_guard.has_constraint_key(&key, current_vid) {
2148                return Err(anyhow!(
2149                    "Constraint violation: Duplicate composite key for label '{}'",
2150                    label
2151                ));
2152            }
2153        }
2154
2155        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2156        // buffer onto `pending_flush` and installs a fresh empty current
2157        // buffer; until the rotated rows reach Lance the key is invisible to
2158        // both the current-buffer check above and the storage check below, so
2159        // a duplicate could slip through that flush window. Mirror the read
2160        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2161        // already consult `pending_flush`.
2162        for pending_l0 in self.l0_manager.get_pending_flush() {
2163            if pending_l0.read().has_constraint_key(&key, current_vid) {
2164                return Err(anyhow!(
2165                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2166                    label
2167                ));
2168            }
2169        }
2170
2171        // Check Transaction L0
2172        if let Some(tx_l0) = tx_l0 {
2173            let tx_l0_guard = tx_l0.read();
2174            if tx_l0_guard.has_constraint_key(&key, current_vid) {
2175                return Err(anyhow!(
2176                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2177                    label
2178                ));
2179            }
2180        }
2181
2182        // 2. Check Storage (L1/L2)
2183        let filters: Vec<String> = key_values
2184            .iter()
2185            .map(|(prop, val)| {
2186                let val_str = match val {
2187                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2188                    Value::Int(n) => n.to_string(),
2189                    Value::Float(f) => f.to_string(),
2190                    Value::Bool(b) => b.to_string(),
2191                    _ => "NULL".to_string(),
2192                };
2193                format!("{} = {}", prop, val_str)
2194            })
2195            .collect();
2196
2197        let mut filter = filters.join(" AND ");
2198        filter.push_str(&format!(
2199            " AND _deleted = false AND _vid != {}",
2200            current_vid.as_u64()
2201        ));
2202
2203        #[cfg(feature = "lance-backend")]
2204        if let Ok(ds) = self.storage.vertex_dataset(label)
2205            && let Ok(lance_ds) = ds.open_raw().await
2206        {
2207            let count = lance_ds.count_rows(Some(filter.clone())).await?;
2208            if count > 0 {
2209                return Err(anyhow!(
2210                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2211                    label,
2212                    filter
2213                ));
2214            }
2215        }
2216
2217        Ok(())
2218    }
2219
2220    async fn check_write_pressure(&self) -> Result<()> {
2221        let status = self
2222            .storage
2223            .compaction_status()
2224            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2225        let l1_runs = status.l1_runs;
2226        let throttle = &self.config.throttle;
2227
2228        if l1_runs >= throttle.hard_limit {
2229            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2230            // Simple polling for now
2231            while self
2232                .storage
2233                .compaction_status()
2234                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2235                .l1_runs
2236                >= throttle.hard_limit
2237            {
2238                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2239            }
2240        } else if l1_runs >= throttle.soft_limit {
2241            let excess = l1_runs - throttle.soft_limit;
2242            // Cap multiplier to avoid overflow
2243            let excess = std::cmp::min(excess, 31);
2244            let multiplier = 2_u32.pow(excess as u32);
2245            let delay = throttle.base_delay * multiplier;
2246            tokio::time::sleep(delay).await;
2247        }
2248        Ok(())
2249    }
2250
2251    /// Check transaction memory limit to prevent OOM.
2252    /// No-op when no transaction is active.
2253    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2254        if let Some(tx_l0) = tx_l0 {
2255            let size = tx_l0.read().estimated_size;
2256            if size > self.config.max_transaction_memory {
2257                return Err(anyhow!(
2258                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2259                     Roll back or commit the current transaction.",
2260                    size,
2261                    self.config.max_transaction_memory
2262                ));
2263            }
2264        }
2265        Ok(())
2266    }
2267
2268    async fn get_query_context(
2269        &self,
2270        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2271    ) -> Option<QueryContext> {
2272        Some(QueryContext::new_with_pending(
2273            self.l0_manager.get_current(),
2274            tx_l0.cloned(),
2275            self.l0_manager.get_pending_flush(),
2276        ))
2277    }
2278
2279    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2280    /// write paths.
2281    ///
2282    /// Rejects a declared CRDT property written as a parsed CRDT value
2283    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2284    /// A mismatch would make the commit-time merge silently overwrite instead of
2285    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2286    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2287    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2288    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2289    /// are never carved out and stay conflictable.
2290    fn enforce_crdt_variants(
2291        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2292        properties: &Properties,
2293    ) -> Result<()> {
2294        for (key, value) in properties {
2295            let Some(meta) = props_meta.get(key) else {
2296                continue;
2297            };
2298            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2299                continue;
2300            };
2301            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2302                && crdt.type_name() != expected.type_name()
2303            {
2304                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2305                    message: format!(
2306                        "CRDT property '{key}' must be written as a {} value",
2307                        expected.type_name()
2308                    ),
2309                }));
2310            }
2311        }
2312        Ok(())
2313    }
2314
2315    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2316    ///
2317    /// When `label` is provided, uses it directly to look up property metadata.
2318    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2319    ///
2320    /// # Errors
2321    ///
2322    /// Returns an error if CRDT property merging fails.
2323    async fn prepare_vertex_upsert(
2324        &self,
2325        vid: Vid,
2326        properties: &mut Properties,
2327        label: Option<&str>,
2328        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2329    ) -> Result<()> {
2330        let Some(pm) = &self.property_manager else {
2331            return Ok(());
2332        };
2333
2334        let schema = self.schema_manager.schema();
2335
2336        // Resolve label: use provided label or discover from L0/storage
2337        let discovered_labels;
2338        let label_name = if let Some(l) = label {
2339            Some(l)
2340        } else {
2341            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2342            discovered_labels
2343                .as_ref()
2344                .and_then(|l| l.first().map(|s| s.as_str()))
2345        };
2346
2347        let Some(label_str) = label_name else {
2348            return Ok(());
2349        };
2350        let Some(props_meta) = schema.properties.get(label_str) else {
2351            return Ok(());
2352        };
2353
2354        // Identify CRDT properties in the insert data
2355        let crdt_keys: Vec<String> = properties
2356            .keys()
2357            .filter(|key| {
2358                props_meta.get(*key).is_some_and(|meta| {
2359                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2360                })
2361            })
2362            .cloned()
2363            .collect();
2364
2365        if crdt_keys.is_empty() {
2366            return Ok(());
2367        }
2368
2369        // Enforce that each declared CRDT property written as a parsed CRDT value
2370        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2371        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2372        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2373        // would hide that as a silent lost update — reject it at the source.
2374        //
2375        // Only the `Map` form is checked: it is exactly the form the carve-out
2376        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2377        // string (the Cypher form) or a non-CRDT value is never carved out — it
2378        // stays conflictable — so it poses no carve-out soundness risk and is left
2379        // to the existing merge/parse path. This is the declared-property half of
2380        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2381        Self::enforce_crdt_variants(props_meta, properties)?;
2382
2383        let ctx = self.get_query_context(tx_l0).await;
2384        for key in crdt_keys {
2385            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2386            if !existing.is_null()
2387                && let Some(val) = properties.get_mut(&key)
2388            {
2389                *val = pm.merge_crdt_values(&existing, val)?;
2390            }
2391        }
2392
2393        Ok(())
2394    }
2395
2396    async fn prepare_edge_upsert(
2397        &self,
2398        eid: Eid,
2399        properties: &mut Properties,
2400        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2401    ) -> Result<()> {
2402        if let Some(pm) = &self.property_manager {
2403            let schema = self.schema_manager.schema();
2404            // Get edge type from L0 buffer instead of from EID
2405            let type_name = self.get_edge_type_from_l0(eid);
2406
2407            if let Some(ref t_name) = type_name
2408                && let Some(props_meta) = schema.properties.get(t_name)
2409            {
2410                let mut crdt_keys = Vec::new();
2411                for (key, _) in properties.iter() {
2412                    if let Some(meta) = props_meta.get(key)
2413                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2414                    {
2415                        crdt_keys.push(key.clone());
2416                    }
2417                }
2418
2419                if !crdt_keys.is_empty() {
2420                    let ctx = self.get_query_context(tx_l0).await;
2421                    for key in crdt_keys {
2422                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2423
2424                        if !existing.is_null()
2425                            && let Some(val) = properties.get_mut(&key)
2426                        {
2427                            *val = pm.merge_crdt_values(&existing, val)?;
2428                        }
2429                    }
2430                }
2431            }
2432        }
2433        Ok(())
2434    }
2435
2436    #[instrument(skip(self, properties), level = "trace")]
2437    pub async fn insert_vertex(
2438        &self,
2439        vid: Vid,
2440        properties: Properties,
2441        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2442    ) -> Result<()> {
2443        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2444            .await?;
2445        Ok(())
2446    }
2447
2448    /// Component C1 (G4): before a non-transactional mutation merges into main
2449    /// L0, if an outstanding snapshot pins the current generation, freeze it
2450    /// aside so snapshots taken *before* this write stay isolated from it.
2451    ///
2452    /// `flush_lock` (acquired and released here) serializes the freeze against
2453    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2454    /// commit path gets. No-op for transactional writes (their freeze happens at
2455    /// commit) and — the common case — when nothing is pinned, where it costs one
2456    /// atomic load. Freezes at most once per pinned generation: the freeze
2457    /// installs a fresh unpinned `current`, so later writes in the same bulk
2458    /// import see no pin and merge in place, and the snapshot keeps reading the
2459    /// frozen pre-import buffer.
2460    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2461        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2462        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2463        // always false (one atomic load) when SSI is off.
2464        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2465            let _flush_lock_guard = self.flush_lock.lock().await;
2466            // Re-check under the lock: a concurrent commit may have frozen first.
2467            if self.l0_manager.is_current_pinned() {
2468                self.l0_manager.freeze_current_for_snapshot();
2469                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2470            }
2471        }
2472    }
2473
2474    #[instrument(skip(self, properties, labels), level = "trace")]
2475    pub async fn insert_vertex_with_labels(
2476        &self,
2477        vid: Vid,
2478        mut properties: Properties,
2479        labels: &[String],
2480        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2481    ) -> Result<Properties> {
2482        let start = std::time::Instant::now();
2483        self.check_write_pressure().await?;
2484        self.check_transaction_memory(tx_l0)?;
2485
2486        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2487        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2488        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2489        // taken before this write stay isolated from it.
2490        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2491
2492        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2493            self.process_embeddings_for_labels(labels, &mut properties)
2494                .await?;
2495        }
2496        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2497            .await?;
2498        self.prepare_vertex_upsert(
2499            vid,
2500            &mut properties,
2501            labels.first().map(|s| s.as_str()),
2502            tx_l0,
2503        )
2504        .await?;
2505
2506        // Clone properties and labels before moving into L0 to return them and populate constraint index
2507        let properties_copy = properties.clone();
2508        let labels_copy = labels.to_vec();
2509
2510        {
2511            // For a non-tx write, re-resolve the live `current` buffer and hold
2512            // `flush_lock` across the (synchronous) write so a concurrent flush
2513            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2514            // a fresh `current` between resolve and write, dropping our write
2515            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2516            // buffer, never the rotating `current`, so no `flush_lock` is needed
2517            // (and taking it would risk re-entrancy with the commit path).
2518            let _flush_lock_guard = if tx_l0.is_none() {
2519                Some(self.flush_lock.lock().await)
2520            } else {
2521                None
2522            };
2523            let l0 = self.resolve_l0(tx_l0);
2524            let mut l0_guard = l0.write();
2525            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2526            // possibly empty) current buffer must merge against the chained
2527            // committed state, not shadow it. No-op when the chain is empty
2528            // or this is a tx-private write.
2529            if tx_l0.is_none() {
2530                let pending = self.l0_manager.get_pending_flush();
2531                if !pending.is_empty() {
2532                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2533                }
2534            }
2535            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2536
2537            // Populate constraint index for O(1) duplicate detection
2538            let schema = self.schema_manager.schema();
2539            for label in &labels_copy {
2540                if schema.get_label_case_insensitive(label).is_none() {
2541                    if self.config.strict_schema {
2542                        return Err(anyhow::anyhow!(
2543                            "Label '{}' is not defined in the schema \
2544                             (strict_schema is enabled).",
2545                            label
2546                        ));
2547                    }
2548                    continue; // Schemaless: skip unknown labels.
2549                }
2550
2551                // For each unique constraint on this label, insert into constraint index
2552                for constraint in &schema.constraints {
2553                    if !constraint.enabled {
2554                        continue;
2555                    }
2556                    if let ConstraintTarget::Label(l) = &constraint.target {
2557                        if l != label {
2558                            continue;
2559                        }
2560                    } else {
2561                        continue;
2562                    }
2563
2564                    if let ConstraintType::Unique {
2565                        properties: unique_props,
2566                    } = &constraint.constraint_type
2567                    {
2568                        let mut key_values = Vec::new();
2569                        let mut all_present = true;
2570                        for prop in unique_props {
2571                            if let Some(val) = properties_copy.get(prop) {
2572                                key_values.push((prop.clone(), val.clone()));
2573                            } else {
2574                                all_present = false;
2575                                break;
2576                            }
2577                        }
2578
2579                        if all_present {
2580                            let key = serialize_constraint_key(label, &key_values);
2581                            l0_guard.insert_constraint_key(key, vid);
2582                        }
2583                    }
2584                }
2585            }
2586        }
2587
2588        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2589        self.update_metrics();
2590
2591        if tx_l0.is_none() {
2592            self.check_flush().await?;
2593        }
2594        if start.elapsed().as_millis() > 100 {
2595            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2596        }
2597        Ok(properties_copy)
2598    }
2599
2600    /// True iff routing this partial write through MergeInsert would
2601    /// miss a constraint check. Specifically: a multi-key UNIQUE
2602    /// constraint where the touched-set doesn't cover all member keys
2603    /// requires the unchanged keys from the existing row to compute
2604    /// the composite. Conservative: also returns true if any touched
2605    /// key is `ext_id` (uniqueness checked globally — handled in the
2606    /// full-row path).
2607    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2608        if touched.contains_key("ext_id") {
2609            return true;
2610        }
2611        let schema = self.schema_manager.schema();
2612        for label in labels {
2613            if schema.get_label_case_insensitive(label).is_none() {
2614                continue;
2615            }
2616            for constraint in &schema.constraints {
2617                if !constraint.enabled {
2618                    continue;
2619                }
2620                if let ConstraintTarget::Label(l) = &constraint.target {
2621                    if !l.eq_ignore_ascii_case(label) {
2622                        continue;
2623                    }
2624                } else {
2625                    continue;
2626                }
2627                if let ConstraintType::Unique {
2628                    properties: unique_props,
2629                } = &constraint.constraint_type
2630                {
2631                    if unique_props.len() < 2 {
2632                        continue; // single-key UNIQUE — partial path sees the key
2633                    }
2634                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2635                        return true;
2636                    }
2637                }
2638            }
2639        }
2640        false
2641    }
2642
2643    /// Insert a vertex's FULL property row plus a touched-keys hint so
2644    /// the flush emits ONLY those columns via Lance MergeInsert.
2645    ///
2646    /// Caller must have read the full row (via PropertyManager) and
2647    /// applied SET-touched values on top before calling — same input
2648    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2649    /// is the set of property keys this SET statement actually
2650    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2651    /// flush filters the MergeInsert source schema down to those keys.
2652    /// When `UniConfig::partial_lance_writes == false`, falls through
2653    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2654    /// equivalence with prior releases.
2655    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2656    pub async fn insert_vertex_partial_full(
2657        &self,
2658        vid: Vid,
2659        mut props: Properties,
2660        touched_keys: HashSet<String>,
2661        labels: &[String],
2662        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2663    ) -> Result<()> {
2664        if !self.config.partial_lance_writes
2665            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2666        {
2667            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2668                .await?;
2669            return Ok(());
2670        }
2671
2672        self.check_write_pressure().await?;
2673        self.check_transaction_memory(tx_l0)?;
2674        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2675            self.process_embeddings_for_labels(labels, &mut props)
2676                .await?;
2677        }
2678        // Full-row validation runs because we have the complete map;
2679        // no need for the partial-only validator.
2680        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2681            .await?;
2682        {
2683            let l0 = self.resolve_l0(tx_l0);
2684            let mut l0_guard = l0.write();
2685            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2686        }
2687        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2688        metrics::counter!("uni_partial_writes_total").increment(1);
2689        self.update_metrics();
2690        if tx_l0.is_none() {
2691            self.check_flush().await?;
2692        }
2693        Ok(())
2694    }
2695
2696    /// Insert a vertex's *partial* property set without first reading the
2697    /// full row.
2698    ///
2699    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2700    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2701    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2702    /// schema source — preserving untouched columns (e.g., embeddings)
2703    /// byte-equal in Lance with no read at the caller and no write of
2704    /// those columns.
2705    ///
2706    /// When the flag is `false`, this falls back to the existing
2707    /// `insert_vertex_with_labels` path after merging `touched` with
2708    /// the current properties from L0/storage. The caller can therefore
2709    /// use this entry point unconditionally; the optimization activates
2710    /// only when the flag is on.
2711    #[instrument(skip(self, touched, labels), level = "trace")]
2712    pub async fn insert_vertex_partial(
2713        &self,
2714        vid: Vid,
2715        touched: Properties,
2716        labels: &[String],
2717        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2718    ) -> Result<()> {
2719        let needs_full_read =
2720            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2721        if needs_full_read {
2722            // Flag-off fallback (or constraint-driven fallback): merge
2723            // `touched` with the current full property snapshot from
2724            // L0/storage and route through the existing path. Preserves
2725            // bit-for-bit equivalence with the pre-Round-11 release.
2726            let existing = if let Some(pm) = &self.property_manager {
2727                pm.get_all_vertex_props_with_ctx(vid, None)
2728                    .await
2729                    .unwrap_or_default()
2730                    .unwrap_or_default()
2731            } else {
2732                Properties::new()
2733            };
2734            let mut merged = existing;
2735            for (k, v) in touched {
2736                merged.insert(k, v);
2737            }
2738            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2739                .await?;
2740            return Ok(());
2741        }
2742
2743        // Flag-on fast path: stage the partial update directly. Pressure
2744        // checks, embedding generation, constraint validation all still
2745        // run — but the validator is the partial-aware variant that
2746        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2747        // properties not present in `touched`. Multi-key UNIQUE that
2748        // overlaps the touched set forces a fallback above via
2749        // `touched_needs_full_read`.
2750        let mut touched = touched;
2751        self.check_write_pressure().await?;
2752        self.check_transaction_memory(tx_l0)?;
2753        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2754            self.process_embeddings_for_labels(labels, &mut touched)
2755                .await?;
2756        }
2757        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2758            .await?;
2759
2760        {
2761            let l0 = self.resolve_l0(tx_l0);
2762            let mut l0_guard = l0.write();
2763            l0_guard.insert_vertex_partial(vid, touched, labels);
2764        }
2765
2766        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2767        metrics::counter!("uni_partial_writes_total").increment(1);
2768        self.update_metrics();
2769        if tx_l0.is_none() {
2770            self.check_flush().await?;
2771        }
2772        Ok(())
2773    }
2774
2775    /// Insert multiple vertices with batched operations.
2776    ///
2777    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2778    /// for bulk inserts with unique constraints.
2779    ///
2780    /// # Performance Improvements
2781    /// - Batch VID allocation: 1 call instead of N calls
2782    /// - Batch constraint validation: O(N) instead of O(N²)
2783    /// - Batch embedding generation: 1 API call per config instead of N calls
2784    /// - Transaction wrapping: Automatic flush deferral, atomicity
2785    ///
2786    /// # Arguments
2787    /// * `vids` - Pre-allocated VIDs for the vertices
2788    /// * `properties_batch` - Properties for each vertex
2789    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2790    ///
2791    /// # Errors
2792    /// Returns error if:
2793    /// - VID/properties length mismatch
2794    /// - Constraint violation detected
2795    /// - Embedding generation fails
2796    /// - Transaction commit fails
2797    ///
2798    /// # Atomicity
2799    /// If this method fails, all changes are rolled back (if transaction was started here).
2800    pub async fn insert_vertices_batch(
2801        &self,
2802        vids: Vec<Vid>,
2803        mut properties_batch: Vec<Properties>,
2804        labels: Vec<String>,
2805        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2806    ) -> Result<Vec<Properties>> {
2807        let start = std::time::Instant::now();
2808
2809        // Validate inputs
2810        if vids.len() != properties_batch.len() {
2811            return Err(anyhow!(
2812                "VID/properties size mismatch: {} vids, {} properties",
2813                vids.len(),
2814                properties_batch.len()
2815            ));
2816        }
2817
2818        if vids.is_empty() {
2819            return Ok(Vec::new());
2820        }
2821
2822        // Batch operations — writes go directly to the resolved L0.
2823        // Atomicity is guaranteed by the caller holding the writer lock.
2824        let result = async {
2825            self.check_write_pressure().await?;
2826            self.check_transaction_memory(tx_l0)?;
2827
2828            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2829            // freeze the pinned generation aside before merging so snapshot
2830            // readers stay isolated. No-op when unpinned or transactional.
2831            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2832
2833            // Batch embedding generation (1 API call per config)
2834            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2835                .await?;
2836
2837            // Batch constraint validation (O(N) instead of O(N²))
2838            let label = labels
2839                .first()
2840                .ok_or_else(|| anyhow!("No labels provided"))?;
2841            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2842                .await?;
2843
2844            // Batch prepare (CRDT merging if needed)
2845            // Check schema once: skip entirely if no CRDT properties for this label.
2846            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2847            // values to merge, so the per-vertex lookup is unnecessary in that case.
2848            let has_crdt_fields = {
2849                let schema = self.schema_manager.schema();
2850                schema
2851                    .properties
2852                    .get(label.as_str())
2853                    .is_some_and(|props_meta| {
2854                        props_meta.values().any(|meta| {
2855                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2856                        })
2857                    })
2858            };
2859
2860            if has_crdt_fields {
2861                // Layer-1 variant enforcement (G3): the batch path must reject a
2862                // declared-CRDT variant mismatch exactly as the single-vertex
2863                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2864                // written via batch import slips past write-time validation and
2865                // the OCC carve-out then masks the overwrite as a lost update.
2866                {
2867                    let schema = self.schema_manager.schema();
2868                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2869                        for props in &properties_batch {
2870                            Self::enforce_crdt_variants(props_meta, props)?;
2871                        }
2872                    }
2873                }
2874
2875                // Batch fetch existing CRDT values: collect VIDs that need merging,
2876                // then query once via PropertyManager instead of per-vertex lookups.
2877                let schema = self.schema_manager.schema();
2878                let crdt_keys: Vec<String> = schema
2879                    .properties
2880                    .get(label.as_str())
2881                    .map(|props_meta| {
2882                        props_meta
2883                            .iter()
2884                            .filter(|(_, meta)| {
2885                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2886                            })
2887                            .map(|(key, _)| key.clone())
2888                            .collect()
2889                    })
2890                    .unwrap_or_default();
2891
2892                if let Some(pm) = &self.property_manager {
2893                    let ctx = self.get_query_context(tx_l0).await;
2894                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2895                        for key in &crdt_keys {
2896                            if props.contains_key(key) {
2897                                let existing =
2898                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2899                                if !existing.is_null()
2900                                    && let Some(val) = props.get_mut(key)
2901                                {
2902                                    *val = pm.merge_crdt_values(&existing, val)?;
2903                                }
2904                            }
2905                        }
2906                    }
2907                }
2908            }
2909
2910            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2911            let target_l0 = self.resolve_l0(tx_l0);
2912
2913            let properties_result = properties_batch.clone();
2914            {
2915                let mut l0_guard = target_l0.write();
2916                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2917                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2918                }
2919            }
2920
2921            // Update metrics (batch increment)
2922            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2923            self.update_metrics();
2924
2925            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2926        }
2927        .await;
2928
2929        let props = result?;
2930
2931        if start.elapsed().as_millis() > 100 {
2932            log::warn!(
2933                "Slow insert_vertices_batch ({} vertices): {}ms",
2934                vids.len(),
2935                start.elapsed().as_millis()
2936            );
2937        }
2938
2939        Ok(props)
2940    }
2941
2942    /// Delete a vertex by VID.
2943    ///
2944    /// When `labels` is provided, uses them directly to populate L0 for
2945    /// correct tombstone flushing. Otherwise discovers labels from L0
2946    /// buffers and storage (which can be slow for many vertices).
2947    ///
2948    /// # Errors
2949    ///
2950    /// Returns an error if write pressure stalls, label lookup fails, or
2951    /// the L0 delete operation fails.
2952    #[instrument(skip(self, labels), level = "trace")]
2953    pub async fn delete_vertex(
2954        &self,
2955        vid: Vid,
2956        labels: Option<Vec<String>>,
2957        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2958    ) -> Result<()> {
2959        let start = std::time::Instant::now();
2960        self.check_write_pressure().await?;
2961        self.check_transaction_memory(tx_l0)?;
2962        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2963
2964        // Before deleting, ensure we have the vertex's labels stored in L0 so
2965        // the tombstone can be flushed to the correct label datasets. Discover
2966        // them up front (this may await storage) WITHOUT pinning the buffer we
2967        // will eventually mutate — for non-tx writes the live `current` buffer
2968        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2969        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2970        // reads that tolerate a racing rotate.
2971        let has_labels = {
2972            let l0_guard = self.resolve_l0(tx_l0);
2973            let guard = l0_guard.read();
2974            guard.vertex_labels.contains_key(&vid)
2975        };
2976
2977        let backfill_labels = if has_labels {
2978            None
2979        } else if let Some(provided) = labels {
2980            // Caller provided labels — skip the lookup entirely
2981            Some(provided)
2982        } else {
2983            // Discover labels from pending flush L0s, then storage
2984            let mut found = None;
2985            for pending_l0 in self.l0_manager.get_pending_flush() {
2986                let pending_guard = pending_l0.read();
2987                if let Some(l) = pending_guard.get_vertex_labels(vid) {
2988                    found = Some(l.to_vec());
2989                    break;
2990                }
2991            }
2992            if found.is_none() {
2993                found = self.find_vertex_labels_in_storage(vid).await?;
2994            }
2995            found
2996        };
2997
2998        // Test-only seam (no-op without the `failpoints` feature): pause a
2999        // non-transactional delete AFTER the awaited label discovery but BEFORE
3000        // it re-resolves the live buffer and writes the tombstone. A concurrent
3001        // flush can rotate+complete a buffer in this window; the fix re-resolves
3002        // `get_current()` and mutates it under `flush_lock`, so the tombstone
3003        // always lands in the live buffer (Bug #4 — silent lost delete across
3004        // L0 rotation).
3005        fail::fail_point!("nontx::after-capture");
3006
3007        // Apply the label backfill and the tombstone together. For a non-tx
3008        // write, hold `flush_lock` across the (synchronous) re-resolve + write
3009        // so a concurrent flush rotate (which takes `flush_lock` via
3010        // `begin_flush`) cannot install a fresh `current` between our resolve
3011        // and our write. For a tx write `resolve_l0` returns the tx-private
3012        // buffer (never the rotating `current`), so no `flush_lock` is needed
3013        // — and taking it there would risk re-entrancy with the commit path.
3014        if tx_l0.is_none() {
3015            let _flush_lock_guard = self.flush_lock.lock().await;
3016            let l0 = self.l0_manager.get_current();
3017            let mut guard = l0.write();
3018            if let Some(found_labels) = backfill_labels {
3019                guard.vertex_labels.insert(vid, found_labels);
3020            }
3021            guard.delete_vertex(vid)?;
3022        } else {
3023            let l0 = self.resolve_l0(tx_l0);
3024            let mut guard = l0.write();
3025            if let Some(found_labels) = backfill_labels {
3026                guard.vertex_labels.insert(vid, found_labels);
3027            }
3028            guard.delete_vertex(vid)?;
3029        }
3030        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3031        self.update_metrics();
3032
3033        if tx_l0.is_none() {
3034            self.check_flush().await?;
3035        }
3036        if start.elapsed().as_millis() > 100 {
3037            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
3038        }
3039        Ok(())
3040    }
3041
3042    /// Find vertex labels from storage by querying the main vertices table.
3043    /// Returns the labels from the latest non-deleted version of the vertex.
3044    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
3045        use crate::backend::types::ScanRequest;
3046        use arrow_array::Array;
3047        use arrow_array::cast::AsArray;
3048
3049        let backend = self.storage.backend();
3050        let table_name = MainVertexDataset::table_name();
3051
3052        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
3053        if !backend.table_exists(table_name).await? {
3054            return Ok(None);
3055        }
3056
3057        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
3058        let filter = format!("_vid = {}", vid.as_u64());
3059        let batches = backend
3060            .scan(
3061                ScanRequest::all(table_name)
3062                    .with_filter(filter)
3063                    .with_columns(vec![
3064                        "_vid".to_string(),
3065                        "labels".to_string(),
3066                        "_version".to_string(),
3067                        "_deleted".to_string(),
3068                    ]),
3069            )
3070            .await
3071            .unwrap_or_default();
3072
3073        // Find the row with the highest version number
3074        let mut max_version: Option<u64> = None;
3075        let mut labels: Option<Vec<String>> = None;
3076        let mut is_deleted = false;
3077
3078        for batch in batches {
3079            if batch.num_rows() == 0 {
3080                continue;
3081            }
3082
3083            let version_array = batch
3084                .column_by_name("_version")
3085                .unwrap()
3086                .as_primitive::<arrow_array::types::UInt64Type>();
3087
3088            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
3089
3090            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
3091
3092            for row_idx in 0..batch.num_rows() {
3093                let version = version_array.value(row_idx);
3094
3095                if max_version.is_none_or(|mv| version > mv) {
3096                    is_deleted = deleted_array.value(row_idx);
3097
3098                    let labels_list = labels_array.value(row_idx);
3099                    let string_array = labels_list.as_string::<i32>();
3100                    let vertex_labels: Vec<String> = (0..string_array.len())
3101                        .filter(|&i| !string_array.is_null(i))
3102                        .map(|i| string_array.value(i).to_string())
3103                        .collect();
3104
3105                    max_version = Some(version);
3106                    labels = Some(vertex_labels);
3107                }
3108            }
3109        }
3110
3111        // If the latest version is deleted, return None
3112        if is_deleted { Ok(None) } else { Ok(labels) }
3113    }
3114
3115    #[expect(clippy::too_many_arguments)]
3116    #[instrument(skip(self, props, touched_keys), level = "trace")]
3117    pub async fn insert_edge_partial_full(
3118        &self,
3119        src_vid: Vid,
3120        dst_vid: Vid,
3121        edge_type: u32,
3122        eid: Eid,
3123        props: Properties,
3124        edge_type_name: Option<String>,
3125        touched_keys: HashSet<String>,
3126        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3127    ) -> Result<()> {
3128        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3129        if !self.config.partial_lance_writes {
3130            return self
3131                .insert_edge(
3132                    src_vid,
3133                    dst_vid,
3134                    edge_type,
3135                    eid,
3136                    props,
3137                    edge_type_name,
3138                    tx_l0,
3139                )
3140                .await;
3141        }
3142
3143        let start = std::time::Instant::now();
3144        self.check_write_pressure().await?;
3145        self.check_transaction_memory(tx_l0)?;
3146        let mut props = props;
3147        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3148
3149        let l0 = self.resolve_l0(tx_l0);
3150        l0.write().insert_edge_partial_full(
3151            src_vid,
3152            dst_vid,
3153            edge_type,
3154            eid,
3155            props,
3156            edge_type_name,
3157            touched_keys,
3158        )?;
3159
3160        if tx_l0.is_none() {
3161            let version = l0.read().current_version;
3162            self.adjacency_manager
3163                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3164        }
3165
3166        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3167        metrics::counter!("uni_partial_writes_total").increment(1);
3168        self.update_metrics();
3169        if tx_l0.is_none() {
3170            self.check_flush().await?;
3171        }
3172        if start.elapsed().as_millis() > 100 {
3173            log::warn!(
3174                "Slow insert_edge_partial_full: {}ms",
3175                start.elapsed().as_millis()
3176            );
3177        }
3178        Ok(())
3179    }
3180
3181    #[expect(clippy::too_many_arguments)]
3182    pub async fn insert_edge(
3183        &self,
3184        src_vid: Vid,
3185        dst_vid: Vid,
3186        edge_type: u32,
3187        eid: Eid,
3188        mut properties: Properties,
3189        edge_type_name: Option<String>,
3190        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3191    ) -> Result<()> {
3192        let start = std::time::Instant::now();
3193        self.check_write_pressure().await?;
3194        self.check_transaction_memory(tx_l0)?;
3195        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3196        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3197            .await?;
3198
3199        let l0 = self.resolve_l0(tx_l0);
3200        l0.write()
3201            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3202
3203        // Dual-write to AdjacencyManager overlay (survives flush).
3204        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3205        if tx_l0.is_none() {
3206            let version = l0.read().current_version;
3207            self.adjacency_manager
3208                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3209        }
3210
3211        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3212        self.update_metrics();
3213
3214        if tx_l0.is_none() {
3215            self.check_flush().await?;
3216        }
3217        if start.elapsed().as_millis() > 100 {
3218            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3219        }
3220        Ok(())
3221    }
3222
3223    #[instrument(skip(self), level = "trace")]
3224    pub async fn delete_edge(
3225        &self,
3226        eid: Eid,
3227        src_vid: Vid,
3228        dst_vid: Vid,
3229        edge_type: u32,
3230        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3231    ) -> Result<()> {
3232        let start = std::time::Instant::now();
3233        self.check_write_pressure().await?;
3234        self.check_transaction_memory(tx_l0)?;
3235        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3236        let l0 = self.resolve_l0(tx_l0);
3237
3238        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3239
3240        // Dual-write tombstone to AdjacencyManager overlay.
3241        if tx_l0.is_none() {
3242            let version = l0.read().current_version;
3243            self.adjacency_manager
3244                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3245        }
3246        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3247        self.update_metrics();
3248
3249        if tx_l0.is_none() {
3250            self.check_flush().await?;
3251        }
3252        if start.elapsed().as_millis() > 100 {
3253            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3254        }
3255        Ok(())
3256    }
3257
3258    /// Decide whether a flush should be triggered based on mutation count
3259    /// or elapsed time since the last flush.
3260    ///
3261    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3262    /// reuse the decision while bypassing the lock-acquiring entry point
3263    /// (it already holds `flush_lock`).
3264    fn should_flush(&self) -> bool {
3265        let count = self.l0_manager.get_current().read().mutation_count;
3266        if count == 0 {
3267            return false;
3268        }
3269        if count >= self.config.auto_flush_threshold {
3270            return true;
3271        }
3272        if let Some(interval) = self.config.auto_flush_interval
3273            && self.last_flush_time.lock().elapsed() >= interval
3274            && count >= self.config.auto_flush_min_mutations
3275        {
3276            return true;
3277        }
3278        false
3279    }
3280
3281    /// Check if flush should be triggered based on mutation count or time elapsed.
3282    /// This method is called after each write operation and can also be called
3283    /// by a background task for time-based flushing.
3284    pub async fn check_flush(&self) -> Result<()> {
3285        if self.should_flush() {
3286            self.flush_to_l1(None).await?;
3287        }
3288        Ok(())
3289    }
3290
3291    /// Process embeddings for a vertex using labels passed directly.
3292    /// Use this when labels haven't been stored to L0 yet.
3293    async fn process_embeddings_for_labels(
3294        &self,
3295        labels: &[String],
3296        properties: &mut Properties,
3297    ) -> Result<()> {
3298        let label_name = labels.first().map(|s| s.as_str());
3299        self.process_embeddings_impl(label_name, properties).await
3300    }
3301
3302    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3303    /// vertex has an embedding config that hasn't been satisfied by the
3304    /// caller-provided properties, enqueue the VID in
3305    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3306    /// skips `process_embeddings_for_labels` and the embedding is computed
3307    /// in a single batched call at flush time via
3308    /// `drain_pending_embeddings`.
3309    ///
3310    /// Returns `false` (caller falls back to today's per-row eager embed)
3311    /// if any of:
3312    ///  - the flag is off,
3313    ///  - no label has an embedding config,
3314    ///  - the user already provided the target property (matches the
3315    ///    existing skip-if-present semantics at writer.rs:2727).
3316    ///
3317    /// Trade-off: when deferral is active, in-tx reads of the embedding
3318    /// column return only what was already in storage (or nothing for
3319    /// brand-new vertices). Existing tests that RETURN n.embedding in
3320    /// the same tx as a SET on the source column must run with the flag
3321    /// off; opt in only when no such reads happen between write and
3322    /// commit.
3323    fn try_defer_embedding(
3324        &self,
3325        labels: &[String],
3326        properties: &Properties,
3327        vid: Vid,
3328        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3329    ) -> bool {
3330        if !self.config.defer_embeddings {
3331            return false;
3332        }
3333        let Some(label) = labels.first() else {
3334            return false;
3335        };
3336
3337        let schema = self.schema_manager.schema();
3338        let mut has_unsatisfied_cfg = false;
3339        for idx in &schema.indexes {
3340            if let IndexDefinition::Vector(v_cfg) = idx
3341                && v_cfg.label == *label
3342                && v_cfg.embedding_config.is_some()
3343                && !properties.contains_key(&v_cfg.property)
3344            {
3345                has_unsatisfied_cfg = true;
3346                break;
3347            }
3348        }
3349        if !has_unsatisfied_cfg {
3350            return false;
3351        }
3352
3353        let l0 = self.resolve_l0(tx_l0);
3354        let mut guard = l0.write();
3355        guard.pending_embeddings.insert(vid, label.clone());
3356        true
3357    }
3358
3359    /// Drain `pending_embeddings` from the rotated old-L0 right before
3360    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3361    /// `process_embeddings_for_batch` call per label, and writes the
3362    /// resulting embedding vectors into each VID's `vertex_properties`
3363    /// map. After this returns, the flush proceeds against an L0 that
3364    /// looks no different from one whose embeddings were generated
3365    /// per-row at insert.
3366    ///
3367    /// Idempotent: a VID whose embedding was already materialized
3368    /// (e.g., by on-demand read paths in a future Phase B revision) is
3369    /// detected via `properties.contains_key(target_prop)` inside
3370    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3371    /// the drain is safe.
3372    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3373        let by_label: HashMap<String, Vec<Vid>> = {
3374            let guard = old_l0_arc.read();
3375            if guard.pending_embeddings.is_empty() {
3376                return Ok(());
3377            }
3378            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3379            for (vid, label) in &guard.pending_embeddings {
3380                m.entry(label.clone()).or_default().push(*vid);
3381            }
3382            m
3383        };
3384
3385        for (label, vids) in by_label {
3386            let mut properties_batch: Vec<Properties> = {
3387                let guard = old_l0_arc.read();
3388                vids.iter()
3389                    .map(|vid| {
3390                        guard
3391                            .vertex_properties
3392                            .get(vid)
3393                            .cloned()
3394                            .unwrap_or_default()
3395                    })
3396                    .collect()
3397            };
3398
3399            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3400                .await?;
3401
3402            let mut guard = old_l0_arc.write();
3403            for (vid, props) in vids.iter().zip(properties_batch) {
3404                let target = guard.vertex_properties.entry(*vid).or_default();
3405                for (k, v) in props {
3406                    target.insert(k, v);
3407                }
3408                guard.pending_embeddings.remove(vid);
3409            }
3410        }
3411        Ok(())
3412    }
3413
3414    /// Materialise MUVERA FDE columns for the about-to-flush L0. Mirrors
3415    /// [`Self::drain_pending_embeddings`]: for each MUVERA index, compute the derived
3416    /// Fixed-Dimensional Encoding from each row's source multi-vector and inject it into
3417    /// that row's `vertex_properties` (so the normal column builder writes the
3418    /// `__fde_*` column with no hot-path change). For partial-write rows that touched the
3419    /// source column, the derived column is added to `vertex_partial_keys` so the partial
3420    /// MergeInsert batch carries the recomputed FDE (avoids staleness on `SET`).
3421    ///
3422    /// No-op when the schema has no MUVERA index. Unlike auto-embed, the FDE is a pure,
3423    /// deterministic, in-process transform — no runtime/embedding service needed.
3424    fn materialize_fde_columns(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3425        let schema = self.schema_manager.schema();
3426        let specs = crate::storage::muvera_index::fde_specs(&schema);
3427        if specs.is_empty() {
3428            return Ok(());
3429        }
3430        let mut guard = old_l0_arc.write();
3431        for spec in &specs {
3432            let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)
3433                .map_err(|e| anyhow!("MUVERA index '{}': {e}", spec.index_name))?;
3434            // VIDs of this label currently in L0 (collect first to avoid a borrow
3435            // conflict with the per-row mutation below).
3436            let vids: Vec<Vid> = guard
3437                .vertex_labels
3438                .iter()
3439                .filter(|(_, labels)| labels.contains(&spec.label))
3440                .map(|(vid, _)| *vid)
3441                .collect();
3442            for vid in vids {
3443                // Decode the source multi-vector tokens (borrow ends before the mutation).
3444                let tokens = match guard
3445                    .vertex_properties
3446                    .get(&vid)
3447                    .and_then(|p| p.get(&spec.source_prop))
3448                {
3449                    Some(v) => crate::storage::muvera_index::value_to_multivec(v),
3450                    None => continue, // source absent → leave the FDE column NULL
3451                };
3452                let fde = encoder.encode_doc(&tokens).map_err(|e| {
3453                    anyhow!("MUVERA index '{}' vid {:?}: {e}", spec.index_name, vid)
3454                })?;
3455                if let Some(props) = guard.vertex_properties.get_mut(&vid) {
3456                    props.insert(spec.derived_col.clone(), Value::Vector(fde));
3457                }
3458                if let Some(touched) = guard.vertex_partial_keys.get_mut(&vid)
3459                    && touched.contains(&spec.source_prop)
3460                {
3461                    touched.insert(spec.derived_col.clone());
3462                }
3463            }
3464        }
3465        Ok(())
3466    }
3467
3468    /// Process embeddings for a batch of vertices efficiently.
3469    ///
3470    /// Groups vertices by embedding config and makes batched API calls to the
3471    /// embedding service instead of calling once per vertex.
3472    ///
3473    /// # Performance
3474    /// For N vertices with embedding config:
3475    /// - Old approach: N API calls to embedding service
3476    /// - New approach: 1 API call per embedding config (usually 1 total)
3477    async fn process_embeddings_for_batch(
3478        &self,
3479        labels: &[String],
3480        properties_batch: &mut [Properties],
3481    ) -> Result<()> {
3482        let Some(label) = labels.first().map(|s| s.as_str()) else {
3483            return Ok(());
3484        };
3485        let schema = self.schema_manager.schema();
3486
3487        // Group auto-embed targets by (alias, source). A group with both a dense Vector and a
3488        // multi-vector List<Vector> column is a single-pass hybrid source: one inference fills
3489        // both. Non-mixed groups use the dense / multi-vector embedder as before.
3490        let groups = collect_embed_groups(&schema, label);
3491        if groups.is_empty() {
3492            return Ok(());
3493        }
3494
3495        for (key, group) in groups {
3496            let alias = &key.0;
3497            let want_dense = !group.dense.is_empty();
3498            let want_multi = !group.multi.is_empty();
3499
3500            // A row needs this group's inference if it has the source text and is still missing
3501            // at least one of the group's target columns (user-supplied values are preserved).
3502            let mut input_texts: Vec<String> = Vec::new();
3503            let mut needs: Vec<usize> = Vec::new();
3504            for (idx, properties) in properties_batch.iter().enumerate() {
3505                let all_present = group
3506                    .dense
3507                    .iter()
3508                    .chain(group.multi.iter())
3509                    .all(|t| properties.contains_key(t));
3510                if all_present {
3511                    continue;
3512                }
3513                let mut inputs = Vec::new();
3514                for src in &group.source_properties {
3515                    if let Some(val) = properties.get(src)
3516                        && let Some(s) = val.as_str()
3517                    {
3518                        inputs.push(s.to_string());
3519                    }
3520                }
3521                if inputs.is_empty() {
3522                    continue;
3523                }
3524                let text = inputs.join(" ");
3525                let text = match &group.document_prefix {
3526                    Some(prefix) => format!("{prefix}{text}"),
3527                    None => text,
3528                };
3529                input_texts.push(text);
3530                needs.push(idx);
3531            }
3532            if input_texts.is_empty() {
3533                continue;
3534            }
3535
3536            let runtime = self
3537                .xervo_runtime
3538                .get()
3539                .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3540            let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3541            let (dense, multi) =
3542                embed_group(runtime, alias, &input_refs, want_dense, want_multi).await?;
3543
3544            for (i, &row) in needs.iter().enumerate() {
3545                if let Some(vec) = dense.as_ref().and_then(|d| d.get(i)) {
3546                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3547                    for t in &group.dense {
3548                        if !properties_batch[row].contains_key(t) {
3549                            properties_batch[row].insert(t.clone(), Value::List(vals.clone()));
3550                        }
3551                    }
3552                }
3553                if let Some(tokens) = multi.as_ref().and_then(|m| m.get(i)) {
3554                    let mv = multivec_to_value(tokens);
3555                    for t in &group.multi {
3556                        if !properties_batch[row].contains_key(t) {
3557                            properties_batch[row].insert(t.clone(), mv.clone());
3558                        }
3559                    }
3560                }
3561            }
3562        }
3563
3564        Ok(())
3565    }
3566
3567    async fn process_embeddings_impl(
3568        &self,
3569        label_name: Option<&str>,
3570        properties: &mut Properties,
3571    ) -> Result<()> {
3572        let schema = self.schema_manager.schema();
3573
3574        let Some(label) = label_name else {
3575            return Ok(());
3576        };
3577
3578        // Same (alias, source) grouping as the deferred path: a mixed dense + multi-vector
3579        // group is a single-pass hybrid source (one inference fills both columns).
3580        let groups = collect_embed_groups(&schema, label);
3581        if groups.is_empty() {
3582            log::info!("No embedding config found for label {}", label);
3583            return Ok(());
3584        }
3585
3586        for (key, group) in groups {
3587            let alias = &key.0;
3588            // Skip if every target already present (user-supplied values win).
3589            if group
3590                .dense
3591                .iter()
3592                .chain(group.multi.iter())
3593                .all(|t| properties.contains_key(t))
3594            {
3595                continue;
3596            }
3597
3598            let mut inputs = Vec::new();
3599            for src in &group.source_properties {
3600                if let Some(val) = properties.get(src)
3601                    && let Some(s) = val.as_str()
3602                {
3603                    inputs.push(s.to_string());
3604                }
3605            }
3606            if inputs.is_empty() {
3607                continue;
3608            }
3609            let text = inputs.join(" ");
3610            let text = match &group.document_prefix {
3611                Some(prefix) => format!("{prefix}{text}"),
3612                None => text,
3613            };
3614
3615            let runtime = self
3616                .xervo_runtime
3617                .get()
3618                .ok_or_else(|| anyhow!("Uni-Xervo runtime not configured for auto-embedding"))?;
3619            let want_dense = !group.dense.is_empty();
3620            let want_multi = !group.multi.is_empty();
3621            let (dense, multi) =
3622                embed_group(runtime, alias, &[text.as_str()], want_dense, want_multi).await?;
3623
3624            if let Some(vec) = dense.as_ref().and_then(|d| d.first()) {
3625                let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3626                for t in &group.dense {
3627                    if !properties.contains_key(t) {
3628                        properties.insert(t.clone(), Value::List(vals.clone()));
3629                    }
3630                }
3631            }
3632            if let Some(tokens) = multi.as_ref().and_then(|m| m.first()) {
3633                let mv = multivec_to_value(tokens);
3634                for t in &group.multi {
3635                    if !properties.contains_key(t) {
3636                        properties.insert(t.clone(), mv.clone());
3637                    }
3638                }
3639            }
3640        }
3641        Ok(())
3642    }
3643
3644    /// Flushes the current in-memory L0 buffer to L1 storage.
3645    ///
3646    /// # Lock Ordering
3647    ///
3648    /// To prevent deadlocks, locks must be acquired in the following order:
3649    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3650    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3651    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3652    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3653    /// 5. `Index` / `Storage` locks (during actual flush)
3654    ///
3655    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3656    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3657    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3658    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3659        // Drain any in-flight async flushes first. `flush_to_l1` is a
3660        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3661        // setup, shutdown paths) rely on it as "all writes are now
3662        // durably in Lance". Without the drain, an async stream from
3663        // a recent commit might still be writing to Lance when
3664        // `flush_to_l1` returns, leaving a window where forks branch
3665        // off pre-write Lance state and lose data.
3666        if let Some(coord) = self.flush_coordinator.as_ref() {
3667            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3668        }
3669        let _flush_lock_guard = self.flush_lock.lock().await;
3670        self.flush_inline_under_lock(name).await
3671    }
3672
3673    /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3674    ///
3675    /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3676    /// flush, and then — still holding the lock — reads the allocator
3677    /// high-water marks and each existing candidate dataset's Lance
3678    /// version. Capturing under the held lock is what makes the fork point
3679    /// atomic: no concurrent commit can advance the allocator and no
3680    /// concurrent flush can advance a dataset tip between the flush and the
3681    /// reads. See [`ForkPoint`].
3682    ///
3683    /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3684    /// names with no `.lance` directory on disk are skipped (returned map
3685    /// has no entry for them), matching the fork branch loop's existence
3686    /// check.
3687    ///
3688    /// # Errors
3689    /// Propagates flush failures from `flush_inline_under_lock` and any
3690    /// per-dataset version read failure from `lance_branch::current_version`.
3691    ///
3692    /// # Deadlocks
3693    /// Must not be called by a task already holding `flush_lock` (e.g.
3694    /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3695    /// Fork creation never holds the lock, so the sole call site is safe.
3696    pub async fn flush_and_capture_fork_point(
3697        &self,
3698        candidate_dataset_names: &[String],
3699    ) -> Result<ForkPoint> {
3700        if let Some(coord) = self.flush_coordinator.as_ref() {
3701            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3702        }
3703        let _flush_lock_guard = self.flush_lock.lock().await;
3704        self.flush_inline_under_lock(None).await?;
3705
3706        // Still under `flush_lock`: capture the allocator HWM, the MVCC
3707        // version HWM, and every existing dataset's Lance version so
3708        // nothing can interleave.
3709        let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3710        // The parent's current L0 version is the largest `_version` any
3711        // inherited row can carry (flushed or in-memory). A fork bootstraps
3712        // its version floor to this so a fork tx read still sees inherited
3713        // rows. Cheap read lock; no buffer clone.
3714        let version_hwm = self.l0_manager.get_current().read().current_version;
3715
3716        let base = self.storage.base_uri();
3717        let mut dataset_versions = BTreeMap::new();
3718        for name in candidate_dataset_names {
3719            let uri = join_lance_uri(base, name);
3720            if !lance_path_exists(&uri) {
3721                continue;
3722            }
3723            let version = crate::backend::lance_branch::current_version(&uri).await?;
3724            dataset_versions.insert(name.clone(), version);
3725        }
3726
3727        Ok(ForkPoint {
3728            vid_hwm,
3729            eid_hwm,
3730            dataset_versions,
3731            version_hwm,
3732        })
3733    }
3734
3735    /// Async-flush entry point: rotate under `flush_lock`, release the
3736    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3737    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3738    /// that resolves when finalize completes.
3739    ///
3740    /// Errors if `config.async_flush_enabled = false` (the coordinator
3741    /// is `None` in that case — see `flush_coordinator` field doc).
3742    pub async fn flush_to_l1_async(
3743        self: &Arc<Self>,
3744        name: Option<String>,
3745    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3746        let coord = self
3747            .flush_coordinator
3748            .as_ref()
3749            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3750            .clone();
3751        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3752        //    introduce a permit-while-holding-flush-lock convoy.
3753        let permit = coord.acquire_permit().await?;
3754        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3755        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3756        //    rotate (the `?` below) must consume neither: the finalizer
3757        //    advances in strictly consecutive seq order and only decrements
3758        //    pending on finalize, so a leaked seq/pending would wedge it
3759        //    forever. The seq is allocated under `flush_lock`, immediately
3760        //    after the rotate and before the guard drops, so concurrent rotates
3761        //    keep seq order == rotation order, and the seq is unused until
3762        //    submit. On the `?` error path the permit drops, freeing the slot.
3763        let (
3764            RotateOutput {
3765                old_l0_arc,
3766                wal_lsn,
3767                current_version,
3768                flush_in_progress_guard,
3769            },
3770            seq,
3771        ) = {
3772            let _flush_lock_guard = self.flush_lock.lock().await;
3773            let rotate_out = self.flush_l0_rotate().await?;
3774            let seq = coord.next_rotate_seq();
3775            coord.note_pending();
3776            (rotate_out, seq)
3777        };
3778        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3779        //    cached_manifest snapshot at this moment.
3780        let parent_manifest = self.cached_manifest.lock().clone();
3781        let rotated = RotatedFlush {
3782            seq,
3783            old_l0_arc: old_l0_arc.clone(),
3784            wal_lsn,
3785            current_version,
3786            name: name.clone(),
3787            parent_manifest,
3788            permit,
3789            flush_in_progress_guard,
3790        };
3791        // 4. Spawn the stream phase via the coordinator. The closure
3792        //    captures Arc<Writer> transiently — drops when stream
3793        //    completes (bounded, ~50-500 ms).
3794        let writer = self.clone();
3795        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3796            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3797            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3798                new_manifest: outcome.manifest,
3799                snapshot_id: outcome.snapshot_id,
3800            })
3801        });
3802        Ok(ticket)
3803    }
3804
3805    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3806    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3807    /// its place), and hand off the WAL to the new L0.
3808    ///
3809    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3810    /// is responsible). The returned [`RotateOutput`] carries everything
3811    /// the subsequent stream + finalize phases need; in particular the
3812    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3813    /// alive for the full flush lifetime — including any future async
3814    /// path where stream runs on a spawned task.
3815    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3816        // Acquire the in-progress counter BEFORE any heavy work. The
3817        // guard lives on RotateOutput; dropping RotateOutput drops the
3818        // guard, so the counter goes back to zero exactly when the flush
3819        // is fully done.
3820        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3821
3822        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3823        // current L0 is still active and mutations are retained in
3824        // memory until restart/retry.
3825        let wal_for_truncate = {
3826            let current_l0 = self.l0_manager.get_current();
3827            let l0_guard = current_l0.read();
3828            l0_guard.wal.clone()
3829        };
3830        // Test-only seam (no-op without the `failpoints` feature): inject a
3831        // WAL-flush failure here to drive the "failed async rotate wedges the
3832        // finalizer" regression (Bug #3). When configured to "return" it makes
3833        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3834        fail::fail_point!("flush::rotate-fail", |_| {
3835            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3836        });
3837        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3838            w.flush().await?
3839        } else {
3840            0
3841        };
3842
3843        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3844        // pending_flush until complete_flush is called by finalize.
3845        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3846        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3847
3848        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3849        // and current_version to the new L0.
3850        let current_version;
3851        {
3852            let mut old_l0_guard = old_l0_arc.write();
3853            current_version = old_l0_guard.current_version;
3854            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3855            let wal = old_l0_guard.wal.take();
3856            let new_l0_arc = self.l0_manager.get_current();
3857            let mut new_l0_guard = new_l0_arc.write();
3858            new_l0_guard.wal = wal;
3859            new_l0_guard.current_version = current_version;
3860        }
3861
3862        Ok(RotateOutput {
3863            old_l0_arc,
3864            wal_lsn,
3865            current_version,
3866            flush_in_progress_guard,
3867        })
3868    }
3869
3870    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3871    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3872    /// pending_flush by Phase B); writes append-only Lance datasets; does
3873    /// NOT call save_snapshot / set_latest_snapshot — those are
3874    /// finalize's job, so the manifest doesn't get published until the
3875    /// next phase.
3876    ///
3877    /// Today takes `&self`; in a follow-up commit this becomes a
3878    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3879    /// run on a spawned task while concurrent commits proceed.
3880    async fn flush_stream_l1(
3881        &self,
3882        old_l0_arc: Arc<RwLock<L0Buffer>>,
3883        wal_lsn: u64,
3884        current_version: u64,
3885        name: Option<String>,
3886    ) -> Result<FlushOutcome> {
3887        // Test-only seam (no-op without the `failpoints` feature): the rotate
3888        // (begin_flush) already moved the to-be-flushed buffer onto
3889        // pending_flush and installed a fresh empty current buffer, but the
3890        // rotated rows are NOT yet durable in Lance. Pausing here holds that
3891        // window open to drive the unique-constraint-hole regression
3892        // (Bug #9 Mechanism A).
3893        fail::fail_point!("flush::after-rotate-before-lance");
3894
3895        // Phase B: materialize any deferred embeddings before column
3896        // extraction. No-op when `defer_embeddings` is off (the set will
3897        // be empty). On-demand reads of the embedding column are a TODO
3898        // for a future revision (see UniConfig::defer_embeddings docs).
3899        self.drain_pending_embeddings(&old_l0_arc).await?;
3900
3901        // Materialise MUVERA FDE columns from each row's source multi-vector (pure/sync;
3902        // no-op without a MUVERA index). Runs after embeddings so a row can be both
3903        // auto-embedded and FDE-encoded.
3904        self.materialize_fde_columns(&old_l0_arc)?;
3905
3906        let schema = self.schema_manager.schema();
3907        // 2. Acquire Read lock on Old L0 for flushing
3908        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3909        // (Vid, labels, properties, deleted, version)
3910        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3911        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3912        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3913        // (vid, full L0 properties map, version, set of keys to update).
3914        // Only the keys in the HashSet are emitted to the partial source;
3915        // the full props map is retained so the per-row column extractor
3916        // can read each touched key's value.
3917        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3918        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3919        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3920        // partial source with just `_vid`, `_deleted=true`, `_version`,
3921        // `_updated_at`. Skips the wide-row Append payload that adds
3922        // nothing on a soft-delete.
3923        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3924        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3925        // Collect vertex timestamps from L0 for flushing to storage
3926        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3927        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3928        // Track tombstones missing labels for storage query fallback
3929        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3930
3931        {
3932            let old_l0 = old_l0_arc.read();
3933
3934            // 1. Collect all edges and tombstones from L0
3935            for edge in old_l0.graph.edges() {
3936                let properties = old_l0
3937                    .edge_properties
3938                    .get(&edge.eid)
3939                    .cloned()
3940                    .unwrap_or_default();
3941                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3942
3943                // Get timestamps from L0 buffer (populated during insert)
3944                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3945                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3946
3947                entries_by_type
3948                    .entry(edge.edge_type)
3949                    .or_default()
3950                    .push(L1Entry {
3951                        src_vid: edge.src_vid,
3952                        dst_vid: edge.dst_vid,
3953                        eid: edge.eid,
3954                        op: Op::Insert,
3955                        version,
3956                        properties,
3957                        created_at,
3958                        updated_at,
3959                    });
3960            }
3961
3962            // From tombstones
3963            for tombstone in old_l0.tombstones.values() {
3964                let version = old_l0
3965                    .edge_versions
3966                    .get(&tombstone.eid)
3967                    .copied()
3968                    .unwrap_or(0);
3969                // Get timestamps - for deletes, updated_at reflects deletion time
3970                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3971                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3972
3973                entries_by_type
3974                    .entry(tombstone.edge_type)
3975                    .or_default()
3976                    .push(L1Entry {
3977                        src_vid: tombstone.src_vid,
3978                        dst_vid: tombstone.dst_vid,
3979                        eid: tombstone.eid,
3980                        op: Op::Delete,
3981                        version,
3982                        properties: HashMap::new(),
3983                        created_at,
3984                        updated_at,
3985                    });
3986            }
3987
3988            // 1b. Collect vertices by label (using vertex_labels from L0)
3989            //
3990            // Helper: fan-out a single vertex entry into per-label buckets.
3991            // Each per-label table row carries the full label set so multi-label
3992            // info is preserved after flush.
3993            let push_vertex_to_labels =
3994                |vid: Vid,
3995                 all_labels: &[String],
3996                 props: Properties,
3997                 deleted: bool,
3998                 version: u64,
3999                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
4000                    for label in all_labels {
4001                        if let Some(label_id) = schema.label_id_by_name(label) {
4002                            out.entry(label_id).or_default().push((
4003                                vid,
4004                                all_labels.to_vec(),
4005                                props.clone(),
4006                                deleted,
4007                                version,
4008                            ));
4009                        }
4010                    }
4011                };
4012
4013            for (vid, props) in &old_l0.vertex_properties {
4014                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4015                // Collect timestamps for this vertex
4016                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
4017                    vertex_created_at.insert(*vid, ts);
4018                }
4019                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
4020                    vertex_updated_at.insert(*vid, ts);
4021                }
4022                if let Some(labels) = old_l0.vertex_labels.get(vid) {
4023                    // Partial-write routing: when this VID was last
4024                    // touched via `insert_vertex_partial` AND the
4025                    // partial_lance_writes flag is on, send only the
4026                    // touched columns to a MergeInsert batch. Otherwise
4027                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
4028                    // — or flag off) use the existing full-row Append.
4029                    let is_partial = self.config.partial_lance_writes
4030                        && old_l0.vertex_partial_keys.contains_key(vid);
4031                    if is_partial {
4032                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
4033                            for label in labels {
4034                                if let Some(label_id) = schema.label_id_by_name(label) {
4035                                    partial_by_label.entry(label_id).or_default().push((
4036                                        *vid,
4037                                        props.clone(),
4038                                        version,
4039                                        touched.clone(),
4040                                    ));
4041                                }
4042                            }
4043                        }
4044                    } else {
4045                        push_vertex_to_labels(
4046                            *vid,
4047                            labels,
4048                            props.clone(),
4049                            false,
4050                            version,
4051                            &mut vertices_by_label,
4052                        );
4053                    }
4054                }
4055            }
4056            for &vid in &old_l0.vertex_tombstones {
4057                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4058                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
4059                    vertex_updated_at.insert(vid, ts);
4060                }
4061                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
4062                    // Round-12 §B: tombstones flush via Lance MergeInsert
4063                    // (just `_vid`, `_deleted=true`, `_version`,
4064                    // `_updated_at`) — skipping the wide-row Append.
4065                    // Unconditional (no `partial_lance_writes` gating);
4066                    // tombstone Append carries no useful payload.
4067                    for label in labels {
4068                        if let Some(label_id) = schema.label_id_by_name(label) {
4069                            tombstones_by_label
4070                                .entry(label_id)
4071                                .or_default()
4072                                .push((vid, version));
4073                        }
4074                    }
4075                } else {
4076                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
4077                    orphaned_tombstones.push((vid, version));
4078                }
4079            }
4080        } // Drop read lock
4081
4082        // Resolve orphaned tombstones (missing labels) from storage
4083        if !orphaned_tombstones.is_empty() {
4084            tracing::warn!(
4085                count = orphaned_tombstones.len(),
4086                "Tombstones missing labels in L0, querying storage as fallback"
4087            );
4088            for (vid, version) in orphaned_tombstones {
4089                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
4090                    && !labels.is_empty()
4091                {
4092                    for label in &labels {
4093                        if let Some(label_id) = schema.label_id_by_name(label) {
4094                            // Round-12 §B: route through partial tombstone too.
4095                            tombstones_by_label
4096                                .entry(label_id)
4097                                .or_default()
4098                                .push((vid, version));
4099                        }
4100                    }
4101                }
4102            }
4103        }
4104
4105        // 1. Load previous snapshot from cache, or fall back to storage.
4106        //
4107        // Use clone() not take(): for the async path, multiple
4108        // concurrent streams may run; if we take() here, a sibling
4109        // stream sees cached_manifest = None and seeds from
4110        // load_latest_snapshot (stale), losing the chain. clone()
4111        // preserves the parent. Finalize writes back the new manifest
4112        // unconditionally.
4113        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
4114            cached
4115        } else {
4116            self.storage
4117                .snapshot_manager()
4118                .load_latest_snapshot()
4119                .await?
4120                .unwrap_or_else(|| {
4121                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
4122                })
4123        };
4124
4125        // Update snapshot metadata
4126        // Save parent snapshot ID before generating new one (for lineage tracking)
4127        let parent_id = manifest.snapshot_id.clone();
4128        manifest.parent_snapshot = Some(parent_id);
4129        manifest.snapshot_id = Uuid::new_v4().to_string();
4130        manifest.name = name;
4131        manifest.created_at = Utc::now();
4132        manifest.version_high_water_mark = current_version;
4133        manifest.wal_high_water_mark = wal_lsn;
4134        let snapshot_id = manifest.snapshot_id.clone();
4135
4136        tracing::Span::current().record("snapshot_id", &snapshot_id);
4137
4138        // 2. Write main unified tables FIRST (before deltas).
4139        //    Ensures the dual-write invariant: by the time an EID appears in a
4140        //    delta table, it already exists in main_edges. This prevents the
4141        //    compaction debug_assert from firing when compaction interleaves
4142        //    with flush at async yield points.
4143        //
4144        // 2.1 Main edges table
4145        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
4146            let _old_l0 = old_l0_arc.read();
4147            let mut main_edges: Vec<(
4148                uni_common::core::id::Eid,
4149                Vid,
4150                Vid,
4151                String,
4152                Properties,
4153                bool,
4154                u64,
4155            )> = Vec::new();
4156            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4157            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
4158
4159            for (&edge_type_id, entries) in entries_by_type.iter() {
4160                for entry in entries {
4161                    let edge_type_name = self
4162                        .storage
4163                        .schema_manager()
4164                        .edge_type_name_by_id_unified(edge_type_id)
4165                        .unwrap_or_else(|| "unknown".to_string());
4166
4167                    let deleted = matches!(entry.op, Op::Delete);
4168                    main_edges.push((
4169                        entry.eid,
4170                        entry.src_vid,
4171                        entry.dst_vid,
4172                        edge_type_name,
4173                        entry.properties.clone(),
4174                        deleted,
4175                        entry.version,
4176                    ));
4177
4178                    if let Some(ts) = entry.created_at {
4179                        edge_created_at_map.insert(entry.eid, ts);
4180                    }
4181                    if let Some(ts) = entry.updated_at {
4182                        edge_updated_at_map.insert(entry.eid, ts);
4183                    }
4184                }
4185            }
4186
4187            (main_edges, edge_created_at_map, edge_updated_at_map)
4188        };
4189
4190        if !main_edges.is_empty() {
4191            let main_edge_batch = MainEdgeDataset::build_record_batch(
4192                &main_edges,
4193                Some(&edge_created_at_map),
4194                Some(&edge_updated_at_map),
4195            )?;
4196            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4197            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4198        }
4199
4200        // 2.2 Main vertices table
4201        let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4202            let old_l0 = old_l0_arc.read();
4203            let mut vertices = Vec::new();
4204
4205            // Live vertices: full-row Append on the main table (the
4206            // props_json blob is required for global ID lookups). For
4207            // partial-row VIDs (vertex_partial_keys non-empty), the
4208            // main table still needs the full props for the
4209            // ext_id-uniqueness path; we keep the Append here. The
4210            // per-label Lance write IS partial via MergeInsert.
4211            for (vid, props) in &old_l0.vertex_properties {
4212                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4213                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4214                vertices.push((*vid, labels, props.clone(), false, version));
4215            }
4216
4217            // Tombstones: collected into `main_vertex_tombstones` for
4218            // the MergeInsert path below; skipping the wide-row Append.
4219            for &vid in &old_l0.vertex_tombstones {
4220                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4221                main_vertex_tombstones.push((vid, version));
4222            }
4223
4224            vertices
4225        };
4226
4227        // M8: durable label-only mutations across flush windows.
4228        //
4229        // `SET n:Label` / `REMOVE n:Label` mark the vid in
4230        // `vertex_label_overwrites` and update `vertex_labels`, but for a
4231        // vid flushed in a PRIOR window they never re-add it to
4232        // `vertex_properties`. The loops above key off `vertex_properties`,
4233        // so such a relabel would be silently lost: absent from the main
4234        // table, the per-label datasets, and the VidLabelsIndex (and so
4235        // `rebuild_vid_labels_index` reads stale labels after a restart).
4236        // The same-window create+relabel case already works because the
4237        // create put the vid in `vertex_properties`.
4238        //
4239        // Re-derive each overwrite-only vid by fetching its persisted props
4240        // and labels, then route it into `main_vertices` (main table +
4241        // index), the new per-label datasets, and a tombstone in any
4242        // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4243        // per-label table directly, so the old-label tombstone is required.
4244        let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4245            let old_l0 = old_l0_arc.read();
4246            old_l0
4247                .vertex_label_overwrites
4248                .iter()
4249                .filter(|vid| {
4250                    !old_l0.vertex_properties.contains_key(*vid)
4251                        && !old_l0.vertex_tombstones.contains(*vid)
4252                })
4253                .map(|vid| {
4254                    let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4255                    let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4256                    (*vid, labels, version)
4257                })
4258                .collect()
4259        };
4260        for (vid, new_labels, version) in overwrite_only {
4261            // Persisted props of the prior-window row — required so the
4262            // re-Appended main row does not blank the vertex's properties.
4263            let Some(props) = MainVertexDataset::find_props_by_vid(
4264                self.storage.backend(),
4265                vid,
4266                self.storage.version_high_water_mark(),
4267            )
4268            .await?
4269            else {
4270                tracing::warn!(
4271                    vid = vid.as_u64(),
4272                    "label-only mutation for a vid with no persisted main row; skipping flush \
4273                     of its relabel"
4274                );
4275                continue;
4276            };
4277            // Labels the vid carried BEFORE this relabel; the storage read
4278            // reflects pre-flush state. Any label no longer present must be
4279            // tombstoned in its per-label dataset.
4280            let old_labels = self
4281                .find_vertex_labels_in_storage(vid)
4282                .await?
4283                .unwrap_or_default();
4284
4285            main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4286            for label in &new_labels {
4287                if let Some(label_id) = schema.label_id_by_name(label) {
4288                    vertices_by_label.entry(label_id).or_default().push((
4289                        vid,
4290                        new_labels.clone(),
4291                        props.clone(),
4292                        false,
4293                        version,
4294                    ));
4295                }
4296            }
4297            for label in &old_labels {
4298                if !new_labels.contains(label)
4299                    && let Some(label_id) = schema.label_id_by_name(label)
4300                {
4301                    tombstones_by_label
4302                        .entry(label_id)
4303                        .or_default()
4304                        .push((vid, version));
4305                }
4306            }
4307        }
4308
4309        if !main_vertices.is_empty() {
4310            let main_vertex_batch = MainVertexDataset::build_record_batch(
4311                &main_vertices,
4312                Some(&vertex_created_at),
4313                Some(&vertex_updated_at),
4314            )?;
4315            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4316        }
4317        // Round-12 §B: tombstones via MergeInsert on the main vertices
4318        // table. Independent of `vertex_properties` length.
4319        if !main_vertex_tombstones.is_empty() {
4320            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4321                &main_vertex_tombstones,
4322                Some(&vertex_updated_at),
4323            )?;
4324            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4325                .await?;
4326        }
4327        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4328            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4329        }
4330
4331        // Keep the VidLabelsIndex current for every flushed vertex. This is the
4332        // single place that sees all vertices: the per-label fan-out below skips
4333        // undeclared (schemaless) labels, so updating the index there would miss
4334        // them. Traversal-time label predicates read this index to resolve
4335        // labels for vertices that live only in Lance — notably on a fork, whose
4336        // data is flushed to Lance before branching. (GitHub #99)
4337        for (vid, labels, _props, _deleted, _version) in &main_vertices {
4338            self.storage.update_vid_labels_index(*vid, labels.clone());
4339        }
4340        for (vid, _version) in &main_vertex_tombstones {
4341            self.storage.remove_from_vid_labels_index(*vid);
4342        }
4343
4344        // 3. For each edge type, write FWD and BWD delta runs
4345        for (&edge_type_id, entries) in entries_by_type.iter() {
4346            // Get edge type name from unified lookup (handles both schema'd and schemaless)
4347            let edge_type_name = self
4348                .storage
4349                .schema_manager()
4350                .edge_type_name_by_id_unified(edge_type_id)
4351                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4352
4353            // FWD Run (sorted by src_vid)
4354            // Round-12 §A: split entries into full-row Append and
4355            // partial MergeInsert routes based on `edge_partial_keys`.
4356            // Edges in `edge_partial_keys` were last written via
4357            // `insert_edge_partial_full`; the per-edge-type delta
4358            // tables receive only the touched schema columns plus
4359            // (when any overflow key was touched) the regenerated
4360            // `overflow_json` blob. Untouched columns retain their
4361            // previous-version value via Lance MergeInsert.
4362            let partial_eids: std::collections::HashSet<Eid> = {
4363                let old_l0 = old_l0_arc.read();
4364                entries
4365                    .iter()
4366                    .filter(|e| {
4367                        self.config.partial_lance_writes
4368                            && old_l0.edge_partial_keys.contains_key(&e.eid)
4369                    })
4370                    .map(|e| e.eid)
4371                    .collect()
4372            };
4373            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4374                let old_l0 = old_l0_arc.read();
4375                partial_eids
4376                    .iter()
4377                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4378                    .collect()
4379            };
4380            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4381                .clone()
4382                .into_iter()
4383                .partition(|e| !partial_eids.contains(&e.eid));
4384
4385            let backend = self.storage.backend();
4386
4387            // FWD run (sorted by src_vid)
4388            let mut fwd_full = full_entries.clone();
4389            fwd_full.sort_by_key(|e| e.src_vid);
4390            let mut fwd_partial = partial_entries.clone();
4391            fwd_partial.sort_by_key(|e| e.src_vid);
4392            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4393            if !fwd_full.is_empty() {
4394                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4395                fwd_ds.write_run(backend, fwd_batch).await?;
4396            }
4397            if !fwd_partial.is_empty() {
4398                let touched_union: std::collections::HashSet<String> = fwd_partial
4399                    .iter()
4400                    .flat_map(|e| {
4401                        touched_union_by_eid
4402                            .get(&e.eid)
4403                            .cloned()
4404                            .unwrap_or_default()
4405                            .into_iter()
4406                    })
4407                    .collect();
4408                let fwd_partial_batch =
4409                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4410                fwd_ds
4411                    .merge_insert_partial_run(backend, fwd_partial_batch)
4412                    .await?;
4413            }
4414            fwd_ds.ensure_eid_index(backend).await?;
4415
4416            // BWD Run (sorted by dst_vid)
4417            let mut bwd_full = full_entries.clone();
4418            bwd_full.sort_by_key(|e| e.dst_vid);
4419            let mut bwd_partial = partial_entries.clone();
4420            bwd_partial.sort_by_key(|e| e.dst_vid);
4421            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4422            if !bwd_full.is_empty() {
4423                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4424                bwd_ds.write_run(backend, bwd_batch).await?;
4425            }
4426            if !bwd_partial.is_empty() {
4427                let touched_union: std::collections::HashSet<String> = bwd_partial
4428                    .iter()
4429                    .flat_map(|e| {
4430                        touched_union_by_eid
4431                            .get(&e.eid)
4432                            .cloned()
4433                            .unwrap_or_default()
4434                            .into_iter()
4435                    })
4436                    .collect();
4437                let bwd_partial_batch =
4438                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4439                bwd_ds
4440                    .merge_insert_partial_run(backend, bwd_partial_batch)
4441                    .await?;
4442            }
4443            bwd_ds.ensure_eid_index(backend).await?;
4444
4445            // Update Manifest
4446            let current_snap =
4447                manifest
4448                    .edges
4449                    .entry(edge_type_name.to_string())
4450                    .or_insert(EdgeSnapshot {
4451                        version: 0,
4452                        count: 0,
4453                        lance_version: 0,
4454                    });
4455            current_snap.version += 1;
4456            current_snap.count += entries.len() as u64;
4457            // LanceDB tables don't expose Lance version directly
4458            current_snap.lance_version = 0;
4459
4460            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4461            // already has these edges via dual-write in insert_edge/delete_edge.
4462        }
4463
4464        // 4. Per-label vertex table writes
4465        // Iterate all labels that have either full-row OR partial-write
4466        // data pending. A label may appear in only one of the two maps
4467        // (e.g., all updates on this label were partial-only).
4468        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4469            .keys()
4470            .chain(partial_by_label.keys())
4471            .chain(tombstones_by_label.keys())
4472            .copied()
4473            .collect();
4474        for label_id in all_label_ids {
4475            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4476            let label_name = schema
4477                .label_name_by_id(label_id)
4478                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4479
4480            let ds = self.storage.vertex_dataset(label_name)?;
4481
4482            // Collect inverted index updates before consuming vertices
4483            // Maps: cfg.property -> (added, removed)
4484            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4485            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4486
4487            for idx in &schema.indexes {
4488                if let IndexDefinition::Inverted(cfg) = idx
4489                    && cfg.label == label_name
4490                {
4491                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4492                    let mut removed: HashSet<Vid> = HashSet::new();
4493
4494                    for (vid, _labels, props, deleted, _version) in &vertices {
4495                        if *deleted {
4496                            removed.insert(*vid);
4497                        } else if let Some(prop_value) = props.get(&cfg.property) {
4498                            // Extract terms from the property value (List<String>)
4499                            if let Some(arr) = prop_value.as_array() {
4500                                let terms: Vec<String> = arr
4501                                    .iter()
4502                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4503                                    .collect();
4504                                if !terms.is_empty() {
4505                                    added.insert(*vid, terms);
4506                                }
4507                            }
4508                        }
4509                    }
4510                    // Round-12 §B: tombstones no longer in `vertices`;
4511                    // pull them from `tombstones_by_label` for inverted
4512                    // index removal.
4513                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4514                        for (vid, _) in tomb_rows {
4515                            removed.insert(*vid);
4516                        }
4517                    }
4518
4519                    if !added.is_empty() || !removed.is_empty() {
4520                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4521                    }
4522                }
4523            }
4524
4525            let mut v_data = Vec::new();
4526            let mut d_data = Vec::new();
4527            let mut ver_data = Vec::new();
4528            for (vid, labels, props, deleted, version) in vertices {
4529                v_data.push((vid, labels, props));
4530                d_data.push(deleted);
4531                ver_data.push(version);
4532            }
4533
4534            let backend = self.storage.backend();
4535
4536            // Skip the full-row Append entirely if this label only has
4537            // partial-write rows pending.
4538            if !v_data.is_empty() {
4539                let batch = ds.build_record_batch_with_timestamps(
4540                    &v_data,
4541                    &d_data,
4542                    &ver_data,
4543                    &schema,
4544                    Some(&vertex_created_at),
4545                    Some(&vertex_updated_at),
4546                )?;
4547                ds.write_batch(backend, batch, &schema).await?;
4548            }
4549
4550            // Partial-column batch (Lance MergeInsert path). The flag
4551            // gates whether the routing classified any VIDs as partial;
4552            // outside the flag this collection is always empty so the
4553            // call below is a cheap no-op.
4554            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4555                && !partial_rows.is_empty()
4556            {
4557                let touched_union: std::collections::HashSet<String> = partial_rows
4558                    .iter()
4559                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4560                    .collect();
4561                let pairs: Vec<(Vid, Properties)> = partial_rows
4562                    .iter()
4563                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4564                    .collect();
4565                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4566                let partial_batch = ds.build_partial_record_batch(
4567                    &pairs,
4568                    &versions,
4569                    &touched_union,
4570                    &schema,
4571                    Some(&vertex_updated_at),
4572                )?;
4573                if partial_batch.num_rows() > 0 {
4574                    ds.merge_insert_batch(backend, partial_batch).await?;
4575                }
4576            }
4577
4578            // Tombstone batch (Round-12 §B): always MergeInsert with
4579            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4580            // No partial_lance_writes gating — tombstones never carry
4581            // useful property payload to write. Captured tombstone vids
4582            // also drive `remove_from_vid_labels_index` below.
4583            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4584            if !tombstone_rows.is_empty() {
4585                let tomb_batch =
4586                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4587                if tomb_batch.num_rows() > 0 {
4588                    ds.merge_insert_batch(backend, tomb_batch).await?;
4589                }
4590            }
4591
4592            ds.ensure_default_indexes(backend).await?;
4593
4594            // VidLabelsIndex maintenance is centralized at the main-vertex
4595            // flush above (it sees both schema'd and schemaless vertices).
4596
4597            // Update Manifest
4598            let current_snap =
4599                manifest
4600                    .vertices
4601                    .entry(label_name.to_string())
4602                    .or_insert(LabelSnapshot {
4603                        version: 0,
4604                        count: 0,
4605                        lance_version: 0,
4606                    });
4607            current_snap.version += 1;
4608            current_snap.count += v_data.len() as u64;
4609            // LanceDB tables don't expose Lance version directly
4610            current_snap.lance_version = 0;
4611
4612            // Invalidate table cache to ensure next read picks up new version
4613            self.storage.invalidate_table_cache(label_name);
4614
4615            // Apply inverted index updates incrementally
4616            #[cfg(feature = "lance-backend")]
4617            for idx in &schema.indexes {
4618                if let IndexDefinition::Inverted(cfg) = idx
4619                    && cfg.label == label_name
4620                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4621                {
4622                    self.storage
4623                        .index_manager()
4624                        .update_inverted_index_incremental(cfg, added, removed)
4625                        .await?;
4626                }
4627            }
4628
4629            // Update UID index with new vertex mappings
4630            // Collect (UniId, Vid) mappings from non-deleted vertices
4631            #[cfg(feature = "lance-backend")]
4632            {
4633                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4634                for (vid, _labels, props) in &v_data {
4635                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4636                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4637                        label_name, ext_id, props,
4638                    );
4639                    uid_mappings.push((uid, *vid));
4640                }
4641
4642                if !uid_mappings.is_empty()
4643                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4644                {
4645                    // Stamp mappings with this flush's MVCC version so a later
4646                    // re-create of the same UID deterministically outranks the
4647                    // stale mapping (review C3).
4648                    uid_index
4649                        .write_mapping_versioned(&uid_mappings, current_version)
4650                        .await?;
4651                }
4652            }
4653        }
4654        Ok(FlushOutcome {
4655            manifest,
4656            snapshot_id,
4657        })
4658    }
4659
4660    /// Composition entry that assumes the caller already holds `flush_lock`.
4661    /// Runs rotate + stream + finalize_locked in sequence. Used by
4662    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4663    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4664    /// holds the lock from the commit critical section).
4665    #[instrument(
4666        skip(self),
4667        fields(snapshot_id, mutations_count, size_bytes),
4668        level = "info"
4669    )]
4670    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4671        let start = std::time::Instant::now();
4672
4673        let (initial_size, initial_count) = {
4674            let l0_arc = self.l0_manager.get_current();
4675            let l0 = l0_arc.read();
4676            (l0.estimated_size, l0.mutation_count)
4677        };
4678        tracing::Span::current().record("size_bytes", initial_size);
4679        tracing::Span::current().record("mutations_count", initial_count);
4680
4681        debug!("Starting L0 flush to L1");
4682
4683        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4684        // FlushInProgressGuard lives on RotateOutput and stays alive for
4685        // the full flush — including the finalize_locked call below.
4686        let RotateOutput {
4687            old_l0_arc,
4688            wal_lsn,
4689            current_version,
4690            flush_in_progress_guard: _flush_guard,
4691        } = self.flush_l0_rotate().await?;
4692
4693        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4694        // G (Lance writes). Builds the manifest but does NOT publish it.
4695        let FlushOutcome {
4696            manifest,
4697            snapshot_id,
4698        } = self
4699            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4700            .await?;
4701
4702        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4703        // property cache clear, last_flush_time, metrics, l1_runs++,
4704        // compaction trigger, index-rebuild scheduling, fork tick.
4705        self.flush_finalize_locked(
4706            old_l0_arc,
4707            wal_lsn,
4708            manifest,
4709            snapshot_id,
4710            initial_size,
4711            initial_count,
4712            start,
4713        )
4714        .await
4715    }
4716
4717    /// Phases H..S of the flush: publish the manifest and run all
4718    /// post-publish bookkeeping. Assumes the caller already holds
4719    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4720    /// lock-acquiring variant used by the async finalize path.
4721    #[allow(clippy::too_many_arguments)]
4722    async fn flush_finalize_locked(
4723        &self,
4724        old_l0_arc: Arc<RwLock<L0Buffer>>,
4725        wal_lsn: u64,
4726        manifest: SnapshotManifest,
4727        snapshot_id: String,
4728        initial_size: usize,
4729        initial_count: usize,
4730        start: std::time::Instant,
4731    ) -> Result<String> {
4732        Self::flush_finalize_body(
4733            &self.shared_ctx(),
4734            old_l0_arc,
4735            wal_lsn,
4736            manifest,
4737            snapshot_id,
4738            initial_size,
4739            initial_count,
4740            start,
4741        )
4742        .await
4743    }
4744
4745    /// Phases H..S of the flush, lock-acquiring variant. Used by the
4746    /// async-flush finalizer task (running on a spawned tokio task),
4747    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4748    /// `flush_lock` to serialize the publish boundary, then runs the
4749    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4750    #[allow(clippy::too_many_arguments)]
4751    pub(crate) async fn flush_finalize_now(
4752        shared: SharedFlushCtx,
4753        old_l0_arc: Arc<RwLock<L0Buffer>>,
4754        wal_lsn: u64,
4755        manifest: SnapshotManifest,
4756        snapshot_id: String,
4757        initial_size: usize,
4758        initial_count: usize,
4759        start: std::time::Instant,
4760    ) -> Result<String> {
4761        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4762        Self::flush_finalize_body(
4763            &shared,
4764            old_l0_arc,
4765            wal_lsn,
4766            manifest,
4767            snapshot_id,
4768            initial_size,
4769            initial_count,
4770            start,
4771        )
4772        .await
4773    }
4774
4775    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4776    /// Static over `SharedFlushCtx`; the caller is responsible for
4777    /// holding `flush_lock`.
4778    #[allow(clippy::too_many_arguments)]
4779    async fn flush_finalize_body(
4780        shared: &SharedFlushCtx,
4781        old_l0_arc: Arc<RwLock<L0Buffer>>,
4782        wal_lsn: u64,
4783        mut manifest: SnapshotManifest,
4784        snapshot_id: String,
4785        initial_size: usize,
4786        initial_count: usize,
4787        start: std::time::Instant,
4788    ) -> Result<String> {
4789        // Parent-snapshot fixup. The stream phase built `manifest` with
4790        // parent_snapshot set from cached_manifest at stream time. If
4791        // OTHER flushes (sync or async) have finalized since then,
4792        // cached_manifest has advanced. Re-link this manifest to the
4793        // current cached chain so we don't orphan their data when we
4794        // overwrite cached_manifest below.
4795        let current_parent_id = shared
4796            .cached_manifest
4797            .lock()
4798            .as_ref()
4799            .map(|m| m.snapshot_id.clone());
4800        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4801            manifest.parent_snapshot = current_parent_id;
4802            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4803        }
4804
4805        // H. Publish manifest (body first, then pointer — recovery is
4806        // idempotent if we crash between the two).
4807        // A fork writer must publish to a fork-scoped namespace, never the
4808        // global `catalog/latest` that the primary reopen reads (review C1).
4809        debug_assert_eq!(
4810            shared.fork_id.is_some(),
4811            shared.storage.snapshot_manager().is_fork_scoped(),
4812            "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
4813        );
4814        shared
4815            .storage
4816            .snapshot_manager()
4817            .save_snapshot(&manifest)
4818            .await?;
4819        shared
4820            .storage
4821            .snapshot_manager()
4822            .set_latest_snapshot(&manifest.snapshot_id)
4823            .await?;
4824
4825        // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
4826        // wrote the manifest body and the `catalog/latest` pointer through the
4827        // object store, which does NOT fsync. WAL truncation (K, below) removes
4828        // the only other durable copy of this flush's data, so a crash after K
4829        // but before the OS flushed those writes would lose the snapshot —
4830        // recovery could not resolve `latest`. Make them durable now (local-fs
4831        // only; remote stores provide their own durability on `put`).
4832        crate::snapshot::manager::fsync_snapshot_pointer(
4833            shared.storage.local_fs_root().as_deref(),
4834            shared.fork_id.as_ref(),
4835            &manifest.snapshot_id,
4836        )
4837        .map_err(|e| {
4838            anyhow!(
4839                "fsync snapshot {} before WAL truncate: {}",
4840                manifest.snapshot_id,
4841                e
4842            )
4843        })?;
4844
4845        // I. Cache manifest for next flush to avoid re-reading from object store.
4846        *shared.cached_manifest.lock() = Some(manifest.clone());
4847
4848        // L. Invalidate the property cache BEFORE removing the flushed buffer
4849        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4850        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4851        // first closes the non-monotonic-read window: once the buffer leaves
4852        // the L0 chain at J a freshly-written value would otherwise miss the
4853        // chain and fall through to a stale cache entry. By the time finalize
4854        // runs the streamed rows are already durable in L1, so a post-clear
4855        // read falls through to fresh storage instead. The finalizer holds
4856        // `flush_lock` throughout, so reordering L ahead of J is safe.
4857        if let Some(ref pm) = shared.property_manager {
4858            pm.clear_cache().await;
4859        }
4860
4861        // J. Complete flush: remove old L0 from pending_flush. MUST happen
4862        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4863        shared.l0_manager.complete_flush(&old_l0_arc);
4864
4865        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4866        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4867        // WAL truncation (K). The property cache is already cleared (L moved
4868        // ahead of J above), so a read in this window falls through to fresh
4869        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4870        // read after flush finalize).
4871        fail::fail_point!("flush::after-complete-before-cache-clear");
4872
4873        // K. Truncate WAL up to the safe LSN.
4874        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4875        if let Some(w) = wal_handle {
4876            let safe_lsn = shared
4877                .l0_manager
4878                .min_pending_wal_lsn()
4879                .map(|min_pending| min_pending.min(wal_lsn))
4880                .unwrap_or(wal_lsn);
4881            w.truncate_before(safe_lsn).await?;
4882        }
4883
4884        // M. Reset last flush time for time-based auto-flush.
4885        *shared.last_flush_time.lock() = std::time::Instant::now();
4886
4887        info!(
4888            snapshot_id,
4889            mutations_count = initial_count,
4890            size_bytes = initial_size,
4891            "L0 flush to L1 completed successfully"
4892        );
4893        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4894        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4895        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4896
4897        // P. Increment flush generation counter for write throttling.
4898        {
4899            let mut status = uni_common::sync::acquire_mutex(
4900                &shared.storage.compaction_status,
4901                "compaction_status",
4902            )?;
4903            status.l1_runs += 1;
4904        }
4905
4906        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4907        let am = shared.adjacency_manager.clone();
4908        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4909            let previous_still_running = {
4910                let guard = shared.compaction_handle.read();
4911                guard.as_ref().is_some_and(|h| !h.is_finished())
4912            };
4913            if previous_still_running {
4914                info!("Skipping compaction: previous compaction still in progress");
4915            } else {
4916                let handle = tokio::spawn(async move {
4917                    am.compact();
4918                });
4919                *shared.compaction_handle.write() = Some(handle);
4920            }
4921        }
4922
4923        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4924        if shared.auto_rebuild_enabled
4925            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4926        {
4927            Self::schedule_index_rebuilds_if_needed_static(
4928                &manifest,
4929                rebuild_mgr.clone(),
4930                shared.schema_manager.clone(),
4931                shared.index_rebuild_config.clone(),
4932            );
4933        }
4934
4935        // S. Emit fork-fragment observability after a successful forked flush.
4936        Self::tick_fork_fragment_observability_static(
4937            shared.fork_id,
4938            shared.fork_flush_count.clone(),
4939            shared.fork_fragment_warn_fired.clone(),
4940            shared.fork_fragment_warn_threshold,
4941        );
4942
4943        Ok(snapshot_id)
4944    }
4945
4946    /// Increment fork-flush bookkeeping and fire the fragment warn
4947    /// once if the threshold is crossed.
4948    ///
4949    /// Each flush typically appends ~1 fragment per touched dataset on
4950    /// the fork's branches; without compaction (deferred to Phase 5)
4951    /// long-lived heavy-write forks degrade. The flush count is a
4952    /// proxy for actual fragment growth — reading
4953    /// `Dataset::manifest().fragments.len()` per dataset would add a
4954    /// per-flush object-store roundtrip on the hot commit path, which
4955    /// is too costly for a purely observational guard rail.
4956    ///
4957    /// No-op for primary writers (`fork_id == None`).
4958    #[allow(dead_code)] // called by tests; production path uses _static
4959    pub(crate) fn tick_fork_fragment_observability(&self) {
4960        Self::tick_fork_fragment_observability_static(
4961            self.fork_id,
4962            self.fork_flush_count.clone(),
4963            self.fork_fragment_warn_fired.clone(),
4964            self.config.fork_fragment_warn_threshold,
4965        );
4966    }
4967
4968    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4969    /// Used by the async-flush finalize path, where we hold a
4970    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4971    pub(crate) fn tick_fork_fragment_observability_static(
4972        fork_id: Option<ForkId>,
4973        fork_flush_count: Arc<AtomicU64>,
4974        fork_fragment_warn_fired: Arc<AtomicBool>,
4975        warn_threshold: usize,
4976    ) {
4977        let Some(fork_id) = fork_id else { return };
4978        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4979        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4980        let fork_label = fork_id.to_string();
4981        metrics::gauge!(
4982            "uni_fork_l1_flushes",
4983            "fork" => fork_label.clone(),
4984        )
4985        .set(new_count as f64);
4986        let threshold = warn_threshold as u64;
4987        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4988            && threshold > 0
4989            && new_count >= threshold
4990        {
4991            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4992            tracing::warn!(
4993                fork = %fork_label,
4994                flush_count = new_count,
4995                threshold,
4996                "fork has exceeded the L1 flush-count threshold; \
4997                 fork compaction is deferred to Phase 5 — consider \
4998                 drop+recreate or promotion to bound fragment growth"
4999            );
5000        }
5001    }
5002
5003    /// Check rebuild thresholds and schedule background index rebuilds for
5004    /// labels that exceed growth or age limits. Marks affected indexes as
5005    /// `Stale` and spawns an async task to schedule the rebuild.
5006    #[allow(dead_code)] // production path uses _static; kept as the
5007    // documented instance entry point.
5008    fn schedule_index_rebuilds_if_needed(
5009        &self,
5010        manifest: &SnapshotManifest,
5011        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5012    ) {
5013        Self::schedule_index_rebuilds_if_needed_static(
5014            manifest,
5015            rebuild_mgr,
5016            self.schema_manager.clone(),
5017            self.config.index_rebuild.clone(),
5018        );
5019    }
5020
5021    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
5022    /// Used by the async-flush finalize path, where we hold the
5023    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
5024    pub(crate) fn schedule_index_rebuilds_if_needed_static(
5025        manifest: &SnapshotManifest,
5026        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
5027        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
5028        index_rebuild_config: uni_common::config::IndexRebuildConfig,
5029    ) {
5030        let checker =
5031            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
5032        let schema = schema_manager.schema();
5033        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
5034
5035        if labels.is_empty() {
5036            return;
5037        }
5038
5039        // Mark affected indexes as Stale
5040        for label in &labels {
5041            for idx in &schema.indexes {
5042                if idx.label() == label {
5043                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
5044                        m.status = uni_common::core::schema::IndexStatus::Stale;
5045                    });
5046                }
5047            }
5048        }
5049
5050        tokio::spawn(async move {
5051            if let Err(e) = rebuild_mgr.schedule(labels).await {
5052                tracing::warn!("Failed to schedule index rebuild: {e}");
5053            }
5054        });
5055    }
5056}
5057
5058/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
5059/// its single-task finalizer loop. Unit struct on purpose: it must NOT
5060/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
5061/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
5062/// for finalize travels in via `SharedFlushCtx`.
5063pub(crate) struct WriterFinalizer;
5064
5065impl FinalizeFn for WriterFinalizer {
5066    fn finalize<'a>(
5067        &'a self,
5068        rotated: RotatedFlush,
5069        outcome: AsyncFlushOutcome,
5070        shared: SharedFlushCtx,
5071    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
5072        Box::pin(async move {
5073            // Read initial_size / initial_count from the rotated L0 so
5074            // we don't have to plumb them through the coordinator
5075            // submission. The buffer is still alive in pending_flush
5076            // until `complete_flush` (J) below pops it.
5077            let (initial_size, initial_count) = {
5078                let l0 = rotated.old_l0_arc.read();
5079                (l0.estimated_size, l0.mutation_count)
5080            };
5081            let result = Writer::flush_finalize_now(
5082                shared,
5083                rotated.old_l0_arc.clone(),
5084                rotated.wal_lsn,
5085                outcome.new_manifest,
5086                outcome.snapshot_id,
5087                initial_size,
5088                initial_count,
5089                std::time::Instant::now(),
5090            )
5091            .await;
5092            // `rotated` (permit + flush_in_progress_guard) drops here.
5093            drop(rotated.permit);
5094            result
5095        })
5096    }
5097
5098    fn finalize_failure<'a>(
5099        &'a self,
5100        rotated: RotatedFlush,
5101        err: anyhow::Error,
5102        _shared: SharedFlushCtx,
5103    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
5104        Box::pin(async move {
5105            tracing::warn!(
5106                error = %err,
5107                seq = rotated.seq,
5108                "async flush stream failed; old L0 remains in pending_flush, \
5109                 WAL retains its data, recovery via WAL replay on restart"
5110            );
5111            metrics::counter!("uni_flush_failures_total").increment(1);
5112            // Permit + guard drop here so back-pressure releases even on
5113            // failure.
5114            drop(rotated.permit);
5115            err
5116        })
5117    }
5118}
5119
5120#[cfg(test)]
5121mod tests {
5122    use super::*;
5123    use tempfile::tempdir;
5124
5125    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
5126    /// This verifies fix for issue #137 (transaction commit atomicity).
5127    #[tokio::test]
5128    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
5129        use crate::runtime::wal::WriteAheadLog;
5130        use crate::storage::manager::StorageManager;
5131        use object_store::local::LocalFileSystem;
5132        use object_store::path::Path as ObjectStorePath;
5133        use uni_common::core::schema::SchemaManager;
5134
5135        let dir = tempdir()?;
5136        let path = dir.path().to_str().unwrap();
5137        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5138        let schema_path = ObjectStorePath::from("schema.json");
5139
5140        let schema_manager =
5141            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5142        let _label_id = schema_manager.add_label("Test")?;
5143        schema_manager.save().await?;
5144
5145        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5146
5147        // Create WAL for main L0
5148        let wal_path = ObjectStorePath::from("wal");
5149        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5150
5151        let writer = Writer::new_with_config(
5152            storage.clone(),
5153            schema_manager.clone(),
5154            1,
5155            UniConfig::default(),
5156            Some(wal),
5157            None,
5158        )
5159        .await?;
5160
5161        // Begin transaction — create a transaction L0
5162        let tx_l0 = writer.create_transaction_l0();
5163
5164        // Insert data in transaction
5165        let vid_a = writer.next_vid().await?;
5166        let vid_b = writer.next_vid().await?;
5167
5168        let mut props = std::collections::HashMap::new();
5169        props.insert("test".to_string(), Value::String("data".to_string()));
5170
5171        writer
5172            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
5173            .await?;
5174        writer
5175            .insert_vertex_with_labels(
5176                vid_b,
5177                std::collections::HashMap::new(),
5178                &["Test".to_string()],
5179                Some(&tx_l0),
5180            )
5181            .await?;
5182
5183        let eid = writer.next_eid(1).await?;
5184        writer
5185            .insert_edge(
5186                vid_a,
5187                vid_b,
5188                1,
5189                eid,
5190                std::collections::HashMap::new(),
5191                None,
5192                Some(&tx_l0),
5193            )
5194            .await?;
5195
5196        // Get WAL before commit
5197        let l0 = writer.l0_manager.get_current();
5198        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
5199        let mutations_before = wal.replay().await?;
5200        let count_before = mutations_before.len();
5201
5202        // Commit transaction - this should write to WAL first
5203        let writer = Arc::new(writer);
5204        writer.commit_transaction_l0(tx_l0).await?;
5205
5206        // Verify WAL has the new mutations
5207        let mutations_after = wal.replay().await?;
5208        assert!(
5209            mutations_after.len() > count_before,
5210            "WAL should contain transaction mutations after commit"
5211        );
5212
5213        // Verify mutations are in correct order: vertices first, then edges
5214        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
5215
5216        let mut saw_vertex_a = false;
5217        let mut saw_vertex_b = false;
5218        let mut saw_edge = false;
5219
5220        for mutation in &new_mutations {
5221            match mutation {
5222                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
5223                    if *vid == vid_a {
5224                        saw_vertex_a = true;
5225                    }
5226                    if *vid == vid_b {
5227                        saw_vertex_b = true;
5228                    }
5229                    // Vertices should come before edges
5230                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
5231                }
5232                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
5233                    if *e == eid {
5234                        saw_edge = true;
5235                    }
5236                    // Edges should come after vertices
5237                    assert!(
5238                        saw_vertex_a && saw_vertex_b,
5239                        "Edge should be logged after both vertices"
5240                    );
5241                }
5242                _ => {}
5243            }
5244        }
5245
5246        assert!(saw_vertex_a, "Vertex A should be in WAL");
5247        assert!(saw_vertex_b, "Vertex B should be in WAL");
5248        assert!(saw_edge, "Edge should be in WAL");
5249
5250        // Verify data is also in main L0
5251        let l0_read = l0.read();
5252        assert!(
5253            l0_read.vertex_properties.contains_key(&vid_a),
5254            "Vertex A should be in main L0"
5255        );
5256        assert!(
5257            l0_read.vertex_properties.contains_key(&vid_b),
5258            "Vertex B should be in main L0"
5259        );
5260        assert!(
5261            l0_read.edge_endpoints.contains_key(&eid),
5262            "Edge should be in main L0"
5263        );
5264
5265        Ok(())
5266    }
5267
5268    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
5269    #[tokio::test]
5270    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
5271        use crate::runtime::wal::WriteAheadLog;
5272        use crate::storage::manager::StorageManager;
5273        use object_store::local::LocalFileSystem;
5274        use object_store::path::Path as ObjectStorePath;
5275        use uni_common::core::schema::SchemaManager;
5276
5277        let dir = tempdir()?;
5278        let path = dir.path().to_str().unwrap();
5279        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5280        let schema_path = ObjectStorePath::from("schema.json");
5281
5282        let schema_manager =
5283            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5284        let _label_id = schema_manager.add_label("Test")?;
5285        let _baseline_label_id = schema_manager.add_label("Baseline")?;
5286        let _txdata_label_id = schema_manager.add_label("TxData")?;
5287        schema_manager.save().await?;
5288
5289        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5290
5291        // Create WAL for main L0
5292        let wal_path = ObjectStorePath::from("wal");
5293        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5294
5295        let writer = Writer::new_with_config(
5296            storage.clone(),
5297            schema_manager.clone(),
5298            1,
5299            UniConfig::default(),
5300            Some(wal),
5301            None,
5302        )
5303        .await?;
5304
5305        // Insert baseline data (outside transaction)
5306        let baseline_vid = writer.next_vid().await?;
5307        writer
5308            .insert_vertex_with_labels(
5309                baseline_vid,
5310                [("baseline".to_string(), Value::Bool(true))]
5311                    .into_iter()
5312                    .collect(),
5313                &["Baseline".to_string()],
5314                None,
5315            )
5316            .await?;
5317
5318        // Begin transaction — create a transaction L0
5319        let tx_l0 = writer.create_transaction_l0();
5320
5321        // Insert data in transaction
5322        let tx_vid = writer.next_vid().await?;
5323        writer
5324            .insert_vertex_with_labels(
5325                tx_vid,
5326                [("tx_data".to_string(), Value::Bool(true))]
5327                    .into_iter()
5328                    .collect(),
5329                &["TxData".to_string()],
5330                Some(&tx_l0),
5331            )
5332            .await?;
5333
5334        // Capture main L0 state before rollback
5335        let l0 = writer.l0_manager.get_current();
5336        let vertex_count_before = l0.read().vertex_properties.len();
5337
5338        // Rollback transaction (simulating what would happen after WAL flush failure)
5339        drop(tx_l0);
5340
5341        // Verify main L0 is unchanged
5342        let vertex_count_after = l0.read().vertex_properties.len();
5343        assert_eq!(
5344            vertex_count_before, vertex_count_after,
5345            "Main L0 should not change after rollback"
5346        );
5347
5348        // Baseline should still be present
5349        assert!(
5350            l0.read().vertex_properties.contains_key(&baseline_vid),
5351            "Baseline data should remain"
5352        );
5353
5354        // Transaction data should NOT be in main L0
5355        assert!(
5356            !l0.read().vertex_properties.contains_key(&tx_vid),
5357            "Transaction data should not be in main L0 after rollback"
5358        );
5359
5360        Ok(())
5361    }
5362
5363    /// Test that batch insert with shared labels does not clone labels per vertex.
5364    /// This verifies fix for issue #161 (redundant label cloning).
5365    #[tokio::test]
5366    async fn test_batch_insert_shared_labels() -> Result<()> {
5367        use crate::storage::manager::StorageManager;
5368        use object_store::local::LocalFileSystem;
5369        use object_store::path::Path as ObjectStorePath;
5370        use uni_common::core::schema::SchemaManager;
5371
5372        let dir = tempdir()?;
5373        let path = dir.path().to_str().unwrap();
5374        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5375        let schema_path = ObjectStorePath::from("schema.json");
5376
5377        let schema_manager =
5378            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5379        let _label_id = schema_manager.add_label("Person")?;
5380        schema_manager.save().await?;
5381
5382        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5383
5384        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5385
5386        // Shared labels - should not be cloned per vertex
5387        let labels = &["Person".to_string()];
5388
5389        // Insert batch of vertices with same labels
5390        let mut vids = Vec::new();
5391        for i in 0..100 {
5392            let vid = writer.next_vid().await?;
5393            let mut props = std::collections::HashMap::new();
5394            props.insert("id".to_string(), Value::Int(i));
5395            writer
5396                .insert_vertex_with_labels(vid, props, labels, None)
5397                .await?;
5398            vids.push(vid);
5399        }
5400
5401        // Verify all vertices have the correct labels
5402        let l0 = writer.l0_manager.get_current();
5403        for vid in vids {
5404            let l0_guard = l0.read();
5405            let vertex_labels = l0_guard.vertex_labels.get(&vid);
5406            assert!(vertex_labels.is_some(), "Vertex should have labels");
5407            assert_eq!(
5408                vertex_labels.unwrap(),
5409                &vec!["Person".to_string()],
5410                "Labels should match"
5411            );
5412        }
5413
5414        Ok(())
5415    }
5416
5417    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5418    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5419    #[tokio::test]
5420    async fn test_estimated_size_tracks_mutations() -> Result<()> {
5421        use crate::storage::manager::StorageManager;
5422        use object_store::local::LocalFileSystem;
5423        use object_store::path::Path as ObjectStorePath;
5424        use uni_common::core::schema::SchemaManager;
5425
5426        let dir = tempdir()?;
5427        let path = dir.path().to_str().unwrap();
5428        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5429        let schema_path = ObjectStorePath::from("schema.json");
5430
5431        let schema_manager =
5432            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5433        let _label_id = schema_manager.add_label("Test")?;
5434        schema_manager.save().await?;
5435
5436        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5437
5438        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5439
5440        let l0 = writer.l0_manager.get_current();
5441
5442        // Initial state should be empty
5443        let initial_estimated = l0.read().estimated_size;
5444        let initial_actual = l0.read().size_bytes();
5445        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5446        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5447
5448        // Insert vertices with properties
5449        let mut vids = Vec::new();
5450        for i in 0..10 {
5451            let vid = writer.next_vid().await?;
5452            let mut props = std::collections::HashMap::new();
5453            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5454            props.insert("index".to_string(), Value::Int(i));
5455            writer
5456                .insert_vertex_with_labels(vid, props, &[], None)
5457                .await?;
5458            vids.push(vid);
5459        }
5460
5461        // Verify estimated_size grew
5462        let after_vertices_estimated = l0.read().estimated_size;
5463        let after_vertices_actual = l0.read().size_bytes();
5464        assert!(
5465            after_vertices_estimated > 0,
5466            "estimated_size should grow after insertions"
5467        );
5468
5469        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5470        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5471        assert!(
5472            (0.5..=2.0).contains(&ratio),
5473            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5474            after_vertices_estimated,
5475            after_vertices_actual,
5476            ratio
5477        );
5478
5479        // Insert edges with a simple edge type
5480        let edge_type = 1u32;
5481        for i in 0..9 {
5482            let eid = writer.next_eid(edge_type).await?;
5483            writer
5484                .insert_edge(
5485                    vids[i],
5486                    vids[i + 1],
5487                    edge_type,
5488                    eid,
5489                    std::collections::HashMap::new(),
5490                    Some("NEXT".to_string()),
5491                    None,
5492                )
5493                .await?;
5494        }
5495
5496        // Verify estimated_size grew further
5497        let after_edges_estimated = l0.read().estimated_size;
5498        let after_edges_actual = l0.read().size_bytes();
5499        assert!(
5500            after_edges_estimated > after_vertices_estimated,
5501            "estimated_size should grow after edge insertions"
5502        );
5503
5504        // Verify still within reasonable bounds
5505        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5506        assert!(
5507            (0.5..=2.0).contains(&ratio),
5508            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5509            after_edges_estimated,
5510            after_edges_actual,
5511            ratio
5512        );
5513
5514        Ok(())
5515    }
5516
5517    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5518    #[tokio::test]
5519    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5520        use crate::runtime::wal::WriteAheadLog;
5521        use crate::storage::manager::StorageManager;
5522        use object_store::local::LocalFileSystem;
5523        use object_store::path::Path as ObjectStorePath;
5524        use uni_common::core::schema::SchemaManager;
5525
5526        let dir = tempdir()?;
5527        let path = dir.path().to_str().unwrap();
5528        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5529        let schema_path = ObjectStorePath::from("schema.json");
5530
5531        let schema_manager =
5532            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5533        schema_manager.save().await?;
5534
5535        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5536
5537        let wal_path = ObjectStorePath::from("wal");
5538        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5539
5540        let writer = Writer::new_with_config(
5541            storage.clone(),
5542            schema_manager.clone(),
5543            1,
5544            UniConfig::default(),
5545            Some(wal.clone()),
5546            None,
5547        )
5548        .await?;
5549
5550        // Flush with no mutations — should succeed cleanly
5551        let lsn = writer.flush_wal().await?;
5552        // LSN should be 0 or 1 (no real mutations flushed)
5553        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5554
5555        Ok(())
5556    }
5557
5558    /// Test that transaction data does not leak into main L0 without commit.
5559    #[tokio::test]
5560    async fn test_transaction_isolation_without_commit() -> Result<()> {
5561        use crate::runtime::wal::WriteAheadLog;
5562        use crate::storage::manager::StorageManager;
5563        use object_store::local::LocalFileSystem;
5564        use object_store::path::Path as ObjectStorePath;
5565        use uni_common::core::schema::SchemaManager;
5566
5567        let dir = tempdir()?;
5568        let path = dir.path().to_str().unwrap();
5569        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5570        let schema_path = ObjectStorePath::from("schema.json");
5571
5572        let schema_manager =
5573            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5574        let _label_id = schema_manager.add_label("Person")?;
5575        schema_manager.save().await?;
5576
5577        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5578
5579        let wal_path = ObjectStorePath::from("wal");
5580        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5581
5582        let writer = Writer::new_with_config(
5583            storage.clone(),
5584            schema_manager.clone(),
5585            1,
5586            UniConfig::default(),
5587            Some(wal),
5588            None,
5589        )
5590        .await?;
5591
5592        // Create transaction L0
5593        let tx_l0 = writer.create_transaction_l0();
5594
5595        // Insert vertex into transaction L0
5596        let vid = writer.next_vid().await?;
5597        writer
5598            .insert_vertex_with_labels(
5599                vid,
5600                [("name".to_string(), Value::String("Ghost".to_string()))]
5601                    .into_iter()
5602                    .collect(),
5603                &["Person".to_string()],
5604                Some(&tx_l0),
5605            )
5606            .await?;
5607
5608        // Verify data is in transaction L0
5609        assert!(
5610            tx_l0.read().vertex_properties.contains_key(&vid),
5611            "Transaction L0 should contain the vertex"
5612        );
5613
5614        // Verify data is NOT in main L0
5615        let main_l0 = writer.l0_manager.get_current();
5616        assert!(
5617            !main_l0.read().vertex_properties.contains_key(&vid),
5618            "Main L0 should NOT contain uncommitted transaction data"
5619        );
5620
5621        // Drop transaction without committing — data should be lost
5622        drop(tx_l0);
5623
5624        // Main L0 still should not have it
5625        assert!(
5626            !main_l0.read().vertex_properties.contains_key(&vid),
5627            "Main L0 should remain clean after dropped transaction"
5628        );
5629
5630        Ok(())
5631    }
5632
5633    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5634    /// the flush count crosses the configured threshold and stays
5635    /// silent on subsequent flushes for the lifetime of the writer.
5636    /// Primary writers (`fork_id == None`) never fire it.
5637    ///
5638    /// Tested directly against `tick_fork_fragment_observability` so
5639    /// the contract is locked in independently of the broader
5640    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5641    /// on Day 10's on-the-fly schema overlay growth).
5642    #[tokio::test]
5643    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5644        use crate::storage::manager::StorageManager;
5645        use object_store::local::LocalFileSystem;
5646        use object_store::path::Path as ObjectStorePath;
5647        use uni_common::core::fork::ForkId;
5648        use uni_common::core::schema::SchemaManager;
5649
5650        let dir = tempdir()?;
5651        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5652        let schema_path = ObjectStorePath::from("schema.json");
5653        let schema_manager =
5654            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5655        let storage = Arc::new(
5656            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5657        );
5658
5659        let config = UniConfig {
5660            fork_fragment_warn_threshold: 3,
5661            ..Default::default()
5662        };
5663        let mut writer =
5664            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5665
5666        // Primary path: never fires.
5667        for _ in 0..10 {
5668            writer.tick_fork_fragment_observability();
5669        }
5670        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5671        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5672
5673        // Fork path: tag and tick. Below threshold → no fire.
5674        writer.fork_id = Some(ForkId::new());
5675        writer.tick_fork_fragment_observability();
5676        writer.tick_fork_fragment_observability();
5677        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5678        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5679
5680        // Crossing threshold → fires once.
5681        writer.tick_fork_fragment_observability();
5682        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5683        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5684
5685        // Subsequent ticks bump the gauge but do not re-fire.
5686        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5687        for _ in 0..5 {
5688            writer.tick_fork_fragment_observability();
5689        }
5690        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5691        assert_eq!(
5692            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5693            fired_after
5694        );
5695
5696        Ok(())
5697    }
5698
5699    /// The hot-path mutators must not write to any `Writer` struct field.
5700    /// Phase 2 of the refactor
5701    /// gave them `&self` receivers, which the compiler enforces against
5702    /// direct `self.x = y` assignment — but interior-mutable writes
5703    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5704    /// every potentially-writable field, calls each hot-path mutator, and
5705    /// asserts no field changed.
5706    ///
5707    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5708    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5709    /// are intentionally out of scope here.
5710    #[tokio::test]
5711    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5712        use crate::storage::manager::StorageManager;
5713        use object_store::local::LocalFileSystem;
5714        use object_store::path::Path as ObjectStorePath;
5715        use uni_common::core::schema::SchemaManager;
5716
5717        let dir = tempdir()?;
5718        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5719        let schema_path = ObjectStorePath::from("schema.json");
5720        let schema_manager =
5721            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5722        schema_manager.add_label("Person")?;
5723        schema_manager.save().await?;
5724        let storage = Arc::new(
5725            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5726        );
5727
5728        let writer =
5729            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5730                .await?;
5731
5732        /// Captures every `Writer` field that *could* be written by a
5733        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5734        /// construction field). Arc'd substructures (`l0_manager`,
5735        /// `storage`, etc.) are intentionally not checked — they are
5736        /// re-pointed only at construction.
5737        #[derive(Debug, PartialEq)]
5738        struct Snapshot {
5739            last_flush_time: std::time::Instant,
5740            cached_manifest_some: bool,
5741            fork_flush_count: u64,
5742            fork_fragment_warn_fired: bool,
5743            xervo_runtime_some: bool,
5744            index_rebuild_manager_some: bool,
5745            fork_id: Option<ForkId>,
5746        }
5747
5748        fn snap(w: &Writer) -> Snapshot {
5749            Snapshot {
5750                last_flush_time: *w.last_flush_time.lock(),
5751                cached_manifest_some: w.cached_manifest.lock().is_some(),
5752                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5753                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5754                xervo_runtime_some: w.xervo_runtime.get().is_some(),
5755                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5756                fork_id: w.fork_id,
5757            }
5758        }
5759
5760        // 1. insert_vertex_with_labels
5761        let before = snap(&writer);
5762        let vid = writer.next_vid().await?;
5763        writer
5764            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5765            .await?;
5766        assert_eq!(
5767            snap(&writer),
5768            before,
5769            "insert_vertex_with_labels mutated a Writer field"
5770        );
5771
5772        // 2. insert_vertices_batch
5773        let before = snap(&writer);
5774        let vids = writer.allocate_vids(2).await?;
5775        writer
5776            .insert_vertices_batch(
5777                vids,
5778                vec![Properties::new(), Properties::new()],
5779                vec!["Person".into()],
5780                None,
5781            )
5782            .await?;
5783        assert_eq!(
5784            snap(&writer),
5785            before,
5786            "insert_vertices_batch mutated a Writer field"
5787        );
5788
5789        // 3. delete_vertex
5790        let before = snap(&writer);
5791        writer.delete_vertex(vid, None, None).await?;
5792        assert_eq!(
5793            snap(&writer),
5794            before,
5795            "delete_vertex mutated a Writer field"
5796        );
5797
5798        // (insert_edge / delete_edge are skipped here: their fixture cost is
5799        // disproportionate to the audit's marginal value, and the same
5800        // structural argument plus the compiler-enforced `&self` covers them.)
5801
5802        Ok(())
5803    }
5804}