Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tuning knobs for the writer's mutation and flush path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit for the writer; presumably the L0 mutation
    /// count at which a flush is triggered — TODO confirm against callers.
    pub max_mutations: usize,
    /// Opt into the partial-column `MergeInsert` path for SET-only flushes.
    ///
    /// * `true`: `Writer::insert_vertex_partial` records the property keys
    ///   it touched in `L0Buffer::vertex_partial_keys`, and the flush sends
    ///   those VIDs through Lance's `MergeInsertBuilder` with a source that
    ///   carries only a subset of the schema. Untouched columns, including
    ///   wide ones such as embeddings, are neither read nor rewritten.
    /// * `false` (default for the first release): `insert_vertex_partial`
    ///   falls back to the read-modify-write `insert_vertex_with_labels`
    ///   path, which stays bit-for-bit equivalent with prior releases. The
    ///   plan is to flip this to `true` once telemetry on the issue #72
    ///   ingest workload confirms the win.
    ///
    /// The soundness probe lives in
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// `max_mutations = 10_000`, partial Lance writes disabled.
    fn default() -> Self {
        let max_mutations = 10_000;
        Self {
            partial_lance_writes: false,
            max_mutations,
        }
    }
}
66
/// Parent state captured atomically at a fork point under `flush_lock`.
///
/// Holds the allocator high-water marks and every existing dataset's
/// Lance main-branch version at the instant the parent's L0 was
/// flushed. Because [`Writer::flush_and_capture_fork_point`] reads these
/// while still holding `flush_lock`, no concurrent commit or flush can
/// advance the allocator or any dataset tip between the flush and the
/// reads. A fork built from these values therefore cannot collide VIDs
/// with the parent nor inherit rows committed after the fork point.
///
/// The derived `Default` yields zero high-water marks and an empty
/// version map. NOTE(review): presumably only a placeholder rather than a
/// meaningful fork point — confirm with callers before relying on it.
#[derive(Clone, Debug, Default)]
pub struct ForkPoint {
    /// Next vertex id the parent would allocate at the fork point.
    pub vid_hwm: u64,
    /// Next edge id the parent would allocate at the fork point.
    pub eid_hwm: u64,
    /// `dataset_name` → Lance main-branch version at the fork point.
    ///
    /// Keys use the same dataset naming as the fork branch loop
    /// (`vertices`, `edges`, `vertices_{label}`, `deltas_{type}_{dir}`,
    /// `adjacency_{type}_{dir}`). A dataset with no `.lance` directory
    /// on disk at the fork point has no entry. A `BTreeMap` keeps the
    /// iteration order deterministic.
    pub dataset_versions: BTreeMap<String, u64>,
    /// Parent's MVCC version high-water-mark at the fork point: the
    /// largest `_version` any inherited row can carry. A fork bootstraps
    /// its own version counter to this floor so a fork transaction's
    /// `_version <= pin` read still sees inherited (base_paths) rows,
    /// while the fork's own writes get versions above it.
    pub version_hwm: u64,
}
96
/// RAII latch on [`StorageManager::flush_in_progress`], re-exported here.
///
/// Sets the flag to `true` on construction (via CAS) and back to `false` on
/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
/// stuck. Returns `None` if a flush is already in progress, providing
/// forward-compatible exclusion once the outer writer-RwLock is removed in
/// Phase 4 of the concurrent-writer refactor.
// The type itself now lives in `storage/manager.rs` so `flush_coordinator.rs`
// can hold it on `RotatedFlush` without a `writer.rs` back-import cycle.
// NOTE(review): this re-export presumably keeps existing
// `runtime::writer::FlushInProgressGuard` paths resolving — confirm.
pub use crate::storage::manager::FlushInProgressGuard;
107
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The L0 buffer rotated out for flushing.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured at rotation time.
    wal_lsn: u64,
    /// `current_version` captured at rotation time.
    current_version: u64,
    /// Keeps `flush_in_progress` latched until this value is dropped, i.e.
    /// for as long as the flush that owns it is running.
    flush_in_progress_guard: FlushInProgressGuard,
}
118
119/// Project a property map to a subset selected by `keys`. Used to
120/// run `touched_needs_full_read` against just the SET-touched keys
121/// when the caller passes a fully-merged `props` map.
122fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
123    let mut out = Properties::new();
124    for k in keys {
125        if let Some(v) = props.get(k) {
126            out.insert(k.clone(), v.clone());
127        }
128    }
129    out
130}
131
/// Build the `.lance` URI for `dataset` under the storage base URI `base`.
///
/// A `/` separator is inserted only when `base` does not already end with
/// one. The result matches the fork branch loop's `join_uri`, so versions
/// captured by [`Writer::flush_and_capture_fork_point`] are keyed by the
/// same datasets the fork later branches.
fn join_lance_uri(base: &str, dataset: &str) -> String {
    let separator = if base.ends_with('/') { "" } else { "/" };
    format!("{base}{separator}{dataset}.lance")
}
144
/// Cheap existence check for a dataset `.lance` location.
///
/// Any URI containing a `://` scheme separator is treated as remote and
/// reported present without I/O; the authoritative check is deferred to
/// `current_version`. Everything else is checked on the local filesystem.
/// Behaves like the fork branch loop's `path_exists`.
fn lance_path_exists(uri: &str) -> bool {
    uri.contains("://") || std::path::Path::new(uri).exists()
}
156
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// Manifest produced by the stream phase; not yet saved or published.
    manifest: SnapshotManifest,
    /// Identifier of `manifest` (presumably the id finalize saves it under
    /// — confirm in the finalize path).
    snapshot_id: String,
}
165
/// Storage-layer write path.
///
/// Holds shared (`Arc`) handles to the L0 buffer manager, storage, schema
/// and ID allocator, plus the flush machinery (`flush_lock`, the optional
/// `flush_coordinator`) and the SSI/OCC commit bookkeeping
/// (`commit_sequence`, `committed_writes`, `for_update_locks`).
pub struct Writer {
    /// Manager of the current L0 buffer and the pending-flush chain.
    pub l0_manager: Arc<L0Manager>,
    /// Backing storage; also the source of the object store and adjacency
    /// manager this writer uses.
    pub storage: Arc<StorageManager>,
    /// Schema (labels, constraints) consulted on the write path.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Source of new vertex and edge ids.
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration (async flush, SSI, compaction, index rebuild, ...).
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush.
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races).
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline: owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `config.async_flush_enabled = false`; it is only
    /// constructed when the feature is on (see `new_with_config`). The
    /// coordinator's finalizer task captures `SharedFlushCtx`, which
    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
    /// the holder count never drops. Constructing it only when the
    /// feature is actually on avoids that side-effect for all
    /// existing sync-flush paths. When async-flush graduates from
    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
    /// the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    ///
    /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
    /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
    commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
264
/// How many recently committed write-sets the OCC registry keeps for
/// conflict detection (4096).
///
/// Sized so that running off the end of the window, which forces a
/// conservative abort, is rare in practice. Each retained entry is only a
/// small set of touched ids, so the window stays cheap.
const OCC_REGISTRY_CAPACITY: usize = 1 << 12;
269
270impl Writer {
271    pub async fn new(
272        storage: Arc<StorageManager>,
273        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
274        start_version: u64,
275    ) -> Result<Self> {
276        Self::new_with_config(
277            storage,
278            schema_manager,
279            start_version,
280            UniConfig::default(),
281            None,
282            None,
283        )
284        .await
285    }
286
287    pub async fn new_with_config(
288        storage: Arc<StorageManager>,
289        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
290        start_version: u64,
291        config: UniConfig,
292        wal: Option<Arc<WriteAheadLog>>,
293        allocator: Option<Arc<IdAllocator>>,
294    ) -> Result<Self> {
295        let allocator = if let Some(a) = allocator {
296            a
297        } else {
298            let store = storage.store();
299            let path = object_store::path::Path::from("id_allocator.json");
300            Arc::new(IdAllocator::new(store, path, 1000).await?)
301        };
302
303        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
304
305        let property_manager = Some(Arc::new(PropertyManager::new(
306            storage.clone(),
307            schema_manager.clone(),
308            1000,
309        )));
310
311        let adjacency_manager = storage.adjacency_manager();
312
313        // Hoist the Arc'd fields so we can both stash them on Writer and
314        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
315        // captures. Single-source-of-truth for each piece of mutable
316        // shared state.
317        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
318        let cached_manifest = Arc::new(PlMutex::new(None));
319        let fork_flush_count = Arc::new(AtomicU64::new(0));
320        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
321        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
322        let compaction_handle = Arc::new(RwLock::new(None));
323        let index_rebuild_manager: Arc<
324            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
325        > = Arc::new(OnceLock::new());
326
327        let flush_coordinator = if config.async_flush_enabled {
328            let shared = SharedFlushCtx {
329                storage: storage.clone(),
330                l0_manager: l0_manager.clone(),
331                adjacency_manager: adjacency_manager.clone(),
332                property_manager: property_manager.clone(),
333                schema_manager: schema_manager.clone(),
334                cached_manifest: cached_manifest.clone(),
335                last_flush_time: last_flush_time.clone(),
336                fork_id: None,
337                fork_flush_count: fork_flush_count.clone(),
338                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
339                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
340                flush_lock: flush_lock.clone(),
341                index_rebuild_manager: index_rebuild_manager.clone(),
342                compaction_handle: compaction_handle.clone(),
343                compaction_config: config.compaction.clone(),
344                index_rebuild_config: config.index_rebuild.clone(),
345                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
346            };
347            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
348            Some(Arc::new(FlushCoordinator::new(
349                config.max_pending_flushes,
350                shared,
351                finalize_fn,
352            )))
353        } else {
354            None
355        };
356
357        let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
358        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
359            OCC_REGISTRY_CAPACITY,
360        )));
361        let for_update_locks = Arc::new(dashmap::DashMap::new());
362
363        Ok(Self {
364            l0_manager,
365            storage,
366            schema_manager,
367            allocator,
368            config,
369            xervo_runtime: OnceLock::new(),
370            property_manager,
371            adjacency_manager,
372            last_flush_time,
373            compaction_handle,
374            index_rebuild_manager,
375            cached_manifest,
376            fork_id: None,
377            fork_flush_count,
378            fork_fragment_warn_fired,
379            flush_lock,
380            flush_coordinator,
381            commit_sequence,
382            committed_writes,
383            for_update_locks,
384        })
385    }
386
387    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
388    /// creating it on first use. The caller `.lock_owned().await`s the returned
389    /// mutex and holds the guard for the transaction's lifetime.
390    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
391        self.for_update_locks
392            .entry(key.to_vec())
393            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
394            .clone()
395    }
396
397    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
398    /// holds anymore, so the map does not grow without bound across the keyspace.
399    ///
400    /// Called when a transaction ends, **after** its guards have been dropped.
401    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
402    /// the same lock `row_lock_handle` takes to clone an entry — so the check
403    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
404    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
405    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
406    /// after we remove). Either way no two transactions ever lock different
407    /// `Mutex` instances for the same key.
408    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
409        for key in keys {
410            self.for_update_locks
411                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
412        }
413    }
414
415    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
416    /// tests that the map does not leak entries across transactions (G5).
417    pub fn for_update_lock_count(&self) -> usize {
418        self.for_update_locks.len()
419    }
420
421    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
422    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
423    /// baseline advances to lock-acquisition time (read-latest under the lock).
424    pub fn current_commit_sequence(&self) -> u64 {
425        self.commit_sequence
426            .load(crate::runtime::sync::Ordering::Relaxed)
427    }
428
429    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
430    /// Used by the async-flush stream/finalize paths to pass into spawned
431    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
432    /// with `flush_coordinator -> FinalizeFn -> Writer`).
433    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
434        SharedFlushCtx {
435            storage: self.storage.clone(),
436            l0_manager: self.l0_manager.clone(),
437            adjacency_manager: self.adjacency_manager.clone(),
438            property_manager: self.property_manager.clone(),
439            schema_manager: self.schema_manager.clone(),
440            cached_manifest: self.cached_manifest.clone(),
441            last_flush_time: self.last_flush_time.clone(),
442            fork_id: self.fork_id,
443            fork_flush_count: self.fork_flush_count.clone(),
444            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
445            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
446            flush_lock: self.flush_lock.clone(),
447            index_rebuild_manager: self.index_rebuild_manager.clone(),
448            compaction_handle: self.compaction_handle.clone(),
449            compaction_config: self.config.compaction.clone(),
450            index_rebuild_config: self.config.index_rebuild.clone(),
451            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
452        }
453    }
454
455    /// Borrow the flush coordinator if async flush is enabled.
456    /// Returns `None` when `config.async_flush_enabled = false`.
457    /// External callers (`drop_fork`) use this to drain pending streams.
458    pub fn flush_coordinator(
459        &self,
460    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
461        self.flush_coordinator.as_ref()
462    }
463
464    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
465    ///
466    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
467    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
468    pub fn set_index_rebuild_manager(
469        &self,
470        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
471    ) -> Result<()> {
472        self.index_rebuild_manager
473            .set(manager)
474            .map_err(|_| anyhow!("index_rebuild_manager already set"))
475    }
476
477    /// Replay WAL mutations into the current L0 buffer.
478    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
479        let l0 = self.l0_manager.get_current();
480        let wal = l0.read().wal.clone();
481
482        if let Some(wal) = wal {
483            wal.initialize().await?;
484            let mutations = wal.replay_since(wal_high_water_mark).await?;
485            let count = mutations.len();
486
487            if count > 0 {
488                log::info!(
489                    "Replaying {} mutations from WAL (LSN > {})",
490                    count,
491                    wal_high_water_mark
492                );
493                let mut l0_guard = l0.write();
494                l0_guard.replay_mutations(mutations)?;
495                // Rebuild the UNIQUE constraint index over the recovered rows
496                // (Bug #9 Mechanism B). `replay_mutations` restores
497                // vertices/properties/labels but never repopulates
498                // `constraint_index` (its only other caller is the live insert
499                // path). Without this, a unique key that lives only in the WAL
500                // (committed but not yet flushed to Lance) is invisible to
501                // `check_unique_constraint_multi` after recovery and a
502                // duplicate of it could be created.
503                self.rebuild_constraint_index(&mut l0_guard);
504            }
505
506            Ok(count)
507        } else {
508            Ok(0)
509        }
510    }
511
512    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
513    ///
514    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
515    /// constraint whose target label the vertex carries and whose member
516    /// properties are all present, inserts the same constraint key the live
517    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
518    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
519    /// write lock; the schema is already loaded on the `Writer`.
520    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
521        let schema = self.schema_manager.schema();
522        // Collect entries first to avoid borrowing `vertex_properties`
523        // immutably while mutating `constraint_index` through the same guard.
524        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
525        for (&vid, props) in &l0_guard.vertex_properties {
526            if l0_guard.vertex_tombstones.contains(&vid) {
527                continue;
528            }
529            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
530                continue;
531            };
532            for label in labels {
533                for constraint in &schema.constraints {
534                    if !constraint.enabled {
535                        continue;
536                    }
537                    let ConstraintTarget::Label(l) = &constraint.target else {
538                        continue;
539                    };
540                    if l != label {
541                        continue;
542                    }
543                    let ConstraintType::Unique {
544                        properties: unique_props,
545                    } = &constraint.constraint_type
546                    else {
547                        continue;
548                    };
549                    let mut key_values = Vec::new();
550                    let mut all_present = true;
551                    for prop in unique_props {
552                        if let Some(val) = props.get(prop) {
553                            key_values.push((prop.clone(), val.clone()));
554                        } else {
555                            all_present = false;
556                            break;
557                        }
558                    }
559                    if all_present {
560                        keys.push((serialize_constraint_key(label, &key_values), vid));
561                    }
562                }
563            }
564        }
565        for (key, vid) in keys {
566            l0_guard.insert_constraint_key(key, vid);
567        }
568    }
569
570    /// Allocates the next VID (pure auto-increment).
571    pub async fn next_vid(&self) -> Result<Vid> {
572        self.allocator.allocate_vid().await
573    }
574
575    /// Allocates multiple VIDs at once for bulk operations.
576    /// This is more efficient than calling next_vid() in a loop.
577    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
578        self.allocator.allocate_vids(count).await
579    }
580
581    /// Allocates the next EID (pure auto-increment).
582    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
583        self.allocator.allocate_eid().await
584    }
585
586    /// Allocates multiple EIDs at once for bulk operations.
587    /// This is more efficient than calling next_eid() in a loop.
588    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
589        self.allocator.allocate_eids(count).await
590    }
591
592    /// Install the embedding runtime exactly once. Receiver is `&self` so it
593    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
594    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
595        self.xervo_runtime
596            .set(runtime)
597            .map_err(|_| anyhow!("xervo_runtime already set"))
598    }
599
600    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
601        self.xervo_runtime.get().cloned()
602    }
603
604    /// Create a new empty L0 buffer for transaction-scoped mutations.
605    ///
606    /// Only reads the current version — no exclusive lock required on Writer.
607    /// The returned buffer has no WAL reference; mutations are logged at
608    /// commit time via [`Self::commit_transaction_l0`].
609    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
610        let current_version = self.l0_manager.get_current().read().current_version;
611        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
612        let buf = L0Buffer::new(current_version, None);
613        // SSI: stamp the OCC read sequence at begin so commit can detect any
614        // transaction that committed since. Gated on the runtime `ssi_enabled`
615        // toggle — when off, `occ_read_set` stays `None` and every downstream
616        // read-set recording / commit validation self-gates to a no-op.
617        let buf = if self.config.ssi_enabled {
618            let mut buf = buf;
619            buf.occ_read_seq = self
620                .commit_sequence
621                .load(crate::runtime::sync::Ordering::Relaxed);
622            // The read path records observed ids here for SSI antidependency
623            // detection; commit consults it.
624            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
625                crate::runtime::l0::OccReadSet::default(),
626            )));
627            buf
628        } else {
629            buf
630        };
631        Arc::new(RwLock::new(buf))
632    }
633
634    /// Resolve the target L0 buffer for a mutation.
635    ///
636    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
637    /// When `None`, it targets the global L0 from the manager.
638    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
639        tx_l0
640            .cloned()
641            .unwrap_or_else(|| self.l0_manager.get_current())
642    }
643
644    fn update_metrics(&self) {
645        let l0 = self.l0_manager.get_current();
646        let size = l0.read().estimated_size;
647        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
648    }
649
650    /// Overlay-aware issue-#77 edge-endpoint validation.
651    ///
652    /// The current buffer alone does not hold all committed-but-unflushed
653    /// tombstones — a flush rotation moves them onto `pending_flush` until
654    /// the Lance write completes. A vertex is effectively deleted iff,
655    /// walking newest-first (tx → current → pending newest→oldest), the
656    /// first buffer that knows the vid says "tombstoned" (an insert clears
657    /// the tombstone within a buffer, so props/tombstone are mutually
658    /// exclusive per buffer).
659    ///
660    /// Must run under `flush_lock` so the overlay cannot change before the
661    /// merge, and BEFORE the durable WAL flush (see the call site).
662    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
663        let chain = self.l0_manager.get_pending_flush();
664        let current = self.l0_manager.get_current();
665        let effectively_deleted = |vid: &Vid| -> bool {
666            if tx_l0.vertex_properties.contains_key(vid) {
667                return false;
668            }
669            if tx_l0.vertex_tombstones.contains(vid) {
670                return true;
671            }
672            {
673                let cur = current.read();
674                if cur.vertex_properties.contains_key(vid) {
675                    return false;
676                }
677                if cur.vertex_tombstones.contains(vid) {
678                    return true;
679                }
680            }
681            for frozen in chain.iter().rev() {
682                let g = frozen.read();
683                if g.vertex_properties.contains_key(vid) {
684                    return false;
685                }
686                if g.vertex_tombstones.contains(vid) {
687                    return true;
688                }
689            }
690            false
691        };
692        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
693            if tx_l0.tombstones.contains_key(eid) {
694                continue; // a deletion, not an insertion — never resurrects a vertex
695            }
696            if effectively_deleted(src_vid) {
697                anyhow::bail!(
698                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
699                    eid,
700                    src_vid
701                );
702            }
703            if effectively_deleted(dst_vid) {
704                anyhow::bail!(
705                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
706                    eid,
707                    dst_vid
708                );
709            }
710        }
711        Ok(())
712    }
713
714    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
715    /// value for each CRDT property the transaction writes, so the commit
716    /// merge MERGES against the committed CRDT state instead of shadowing it
717    /// (the carve-out lets concurrent CRDT writers commit on the assumption
718    /// that the merge sees the committed value — true only while that value
719    /// lives in current, not mid-flush on `pending_flush`). No-op when
720    /// nothing is pending or the property already exists in current. Vertex
721    /// properties only, mirroring the carve-out itself.
722    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
723        let chain = self.l0_manager.get_pending_flush();
724        if chain.is_empty() {
725            return;
726        }
727        for (vid, props) in &tx_l0.vertex_properties {
728            Self::seed_crdt_props(&chain, *vid, props, main_l0);
729        }
730    }
731
732    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
733    /// Also used by the non-transactional vertex write path, which CRDT-merges
734    /// into the current buffer directly and has the same shadowing hazard
735    /// during a flush window.
736    fn seed_crdt_props(
737        chain: &[Arc<RwLock<L0Buffer>>],
738        vid: Vid,
739        props: &Properties,
740        target: &mut L0Buffer,
741    ) {
742        for (key, value) in props {
743            if crate::runtime::l0::try_as_crdt(value).is_none() {
744                continue;
745            }
746            if target
747                .vertex_properties
748                .get(&vid)
749                .is_some_and(|p| p.contains_key(key))
750            {
751                continue;
752            }
753            // Newest generation first: the first hit is the live state.
754            for frozen in chain.iter().rev() {
755                let g = frozen.read();
756                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
757                    if crate::runtime::l0::try_as_crdt(v).is_some() {
758                        target
759                            .vertex_properties
760                            .entry(vid)
761                            .or_default()
762                            .insert(key.clone(), v.clone());
763                    }
764                    break;
765                }
766            }
767        }
768    }
769
    /// Commit a transaction's private L0 buffer into main L0.
    ///
    /// Everything below runs under `flush_lock`, in this order:
    /// 1. SSI validation, only when `config.ssi_enabled`. First a
    ///    write-write / read-write conflict check against the commit
    ///    registry. Then unique-key, MERGE-guard, ext_id and CRDT carve-out
    ///    checks against the committed-but-unflushed overlay (current buffer
    ///    plus `pending_flush`).
    /// 2. Issue #77 edge-endpoint validation against the same overlay.
    /// 3. WAL append of the transaction's mutations, then [`Self::flush_wal`].
    ///    This flush is the durable commit point, so every validation that
    ///    can abort the commit must run before it.
    /// 4. Copy-on-write freeze of the current buffer if a snapshot pins it.
    ///    Then `merge_take` into main L0, and replay of edges and tombstones
    ///    into the `AdjacencyManager`.
    /// 5. Registration of the write-set in the SSI commit registry.
    /// 6. Best-effort post-commit flush when `should_flush()` fires.
    ///
    /// Returns `(wal_lsn, flush_pending)`:
    /// - `wal_lsn` is the value returned by [`Self::flush_wal`] (0 when no
    ///   WAL is configured).
    /// - `flush_pending` is `true` only on the async path: async flush
    ///   enabled, a flush coordinator configured, and room in the pending
    ///   pipeline. There the current buffer was rotated inline and its L1
    ///   stream phase was submitted to the coordinator, which finishes it in
    ///   the background. In every other case it is `false`.
    ///
    /// # Errors
    /// - `UniError::SerializationConflict` on an SSI conflict or an unsafe
    ///   CRDT carve-out overwrite.
    /// - `UniError::ConstraintConflict` when a unique key, MERGE key or
    ///   ext_id was already committed by a concurrent transaction.
    /// - An error naming the edge when an inserted edge's endpoint is
    ///   effectively deleted (issue #77).
    /// - Any error from a WAL append, the WAL flush, or `merge_take`.
    ///
    /// Failures of the post-commit flush (rotate or inline) are only logged.
    pub async fn commit_transaction_l0(
        self: &Arc<Self>,
        tx_l0_arc: Arc<RwLock<L0Buffer>>,
    ) -> Result<(u64, bool)> {
        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
        // Two concurrent commits serialize here; in Phase 3 the outer
        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
        // acquisition is uncontended. Phase 4 drops the outer lock and
        // this becomes the load-bearing serialization point.
        let _flush_lock_guard = self.flush_lock.lock().await;

        // Crash-recovery seam: simulate process death immediately after winning
        // the commit serialization point but before any durable work. No-op
        // unless built with `--features failpoints`. (See ssi_resilience tests.)
        fail::fail_point!("commit::after-flush-lock");

        // SSI: optimistic conflict detection. This MUST run before any WAL
        // write — `flush_wal()` below is the durable commit point and the WAL
        // has no abort marker, so aborting after it would resurrect this
        // transaction on crash recovery. The write-set is reused for
        // registration after a successful merge.
        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
        // and `occ_write_set` is `None`, so the post-merge registration below
        // is skipped — reproducing last-writer-wins exactly.
        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
            let tx_l0 = tx_l0_arc.read();
            let read_seq = tx_l0.occ_read_seq;
            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
            if !write_set.is_empty() {
                // Telemetry: one validation per non-empty (writing) commit. The
                // ratio of conflicts to validations is the headline abort rate.
                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
                // Read-set is consulted only for writing transactions, so a
                // read-only commit (empty write-set) runs at snapshot isolation.
                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
                if let Some(conflict) =
                    self.committed_writes
                        .lock()
                        .check(read_seq, &write_set, read_guard.as_deref())
                {
                    use crate::runtime::occ::Conflict;
                    // Count the conflict by kind before aborting.
                    match &conflict {
                        Conflict::WriteWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "write_write",
                        )
                        .increment(1),
                        Conflict::ReadWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "read_write",
                        )
                        .increment(1),
                        Conflict::HistoryTruncated { .. } => {
                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
                        }
                    }
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }

            // Validate against the committed-but-unflushed overlay under
            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
            // soundness. The current buffer alone does not hold all committed
            // state — a flush rotation moves it onto `pending_flush` until the
            // Lance write completes (the Bug #9A window, here at the
            // commit-time layer) — so every check walks [current, pending…].
            {
                let pending = self.l0_manager.get_pending_flush();
                let main_l0 = self.l0_manager.get_current();
                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
                    std::iter::once(main_l0).chain(pending).collect();

                // SSI / serializable MERGE: abort if a concurrent transaction has
                // already committed a row with one of this transaction's unique
                // keys. Commits serialize here, so this closes the race window
                // left by the per-insert check. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.constraint_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_constraint_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "unique key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Implicit MERGE phantom guard: a `MERGE` that *created* a node
                // registered its (label, key-props) here even with no declared
                // UNIQUE constraint. If a concurrent transaction already committed
                // the same MERGE key, abort retriably so the two converge to one
                // node on retry (the loser's MATCH then finds the committed row).
                // Only MERGE-creates register keys, so a plain CREATE of the same
                // properties never lands here. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.merge_guard_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_merge_guard_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "MERGE key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Same race window for global ext_id uniqueness: the per-insert
                // check ran against an older main L0; re-probe the committed
                // index here, where commits serialize. An ext_id is "taken" when
                // some overlay buffer maps it to a different vid.
                for (ext_id, vid) in &tx_l0.extid_index {
                    let taken = overlay.iter().any(|b| {
                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
                    });
                    if taken {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: format!(
                                    "ext_id '{ext_id}' already committed by a concurrent \
                                     transaction"
                                ),
                            },
                        ));
                    }
                }

                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
                // write-set assuming its merge commutes. If the overlay holds a
                // *different* CRDT variant for the same property, the merge would
                // silently overwrite it — abort instead of losing the update.
                // (Checked against every overlay buffer: conservative if an old
                // generation held a different variant that a newer commit already
                // replaced, but an abort+retry is always sound.)
                for buf in &overlay {
                    if let Some(conflict) =
                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
                    {
                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::SerializationConflict {
                                message: conflict.to_string(),
                            },
                        ));
                    }
                }
            }
            Some(write_set)
        } else {
            None
        };

        // Issue #77: an edge whose endpoint is effectively deleted makes the
        // merge below bail. That bail MUST happen before the durable WAL flush —
        // after it the transaction is committed-but-unmerged (a ghost commit),
        // and WAL replay re-hits the same bail, making the database unopenable.
        // SSI validation was deliberately placed before the flush for exactly
        // this reason; the endpoint check belongs here too. Runs unconditionally
        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
        // tombstone state cannot change between here and the merge. A
        // tombstone may live in a flush-rotated pending buffer rather than
        // the current buffer, so the check walks the overlay newest-first.
        {
            let tx_l0 = tx_l0_arc.read();
            self.validate_edge_endpoints_overlay(&tx_l0)?;
        }

        // Crash-recovery seam: SSI validation has passed; the transaction is
        // about to become durable. A crash here must leave NO trace (validation
        // happens before the WAL is touched). No-op unless `failpoints`.
        fail::fail_point!("commit::after-validate");

        // 1. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            // NOTE(review): each `wal.append(..)?` can fail part-way through
            // this sequence, returning early with some of this transaction's
            // records already appended. Whether such records could later be
            // persisted by another commit's `flush_wal` depends on
            // `WriteAheadLog` buffering semantics not visible here. This
            // matters because the WAL has no abort marker. TODO confirm.
            if let Some(wal) = main_l0.wal.as_ref() {
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions (vertices also tombstoned in this tx are
                // written only as deletions below)
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Label-only mutations (SET n:Label / REMOVE n:Label). After
                // vertex inserts (so the vertex exists on replay), before edges,
                // and skipping vertices deleted in this same commit.
                for vid in &tx_l0.vertex_label_overwrites {
                    if tx_l0.vertex_tombstones.contains(vid) {
                        continue;
                    }
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
                        vid: *vid,
                        labels,
                    })?;
                }

                // Crash-recovery seam: vertices appended, edges not yet. Tests
                // assert that a crash here (before `flush_wal`) recovers NOTHING
                // — the durable commit point is the flush below, not append.
                fail::fail_point!("commit::mid-wal");

                // Edge insertions and deletions from edge_endpoints: an eid that
                // is also tombstoned in this tx is logged as a deletion.
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges that only exist in the global L0 (not in
                // this transaction's edge_endpoints).  Without this, deletes of
                // pre-existing edges would be silently lost.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        let wal_lsn = self.flush_wal().await?;

        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
        // A crash here must RECOVER the transaction on replay (it is committed),
        // even though it was never made visible in-process. No-op unless `failpoints`.
        fail::fail_point!("commit::after-wal-flush");

        // Component C1: if an outstanding snapshot pins the current generation,
        // clone it aside (lazy copy-on-write) before merging, so the pinning
        // transaction's reads stay isolated from this commit. No-op — and zero
        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
        // so this cannot race a flush rotate or another commit's merge; the merge
        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
        // always false when SSI is off and this is a zero-cost no-op.
        if self.l0_manager.is_current_pinned() {
            self.l0_manager.freeze_current_for_snapshot();
            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
        }

        // 3. Merge into main L0 and make visible
        {
            // Write-lock the tx buffer: `merge_take` moves its property maps
            // into main L0 instead of cloning them. The commit consumes the
            // transaction, so the drained maps are never observed afterwards;
            // everything read below (endpoints, versions, tombstones) is left
            // intact.
            let mut tx_l0 = tx_l0_arc.write();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            // A CRDT property's committed state may live only in a
            // flush-rotated pending buffer (the post-rotation current is
            // empty until the Lance write completes — the Bug #9A window).
            // `merge_crdt_properties` merges against the CURRENT buffer's
            // value — without seeding, the tx's CRDT state would SHADOW the
            // pending buffer's at read time (newest buffer wins per property)
            // and concurrent increments would be lost. Seed the newest
            // overlay value for each CRDT property the tx writes that current
            // lacks, so the merge below merges instead of replaces.
            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
            main_l0.merge_take(&mut tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay.
            // Edges without a recorded version fall back to main L0's
            // `current_version`.
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            // Replay tombstones for edges that only exist in the global L0
            // (not in this transaction's edge_endpoints).
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        // Crash-recovery seam: durable AND merged, but the in-memory commit
        // registry has not recorded this write-set yet. A crash here is
        // indistinguishable from one at `after-wal-flush` on reopen (the
        // registry is in-memory and rebuilt empty); the tx still recovers.
        fail::fail_point!("commit::after-merge");

        // SSI: register this commit's write-set under a fresh commit sequence so
        // later transactions detect conflicts against it. Still under
        // `flush_lock`, before the async-flush branch can drop the guard.
        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
        if let Some(write_set) = occ_write_set
            && !write_set.is_empty()
        {
            // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
            // so production and the loom/shuttle models exercise identical logic.
            self.committed_writes
                .lock()
                .commit(&self.commit_sequence, write_set);
        }

        self.update_metrics();

        // 4. Best-effort post-commit auto-flush.
        //
        // Two paths:
        // - async path, taken only when `async_flush_enabled` is set AND a
        //   flush coordinator exists AND `pending_flush_count() <
        //   max_pending_flushes` (so we don't stack up rotations beyond the
        //   configured pipeline depth). It rotates inline, drops
        //   flush_lock, then submits the stream phase to the coordinator.
        //   `try_acquire_permit` is non-blocking: if we lose the race for
        //   the last permit, we just skip this trigger (the next commit
        //   retries).
        // - otherwise: an inline flush under our existing flush_lock guard
        //   via flush_inline_under_lock. This includes async mode when the
        //   pipeline is already full or no coordinator is configured.
        let mut flush_pending = false;
        if self.should_flush() {
            if self.config.async_flush_enabled
                && let Some(coord) = self.flush_coordinator.as_ref()
                && coord.pending_flush_count() < self.config.max_pending_flushes
            {
                match coord.try_acquire_permit() {
                    Some(permit) => {
                        match self.flush_l0_rotate().await {
                            Ok(rotate_out) => {
                                // Allocate the rotate seq and bump pending ONLY
                                // after the rotate succeeds (Bug #3). A failed
                                // rotate must consume neither: the finalizer
                                // advances strictly in consecutive seq order and
                                // only decrements pending on finalize, so a
                                // leaked seq/pending from a failed rotate would
                                // wedge the finalizer forever and climb pending
                                // toward `max_pending_flushes`. The seq is still
                                // allocated under `flush_lock` (immediately after
                                // the rotate, before the guard drops below), so
                                // concurrent rotates keep seq order == rotation
                                // order, and the seq is not used until submit.
                                let seq = coord.next_rotate_seq();
                                coord.note_pending();
                                // Release flush_lock BEFORE the spawn so concurrent
                                // commits can proceed while the stream runs.
                                drop(_flush_lock_guard);
                                let parent_manifest = self.cached_manifest.lock().clone();
                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
                                    seq,
                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
                                    wal_lsn: rotate_out.wal_lsn,
                                    current_version: rotate_out.current_version,
                                    name: None,
                                    parent_manifest,
                                    permit,
                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
                                };
                                let writer = self.clone();
                                let _ticket = coord.submit_for_stream(
                                    rotated,
                                    move |old_l0, wal, ver, n| async move {
                                        let outcome =
                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
                                            new_manifest: outcome.manifest,
                                            snapshot_id: outcome.snapshot_id,
                                        })
                                    },
                                );
                                flush_pending = true;
                                // Early return — flush_lock already dropped.
                                return Ok((wal_lsn, flush_pending));
                            }
                            Err(e) => {
                                tracing::warn!("Async rotate failed (non-critical): {}", e);
                                // No seq was allocated and pending was not
                                // bumped (both moved into the Ok arm for Bug
                                // #3), so the finalizer is not wedged. The
                                // permit drops here, freeing the slot.
                            }
                        }
                    }
                    None => {
                        // Race: someone else grabbed the last permit. Skip;
                        // next commit will retry should_flush().
                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
                    }
                }
            } else if let Err(e) = self.flush_inline_under_lock(None).await {
                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
            }
        }

        Ok((wal_lsn, flush_pending))
    }
1250
1251    /// Flush the WAL buffer to durable storage.
1252    ///
1253    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1254    pub async fn flush_wal(&self) -> Result<u64> {
1255        let l0 = self.l0_manager.get_current();
1256        let wal = l0.read().wal.clone();
1257
1258        match wal {
1259            Some(wal) => Ok(wal.flush().await?),
1260            None => Ok(0),
1261        }
1262    }
1263
1264    /// Record property removals in the active L0 mutation stats.
1265    ///
1266    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1267    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1268        if count == 0 {
1269            return;
1270        }
1271        let l0 = self.resolve_l0(tx_l0);
1272        l0.write().mutation_stats.properties_removed += count;
1273    }
1274
1275    /// Validates vertex constraints for the given properties.
1276    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1277    async fn validate_vertex_constraints_for_label(
1278        &self,
1279        vid: Vid,
1280        properties: &Properties,
1281        label: &str,
1282        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1283    ) -> Result<()> {
1284        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1285            .await
1286    }
1287
1288    /// Partial-update sibling: validates only constraints touching keys
1289    /// present in `properties` (the touched set). NOT NULL is checked
1290    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1291    /// skipped when any referenced key is absent (the caller is
1292    /// expected to have routed to the full-row path in that case via
1293    /// `touched_needs_full_read`).
1294    async fn validate_vertex_constraints_for_label_partial(
1295        &self,
1296        vid: Vid,
1297        properties: &Properties,
1298        label: &str,
1299        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1300    ) -> Result<()> {
1301        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1302            .await
1303    }
1304
1305    async fn validate_vertex_constraints_for_label_impl(
1306        &self,
1307        vid: Vid,
1308        properties: &Properties,
1309        label: &str,
1310        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1311        partial: bool,
1312    ) -> Result<()> {
1313        let schema = self.schema_manager.schema();
1314
1315        {
1316            // 1. Check NOT NULL constraints (from Property definitions).
1317            //    Under partial-update mode, skip properties NOT in
1318            //    `properties` — they retain their previous (already-
1319            //    validated) value.
1320            if let Some(props_meta) = schema.properties.get(label) {
1321                for (prop_name, meta) in props_meta {
1322                    if !meta.nullable {
1323                        let present = properties.get(prop_name);
1324                        if partial && present.is_none() {
1325                            continue;
1326                        }
1327                        if present.is_none_or(|v| v.is_null()) {
1328                            log::warn!(
1329                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1330                                prop_name,
1331                                label
1332                            );
1333                            return Err(anyhow!(
1334                                "Constraint violation: Property '{}' cannot be null",
1335                                prop_name
1336                            ));
1337                        }
1338                    }
1339                }
1340            }
1341
1342            // 2. Check Explicit Constraints (Unique, Check, etc.)
1343            for constraint in &schema.constraints {
1344                if !constraint.enabled {
1345                    continue;
1346                }
1347                match &constraint.target {
1348                    ConstraintTarget::Label(l) if l == label => {}
1349                    _ => continue,
1350                }
1351
1352                match &constraint.constraint_type {
1353                    ConstraintType::Unique {
1354                        properties: unique_props,
1355                    } => {
1356                        // Support single and multi-property unique constraints
1357                        if !unique_props.is_empty() {
1358                            let mut key_values = Vec::new();
1359                            let mut missing = false;
1360                            for prop in unique_props {
1361                                if let Some(val) = properties.get(prop) {
1362                                    key_values.push((prop.clone(), val.clone()));
1363                                } else {
1364                                    missing = true; // Can't enforce if property missing (partial update?)
1365                                    // For INSERT, missing means null?
1366                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1367                                    // For now, only check if ALL keys are present
1368                                }
1369                            }
1370
1371                            if !missing {
1372                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1373                                    .await?;
1374                            }
1375                        }
1376                    }
1377                    ConstraintType::Exists { property } => {
1378                        if properties.get(property).is_none_or(|v| v.is_null()) {
1379                            log::warn!(
1380                                "Constraint violation: Property '{}' must exist for label '{}'",
1381                                property,
1382                                label
1383                            );
1384                            return Err(anyhow!(
1385                                "Constraint violation: Property '{}' must exist",
1386                                property
1387                            ));
1388                        }
1389                    }
1390                    ConstraintType::Check { expression } => {
1391                        if !self.evaluate_check_constraint(expression, properties)? {
1392                            return Err(anyhow!(
1393                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1394                                constraint.name,
1395                                expression
1396                            ));
1397                        }
1398                    }
1399                    _ => {
1400                        return Err(anyhow!("Unsupported constraint type"));
1401                    }
1402                }
1403            }
1404        }
1405        Ok(())
1406    }
1407
1408    /// Validates vertex constraints for a vertex with the given labels.
1409    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1410    /// Unknown labels (not in schema) are skipped.
1411    async fn validate_vertex_constraints(
1412        &self,
1413        vid: Vid,
1414        properties: &Properties,
1415        labels: &[String],
1416        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1417    ) -> Result<()> {
1418        let schema = self.schema_manager.schema();
1419
1420        // Validate constraints only for known labels
1421        for label in labels {
1422            // Skip unknown labels (schemaless support)
1423            if schema.get_label_case_insensitive(label).is_none() {
1424                continue;
1425            }
1426            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1427                .await?;
1428        }
1429
1430        // Check global ext_id uniqueness if ext_id is provided
1431        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1432            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1433        }
1434
1435        Ok(())
1436    }
1437
1438    /// Partial sibling of `validate_vertex_constraints` — validates only
1439    /// constraints touching keys present in `properties`. Used by
1440    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1441    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1442    async fn validate_vertex_constraints_partial(
1443        &self,
1444        vid: Vid,
1445        touched: &Properties,
1446        labels: &[String],
1447        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1448    ) -> Result<()> {
1449        let schema = self.schema_manager.schema();
1450        for label in labels {
1451            if schema.get_label_case_insensitive(label).is_none() {
1452                continue;
1453            }
1454            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1455                .await?;
1456        }
1457        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1458            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1459        }
1460        Ok(())
1461    }
1462
1463    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1464    ///
1465    /// Used to build a constraint key index from L0 buffers for batch validation.
1466    fn collect_constraint_keys_from_properties<'a>(
1467        properties_iter: impl Iterator<Item = &'a Properties>,
1468        label: &str,
1469        constraints: &[uni_common::core::schema::Constraint],
1470        existing_keys: &mut HashMap<String, HashSet<String>>,
1471        existing_extids: &mut HashSet<String>,
1472    ) {
1473        for props in properties_iter {
1474            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1475                existing_extids.insert(ext_id.to_string());
1476            }
1477
1478            for constraint in constraints {
1479                if !constraint.enabled {
1480                    continue;
1481                }
1482                if let ConstraintTarget::Label(l) = &constraint.target {
1483                    if l != label {
1484                        continue;
1485                    }
1486                } else {
1487                    continue;
1488                }
1489
1490                if let ConstraintType::Unique {
1491                    properties: unique_props,
1492                } = &constraint.constraint_type
1493                {
1494                    let mut key_parts = Vec::new();
1495                    let mut all_present = true;
1496                    for prop in unique_props {
1497                        if let Some(val) = props.get(prop) {
1498                            key_parts.push(format!("{}:{}", prop, val));
1499                        } else {
1500                            all_present = false;
1501                            break;
1502                        }
1503                    }
1504                    if all_present {
1505                        let key = key_parts.join("|");
1506                        existing_keys
1507                            .entry(constraint.name.clone())
1508                            .or_default()
1509                            .insert(key);
1510                    }
1511                }
1512            }
1513        }
1514    }
1515
1516    /// Validates constraints for a batch of vertices efficiently.
1517    ///
1518    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1519    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1520    ///
1521    /// # Arguments
1522    /// * `vids` - VIDs of vertices being inserted
1523    /// * `properties_batch` - Properties for each vertex
1524    /// * `label` - Label for all vertices (assumes single label for now)
1525    ///
1526    /// # Performance
1527    /// For N vertices with unique constraints:
1528    /// - Old approach: O(N²) - scan L0 buffer N times
1529    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1530    async fn validate_vertex_batch_constraints(
1531        &self,
1532        vids: &[Vid],
1533        properties_batch: &[Properties],
1534        label: &str,
1535        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1536    ) -> Result<()> {
1537        if vids.len() != properties_batch.len() {
1538            return Err(anyhow!("VID/properties length mismatch"));
1539        }
1540
1541        let schema = self.schema_manager.schema();
1542
1543        // 1. Validate NOT NULL constraints for each vertex
1544        if let Some(props_meta) = schema.properties.get(label) {
1545            for (idx, properties) in properties_batch.iter().enumerate() {
1546                for (prop_name, meta) in props_meta {
1547                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1548                        return Err(anyhow!(
1549                            "Constraint violation at index {}: Property '{}' cannot be null",
1550                            idx,
1551                            prop_name
1552                        ));
1553                    }
1554                }
1555            }
1556        }
1557
1558        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1559        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1560        let mut existing_extids: HashSet<String> = HashSet::new();
1561
1562        // Scan current L0 buffer
1563        {
1564            let l0 = self.l0_manager.get_current();
1565            let l0_guard = l0.read();
1566            Self::collect_constraint_keys_from_properties(
1567                l0_guard.vertex_properties.values(),
1568                label,
1569                &schema.constraints,
1570                &mut existing_keys,
1571                &mut existing_extids,
1572            );
1573        }
1574
1575        // Scan transaction L0 if present
1576        if let Some(tx_l0) = tx_l0 {
1577            let tx_l0_guard = tx_l0.read();
1578            Self::collect_constraint_keys_from_properties(
1579                tx_l0_guard.vertex_properties.values(),
1580                label,
1581                &schema.constraints,
1582                &mut existing_keys,
1583                &mut existing_extids,
1584            );
1585        }
1586
1587        // 3. Check batch vertices against index AND check for duplicates within batch
1588        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1589        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1590
1591        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1592            // Check ext_id uniqueness
1593            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1594                if existing_extids.contains(ext_id) {
1595                    return Err(anyhow!(
1596                        "Constraint violation at index {}: ext_id '{}' already exists",
1597                        idx,
1598                        ext_id
1599                    ));
1600                }
1601                if let Some(first_idx) = batch_extids.get(ext_id) {
1602                    return Err(anyhow!(
1603                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1604                        ext_id,
1605                        first_idx,
1606                        idx
1607                    ));
1608                }
1609                // Also check the main vertices table — the L0 scans above
1610                // miss vertices already flushed to L1, so without this a
1611                // batch insert (e.g. a fork promote onto primary) silently
1612                // twins a duplicate ext_id instead of erroring. Mirrors the
1613                // single-vertex `check_extid_globally_unique`.
1614                if let Ok(Some(found_vid)) =
1615                    MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1616                {
1617                    return Err(anyhow!(
1618                        "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1619                        idx,
1620                        ext_id,
1621                        found_vid
1622                    ));
1623                }
1624                batch_extids.insert(ext_id.to_string(), idx);
1625            }
1626
1627            // Check unique constraints
1628            for constraint in &schema.constraints {
1629                if !constraint.enabled {
1630                    continue;
1631                }
1632                if let ConstraintTarget::Label(l) = &constraint.target {
1633                    if l != label {
1634                        continue;
1635                    }
1636                } else {
1637                    continue;
1638                }
1639
1640                match &constraint.constraint_type {
1641                    ConstraintType::Unique {
1642                        properties: unique_props,
1643                    } => {
1644                        let mut key_parts = Vec::new();
1645                        let mut all_present = true;
1646                        for prop in unique_props {
1647                            if let Some(val) = properties.get(prop) {
1648                                key_parts.push(format!("{}:{}", prop, val));
1649                            } else {
1650                                all_present = false;
1651                                break;
1652                            }
1653                        }
1654
1655                        if all_present {
1656                            let key = key_parts.join("|");
1657
1658                            // Check against existing L0 keys
1659                            if let Some(keys) = existing_keys.get(&constraint.name)
1660                                && keys.contains(&key)
1661                            {
1662                                return Err(anyhow!(
1663                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1664                                    idx,
1665                                    label,
1666                                    constraint.name
1667                                ));
1668                            }
1669
1670                            // Check for duplicates within batch
1671                            let batch_constraint_keys =
1672                                batch_keys.entry(constraint.name.clone()).or_default();
1673                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1674                                return Err(anyhow!(
1675                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1676                                    key,
1677                                    first_idx,
1678                                    idx
1679                                ));
1680                            }
1681                            batch_constraint_keys.insert(key, idx);
1682                        }
1683                    }
1684                    ConstraintType::Exists { property }
1685                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1686                    {
1687                        return Err(anyhow!(
1688                            "Constraint violation at index {}: Property '{}' must exist",
1689                            idx,
1690                            property
1691                        ));
1692                    }
1693                    ConstraintType::Check { expression }
1694                        if !self.evaluate_check_constraint(expression, properties)? =>
1695                    {
1696                        return Err(anyhow!(
1697                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1698                            idx,
1699                            constraint.name
1700                        ));
1701                    }
1702                    _ => {}
1703                }
1704            }
1705        }
1706
1707        // 4. Check storage for unique constraints (can batch this into a single query)
1708        for constraint in &schema.constraints {
1709            if !constraint.enabled {
1710                continue;
1711            }
1712            if let ConstraintTarget::Label(l) = &constraint.target {
1713                if l != label {
1714                    continue;
1715                }
1716            } else {
1717                continue;
1718            }
1719
1720            if let ConstraintType::Unique {
1721                properties: unique_props,
1722            } = &constraint.constraint_type
1723            {
1724                // Build compound OR filter for all batch vertices
1725                let mut or_filters = Vec::new();
1726                for properties in properties_batch.iter() {
1727                    let mut and_parts = Vec::new();
1728                    let mut all_present = true;
1729                    for prop in unique_props {
1730                        if let Some(val) = properties.get(prop) {
1731                            let val_str = match val {
1732                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1733                                Value::Int(n) => n.to_string(),
1734                                Value::Float(f) => f.to_string(),
1735                                Value::Bool(b) => b.to_string(),
1736                                _ => {
1737                                    all_present = false;
1738                                    break;
1739                                }
1740                            };
1741                            and_parts.push(format!("{} = {}", prop, val_str));
1742                        } else {
1743                            all_present = false;
1744                            break;
1745                        }
1746                    }
1747                    if all_present {
1748                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1749                    }
1750                }
1751
1752                #[cfg(feature = "lance-backend")]
1753                if !or_filters.is_empty() {
1754                    let vid_list: Vec<String> =
1755                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1756                    let filter = format!(
1757                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1758                        or_filters.join(" OR "),
1759                        vid_list.join(", ")
1760                    );
1761
1762                    if let Ok(ds) = self.storage.vertex_dataset(label)
1763                        && let Ok(lance_ds) = ds.open_raw().await
1764                    {
1765                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1766                        if count > 0 {
1767                            return Err(anyhow!(
1768                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1769                                label,
1770                                constraint.name
1771                            ));
1772                        }
1773                    }
1774                }
1775            }
1776        }
1777
1778        Ok(())
1779    }
1780
1781    /// Checks that ext_id is globally unique across all vertices.
1782    ///
1783    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1784    /// to ensure no other vertex uses this ext_id.
1785    ///
1786    /// # Errors
1787    ///
1788    /// Returns error if another vertex with the same ext_id exists.
1789    async fn check_extid_globally_unique(
1790        &self,
1791        ext_id: &str,
1792        current_vid: Vid,
1793        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1794    ) -> Result<()> {
1795        // Check L0 buffers: current, transaction, and pending flush
1796        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1797            let mut buffers = vec![self.l0_manager.get_current()];
1798            if let Some(tx_l0) = tx_l0 {
1799                buffers.push(tx_l0.clone());
1800            }
1801            buffers.extend(self.l0_manager.get_pending_flush());
1802            buffers
1803        };
1804
1805        for l0 in &l0_buffers_to_check {
1806            // O(1) per buffer via the maintained `extid_index` (the previous
1807            // full `vertex_properties` scan made constrained ingest O(n²)).
1808            if let Some(&vid) = l0.read().extid_index.get(ext_id)
1809                && vid != current_vid
1810            {
1811                return Err(anyhow!(
1812                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1813                    ext_id,
1814                    vid
1815                ));
1816            }
1817        }
1818
1819        // Check main vertices table (if it exists)
1820        // Pass None for global uniqueness check (not snapshot-isolated)
1821        let backend = self.storage.backend();
1822        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1823            && found_vid != current_vid
1824        {
1825            return Err(anyhow!(
1826                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1827                ext_id,
1828                found_vid
1829            ));
1830        }
1831
1832        Ok(())
1833    }
1834
1835    /// Helper to get vertex labels from L0 buffer.
1836    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1837        let l0 = self.l0_manager.get_current();
1838        let l0_guard = l0.read();
1839        // Check if vertex is tombstoned (deleted) - if so, return None
1840        if l0_guard.vertex_tombstones.contains(&vid) {
1841            return None;
1842        }
1843        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1844    }
1845
1846    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1847    /// This is the proper way to read vertex labels after a flush, as it checks both
1848    /// in-memory buffers and persisted storage.
1849    pub async fn get_vertex_labels(
1850        &self,
1851        vid: Vid,
1852        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1853    ) -> Option<Vec<String>> {
1854        // 1. Check current L0
1855        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1856            return Some(labels);
1857        }
1858
1859        // 2. Check transaction L0 if present
1860        if let Some(tx_l0) = tx_l0 {
1861            let guard = tx_l0.read();
1862            if guard.vertex_tombstones.contains(&vid) {
1863                return None;
1864            }
1865            if let Some(labels) = guard.get_vertex_labels(vid) {
1866                return Some(labels.to_vec());
1867            }
1868        }
1869
1870        // 3. Check pending flush L0s
1871        for pending_l0 in self.l0_manager.get_pending_flush() {
1872            let guard = pending_l0.read();
1873            if guard.vertex_tombstones.contains(&vid) {
1874                return None;
1875            }
1876            if let Some(labels) = guard.get_vertex_labels(vid) {
1877                return Some(labels.to_vec());
1878            }
1879        }
1880
1881        // 4. Check storage
1882        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1883    }
1884
1885    /// Helper to get edge type from L0 buffer.
1886    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1887        let l0 = self.l0_manager.get_current();
1888        let l0_guard = l0.read();
1889        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1890    }
1891
1892    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1893    /// Falls back to the transaction L0 if available.
1894    pub fn get_edge_type_id_from_l0(
1895        &self,
1896        eid: Eid,
1897        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1898    ) -> Option<u32> {
1899        // Check transaction L0 first
1900        if let Some(tx_l0) = tx_l0 {
1901            let guard = tx_l0.read();
1902            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1903                return Some(etype);
1904            }
1905        }
1906        // Fall back to main L0
1907        let l0 = self.l0_manager.get_current();
1908        let l0_guard = l0.read();
1909        l0_guard
1910            .get_edge_endpoint_full(eid)
1911            .map(|(_, _, etype)| etype)
1912    }
1913
1914    /// Set the type name for an edge (used for schemaless edge types).
1915    /// This is called during CREATE for edge types not found in the schema.
1916    pub fn set_edge_type(
1917        &self,
1918        eid: Eid,
1919        type_name: String,
1920        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1921    ) {
1922        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1923    }
1924
1925    /// Evaluate a simple CHECK constraint expression.
1926    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1927    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1928        let parts: Vec<&str> = expression.split_whitespace().collect();
1929        if parts.len() != 3 {
1930            // For now, only support "prop op val"
1931            // Fallback to true if too complex to avoid breaking, but warn
1932            log::warn!(
1933                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1934                expression
1935            );
1936            return Ok(true);
1937        }
1938
1939        let prop_part = parts[0].trim_start_matches('(');
1940        // Handle "variable.property" format - take the part after the dot
1941        let prop_name = if let Some(idx) = prop_part.find('.') {
1942            &prop_part[idx + 1..]
1943        } else {
1944            prop_part
1945        };
1946
1947        let op = parts[1];
1948        let val_str = parts[2].trim_end_matches(')');
1949
1950        let prop_val = match properties.get(prop_name) {
1951            Some(v) => v,
1952            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1953        };
1954
1955        // Parse value string (handle quotes for strings)
1956        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1957            || (val_str.starts_with('"') && val_str.ends_with('"'))
1958        {
1959            Value::String(val_str[1..val_str.len() - 1].to_string())
1960        } else if let Ok(n) = val_str.parse::<i64>() {
1961            Value::Int(n)
1962        } else if let Ok(n) = val_str.parse::<f64>() {
1963            Value::Float(n)
1964        } else if let Ok(b) = val_str.parse::<bool>() {
1965            Value::Bool(b)
1966        } else {
1967            // Check for internal format wrappers if they somehow leaked through
1968            if val_str.starts_with("Number(") && val_str.ends_with(')') {
1969                let n_str = &val_str[7..val_str.len() - 1];
1970                if let Ok(n) = n_str.parse::<i64>() {
1971                    Value::Int(n)
1972                } else if let Ok(n) = n_str.parse::<f64>() {
1973                    Value::Float(n)
1974                } else {
1975                    Value::String(val_str.to_string())
1976                }
1977            } else {
1978                Value::String(val_str.to_string())
1979            }
1980        };
1981
1982        match op {
1983            "=" | "==" => Ok(prop_val == &target_val),
1984            "!=" | "<>" => Ok(prop_val != &target_val),
1985            ">" => self
1986                .compare_values(prop_val, &target_val)
1987                .map(|o| o.is_gt()),
1988            "<" => self
1989                .compare_values(prop_val, &target_val)
1990                .map(|o| o.is_lt()),
1991            ">=" => self
1992                .compare_values(prop_val, &target_val)
1993                .map(|o| o.is_ge()),
1994            "<=" => self
1995                .compare_values(prop_val, &target_val)
1996                .map(|o| o.is_le()),
1997            _ => {
1998                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1999                Ok(true)
2000            }
2001        }
2002    }
2003
2004    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2005        use std::cmp::Ordering;
2006
2007        fn cmp_f64(x: f64, y: f64) -> Ordering {
2008            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2009        }
2010
2011        match (a, b) {
2012            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2013            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2014            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2015            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2016            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2017            _ => Err(anyhow!(
2018                "Cannot compare incompatible types: {:?} vs {:?}",
2019                a,
2020                b
2021            )),
2022        }
2023    }
2024
2025    async fn check_unique_constraint_multi(
2026        &self,
2027        label: &str,
2028        key_values: &[(String, Value)],
2029        current_vid: Vid,
2030        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2031    ) -> Result<()> {
2032        // Serialize constraint key once for O(1) lookups
2033        let key = serialize_constraint_key(label, key_values);
2034
2035        // 1. Check L0 (in-memory) using O(1) constraint index
2036        {
2037            let l0 = self.l0_manager.get_current();
2038            let l0_guard = l0.read();
2039            if l0_guard.has_constraint_key(&key, current_vid) {
2040                return Err(anyhow!(
2041                    "Constraint violation: Duplicate composite key for label '{}'",
2042                    label
2043                ));
2044            }
2045        }
2046
2047        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2048        // buffer onto `pending_flush` and installs a fresh empty current
2049        // buffer; until the rotated rows reach Lance the key is invisible to
2050        // both the current-buffer check above and the storage check below, so
2051        // a duplicate could slip through that flush window. Mirror the read
2052        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2053        // already consult `pending_flush`.
2054        for pending_l0 in self.l0_manager.get_pending_flush() {
2055            if pending_l0.read().has_constraint_key(&key, current_vid) {
2056                return Err(anyhow!(
2057                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2058                    label
2059                ));
2060            }
2061        }
2062
2063        // Check Transaction L0
2064        if let Some(tx_l0) = tx_l0 {
2065            let tx_l0_guard = tx_l0.read();
2066            if tx_l0_guard.has_constraint_key(&key, current_vid) {
2067                return Err(anyhow!(
2068                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2069                    label
2070                ));
2071            }
2072        }
2073
2074        // 2. Check Storage (L1/L2)
2075        let filters: Vec<String> = key_values
2076            .iter()
2077            .map(|(prop, val)| {
2078                let val_str = match val {
2079                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2080                    Value::Int(n) => n.to_string(),
2081                    Value::Float(f) => f.to_string(),
2082                    Value::Bool(b) => b.to_string(),
2083                    _ => "NULL".to_string(),
2084                };
2085                format!("{} = {}", prop, val_str)
2086            })
2087            .collect();
2088
2089        let mut filter = filters.join(" AND ");
2090        filter.push_str(&format!(
2091            " AND _deleted = false AND _vid != {}",
2092            current_vid.as_u64()
2093        ));
2094
2095        #[cfg(feature = "lance-backend")]
2096        if let Ok(ds) = self.storage.vertex_dataset(label)
2097            && let Ok(lance_ds) = ds.open_raw().await
2098        {
2099            let count = lance_ds.count_rows(Some(filter.clone())).await?;
2100            if count > 0 {
2101                return Err(anyhow!(
2102                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2103                    label,
2104                    filter
2105                ));
2106            }
2107        }
2108
2109        Ok(())
2110    }
2111
2112    async fn check_write_pressure(&self) -> Result<()> {
2113        let status = self
2114            .storage
2115            .compaction_status()
2116            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2117        let l1_runs = status.l1_runs;
2118        let throttle = &self.config.throttle;
2119
2120        if l1_runs >= throttle.hard_limit {
2121            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2122            // Simple polling for now
2123            while self
2124                .storage
2125                .compaction_status()
2126                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2127                .l1_runs
2128                >= throttle.hard_limit
2129            {
2130                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2131            }
2132        } else if l1_runs >= throttle.soft_limit {
2133            let excess = l1_runs - throttle.soft_limit;
2134            // Cap multiplier to avoid overflow
2135            let excess = std::cmp::min(excess, 31);
2136            let multiplier = 2_u32.pow(excess as u32);
2137            let delay = throttle.base_delay * multiplier;
2138            tokio::time::sleep(delay).await;
2139        }
2140        Ok(())
2141    }
2142
2143    /// Check transaction memory limit to prevent OOM.
2144    /// No-op when no transaction is active.
2145    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2146        if let Some(tx_l0) = tx_l0 {
2147            let size = tx_l0.read().estimated_size;
2148            if size > self.config.max_transaction_memory {
2149                return Err(anyhow!(
2150                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2151                     Roll back or commit the current transaction.",
2152                    size,
2153                    self.config.max_transaction_memory
2154                ));
2155            }
2156        }
2157        Ok(())
2158    }
2159
2160    async fn get_query_context(
2161        &self,
2162        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2163    ) -> Option<QueryContext> {
2164        Some(QueryContext::new_with_pending(
2165            self.l0_manager.get_current(),
2166            tx_l0.cloned(),
2167            self.l0_manager.get_pending_flush(),
2168        ))
2169    }
2170
2171    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2172    /// write paths.
2173    ///
2174    /// Rejects a declared CRDT property written as a parsed CRDT value
2175    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2176    /// A mismatch would make the commit-time merge silently overwrite instead of
2177    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2178    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2179    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2180    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2181    /// are never carved out and stay conflictable.
2182    fn enforce_crdt_variants(
2183        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2184        properties: &Properties,
2185    ) -> Result<()> {
2186        for (key, value) in properties {
2187            let Some(meta) = props_meta.get(key) else {
2188                continue;
2189            };
2190            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2191                continue;
2192            };
2193            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2194                && crdt.type_name() != expected.type_name()
2195            {
2196                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2197                    message: format!(
2198                        "CRDT property '{key}' must be written as a {} value",
2199                        expected.type_name()
2200                    ),
2201                }));
2202            }
2203        }
2204        Ok(())
2205    }
2206
2207    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2208    ///
2209    /// When `label` is provided, uses it directly to look up property metadata.
2210    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2211    ///
2212    /// # Errors
2213    ///
2214    /// Returns an error if CRDT property merging fails.
2215    async fn prepare_vertex_upsert(
2216        &self,
2217        vid: Vid,
2218        properties: &mut Properties,
2219        label: Option<&str>,
2220        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2221    ) -> Result<()> {
2222        let Some(pm) = &self.property_manager else {
2223            return Ok(());
2224        };
2225
2226        let schema = self.schema_manager.schema();
2227
2228        // Resolve label: use provided label or discover from L0/storage
2229        let discovered_labels;
2230        let label_name = if let Some(l) = label {
2231            Some(l)
2232        } else {
2233            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2234            discovered_labels
2235                .as_ref()
2236                .and_then(|l| l.first().map(|s| s.as_str()))
2237        };
2238
2239        let Some(label_str) = label_name else {
2240            return Ok(());
2241        };
2242        let Some(props_meta) = schema.properties.get(label_str) else {
2243            return Ok(());
2244        };
2245
2246        // Identify CRDT properties in the insert data
2247        let crdt_keys: Vec<String> = properties
2248            .keys()
2249            .filter(|key| {
2250                props_meta.get(*key).is_some_and(|meta| {
2251                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2252                })
2253            })
2254            .cloned()
2255            .collect();
2256
2257        if crdt_keys.is_empty() {
2258            return Ok(());
2259        }
2260
2261        // Enforce that each declared CRDT property written as a parsed CRDT value
2262        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2263        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2264        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2265        // would hide that as a silent lost update — reject it at the source.
2266        //
2267        // Only the `Map` form is checked: it is exactly the form the carve-out
2268        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2269        // string (the Cypher form) or a non-CRDT value is never carved out — it
2270        // stays conflictable — so it poses no carve-out soundness risk and is left
2271        // to the existing merge/parse path. This is the declared-property half of
2272        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2273        Self::enforce_crdt_variants(props_meta, properties)?;
2274
2275        let ctx = self.get_query_context(tx_l0).await;
2276        for key in crdt_keys {
2277            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2278            if !existing.is_null()
2279                && let Some(val) = properties.get_mut(&key)
2280            {
2281                *val = pm.merge_crdt_values(&existing, val)?;
2282            }
2283        }
2284
2285        Ok(())
2286    }
2287
2288    async fn prepare_edge_upsert(
2289        &self,
2290        eid: Eid,
2291        properties: &mut Properties,
2292        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2293    ) -> Result<()> {
2294        if let Some(pm) = &self.property_manager {
2295            let schema = self.schema_manager.schema();
2296            // Get edge type from L0 buffer instead of from EID
2297            let type_name = self.get_edge_type_from_l0(eid);
2298
2299            if let Some(ref t_name) = type_name
2300                && let Some(props_meta) = schema.properties.get(t_name)
2301            {
2302                let mut crdt_keys = Vec::new();
2303                for (key, _) in properties.iter() {
2304                    if let Some(meta) = props_meta.get(key)
2305                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2306                    {
2307                        crdt_keys.push(key.clone());
2308                    }
2309                }
2310
2311                if !crdt_keys.is_empty() {
2312                    let ctx = self.get_query_context(tx_l0).await;
2313                    for key in crdt_keys {
2314                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2315
2316                        if !existing.is_null()
2317                            && let Some(val) = properties.get_mut(&key)
2318                        {
2319                            *val = pm.merge_crdt_values(&existing, val)?;
2320                        }
2321                    }
2322                }
2323            }
2324        }
2325        Ok(())
2326    }
2327
2328    #[instrument(skip(self, properties), level = "trace")]
2329    pub async fn insert_vertex(
2330        &self,
2331        vid: Vid,
2332        properties: Properties,
2333        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2334    ) -> Result<()> {
2335        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2336            .await?;
2337        Ok(())
2338    }
2339
2340    /// Component C1 (G4): before a non-transactional mutation merges into main
2341    /// L0, if an outstanding snapshot pins the current generation, freeze it
2342    /// aside so snapshots taken *before* this write stay isolated from it.
2343    ///
2344    /// `flush_lock` (acquired and released here) serializes the freeze against
2345    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2346    /// commit path gets. No-op for transactional writes (their freeze happens at
2347    /// commit) and — the common case — when nothing is pinned, where it costs one
2348    /// atomic load. Freezes at most once per pinned generation: the freeze
2349    /// installs a fresh unpinned `current`, so later writes in the same bulk
2350    /// import see no pin and merge in place, and the snapshot keeps reading the
2351    /// frozen pre-import buffer.
2352    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2353        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2354        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2355        // always false (one atomic load) when SSI is off.
2356        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2357            let _flush_lock_guard = self.flush_lock.lock().await;
2358            // Re-check under the lock: a concurrent commit may have frozen first.
2359            if self.l0_manager.is_current_pinned() {
2360                self.l0_manager.freeze_current_for_snapshot();
2361                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2362            }
2363        }
2364    }
2365
2366    #[instrument(skip(self, properties, labels), level = "trace")]
2367    pub async fn insert_vertex_with_labels(
2368        &self,
2369        vid: Vid,
2370        mut properties: Properties,
2371        labels: &[String],
2372        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2373    ) -> Result<Properties> {
2374        let start = std::time::Instant::now();
2375        self.check_write_pressure().await?;
2376        self.check_transaction_memory(tx_l0)?;
2377
2378        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2379        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2380        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2381        // taken before this write stay isolated from it.
2382        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2383
2384        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2385            self.process_embeddings_for_labels(labels, &mut properties)
2386                .await?;
2387        }
2388        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2389            .await?;
2390        self.prepare_vertex_upsert(
2391            vid,
2392            &mut properties,
2393            labels.first().map(|s| s.as_str()),
2394            tx_l0,
2395        )
2396        .await?;
2397
2398        // Clone properties and labels before moving into L0 to return them and populate constraint index
2399        let properties_copy = properties.clone();
2400        let labels_copy = labels.to_vec();
2401
2402        {
2403            // For a non-tx write, re-resolve the live `current` buffer and hold
2404            // `flush_lock` across the (synchronous) write so a concurrent flush
2405            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2406            // a fresh `current` between resolve and write, dropping our write
2407            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2408            // buffer, never the rotating `current`, so no `flush_lock` is needed
2409            // (and taking it would risk re-entrancy with the commit path).
2410            let _flush_lock_guard = if tx_l0.is_none() {
2411                Some(self.flush_lock.lock().await)
2412            } else {
2413                None
2414            };
2415            let l0 = self.resolve_l0(tx_l0);
2416            let mut l0_guard = l0.write();
2417            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2418            // possibly empty) current buffer must merge against the chained
2419            // committed state, not shadow it. No-op when the chain is empty
2420            // or this is a tx-private write.
2421            if tx_l0.is_none() {
2422                let pending = self.l0_manager.get_pending_flush();
2423                if !pending.is_empty() {
2424                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2425                }
2426            }
2427            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2428
2429            // Populate constraint index for O(1) duplicate detection
2430            let schema = self.schema_manager.schema();
2431            for label in &labels_copy {
2432                if schema.get_label_case_insensitive(label).is_none() {
2433                    if self.config.strict_schema {
2434                        return Err(anyhow::anyhow!(
2435                            "Label '{}' is not defined in the schema \
2436                             (strict_schema is enabled).",
2437                            label
2438                        ));
2439                    }
2440                    continue; // Schemaless: skip unknown labels.
2441                }
2442
2443                // For each unique constraint on this label, insert into constraint index
2444                for constraint in &schema.constraints {
2445                    if !constraint.enabled {
2446                        continue;
2447                    }
2448                    if let ConstraintTarget::Label(l) = &constraint.target {
2449                        if l != label {
2450                            continue;
2451                        }
2452                    } else {
2453                        continue;
2454                    }
2455
2456                    if let ConstraintType::Unique {
2457                        properties: unique_props,
2458                    } = &constraint.constraint_type
2459                    {
2460                        let mut key_values = Vec::new();
2461                        let mut all_present = true;
2462                        for prop in unique_props {
2463                            if let Some(val) = properties_copy.get(prop) {
2464                                key_values.push((prop.clone(), val.clone()));
2465                            } else {
2466                                all_present = false;
2467                                break;
2468                            }
2469                        }
2470
2471                        if all_present {
2472                            let key = serialize_constraint_key(label, &key_values);
2473                            l0_guard.insert_constraint_key(key, vid);
2474                        }
2475                    }
2476                }
2477            }
2478        }
2479
2480        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2481        self.update_metrics();
2482
2483        if tx_l0.is_none() {
2484            self.check_flush().await?;
2485        }
2486        if start.elapsed().as_millis() > 100 {
2487            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2488        }
2489        Ok(properties_copy)
2490    }
2491
2492    /// True iff routing this partial write through MergeInsert would
2493    /// miss a constraint check. Specifically: a multi-key UNIQUE
2494    /// constraint where the touched-set doesn't cover all member keys
2495    /// requires the unchanged keys from the existing row to compute
2496    /// the composite. Conservative: also returns true if any touched
2497    /// key is `ext_id` (uniqueness checked globally — handled in the
2498    /// full-row path).
2499    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2500        if touched.contains_key("ext_id") {
2501            return true;
2502        }
2503        let schema = self.schema_manager.schema();
2504        for label in labels {
2505            if schema.get_label_case_insensitive(label).is_none() {
2506                continue;
2507            }
2508            for constraint in &schema.constraints {
2509                if !constraint.enabled {
2510                    continue;
2511                }
2512                if let ConstraintTarget::Label(l) = &constraint.target {
2513                    if !l.eq_ignore_ascii_case(label) {
2514                        continue;
2515                    }
2516                } else {
2517                    continue;
2518                }
2519                if let ConstraintType::Unique {
2520                    properties: unique_props,
2521                } = &constraint.constraint_type
2522                {
2523                    if unique_props.len() < 2 {
2524                        continue; // single-key UNIQUE — partial path sees the key
2525                    }
2526                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2527                        return true;
2528                    }
2529                }
2530            }
2531        }
2532        false
2533    }
2534
2535    /// Insert a vertex's FULL property row plus a touched-keys hint so
2536    /// the flush emits ONLY those columns via Lance MergeInsert.
2537    ///
2538    /// Caller must have read the full row (via PropertyManager) and
2539    /// applied SET-touched values on top before calling — same input
2540    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2541    /// is the set of property keys this SET statement actually
2542    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2543    /// flush filters the MergeInsert source schema down to those keys.
2544    /// When `UniConfig::partial_lance_writes == false`, falls through
2545    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2546    /// equivalence with prior releases.
2547    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2548    pub async fn insert_vertex_partial_full(
2549        &self,
2550        vid: Vid,
2551        mut props: Properties,
2552        touched_keys: HashSet<String>,
2553        labels: &[String],
2554        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2555    ) -> Result<()> {
2556        if !self.config.partial_lance_writes
2557            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2558        {
2559            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2560                .await?;
2561            return Ok(());
2562        }
2563
2564        self.check_write_pressure().await?;
2565        self.check_transaction_memory(tx_l0)?;
2566        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2567            self.process_embeddings_for_labels(labels, &mut props)
2568                .await?;
2569        }
2570        // Full-row validation runs because we have the complete map;
2571        // no need for the partial-only validator.
2572        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2573            .await?;
2574        {
2575            let l0 = self.resolve_l0(tx_l0);
2576            let mut l0_guard = l0.write();
2577            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2578        }
2579        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2580        metrics::counter!("uni_partial_writes_total").increment(1);
2581        self.update_metrics();
2582        if tx_l0.is_none() {
2583            self.check_flush().await?;
2584        }
2585        Ok(())
2586    }
2587
2588    /// Insert a vertex's *partial* property set without first reading the
2589    /// full row.
2590    ///
2591    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2592    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2593    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2594    /// schema source — preserving untouched columns (e.g., embeddings)
2595    /// byte-equal in Lance with no read at the caller and no write of
2596    /// those columns.
2597    ///
2598    /// When the flag is `false`, this falls back to the existing
2599    /// `insert_vertex_with_labels` path after merging `touched` with
2600    /// the current properties from L0/storage. The caller can therefore
2601    /// use this entry point unconditionally; the optimization activates
2602    /// only when the flag is on.
2603    #[instrument(skip(self, touched, labels), level = "trace")]
2604    pub async fn insert_vertex_partial(
2605        &self,
2606        vid: Vid,
2607        touched: Properties,
2608        labels: &[String],
2609        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2610    ) -> Result<()> {
2611        let needs_full_read =
2612            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2613        if needs_full_read {
2614            // Flag-off fallback (or constraint-driven fallback): merge
2615            // `touched` with the current full property snapshot from
2616            // L0/storage and route through the existing path. Preserves
2617            // bit-for-bit equivalence with the pre-Round-11 release.
2618            let existing = if let Some(pm) = &self.property_manager {
2619                pm.get_all_vertex_props_with_ctx(vid, None)
2620                    .await
2621                    .unwrap_or_default()
2622                    .unwrap_or_default()
2623            } else {
2624                Properties::new()
2625            };
2626            let mut merged = existing;
2627            for (k, v) in touched {
2628                merged.insert(k, v);
2629            }
2630            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2631                .await?;
2632            return Ok(());
2633        }
2634
2635        // Flag-on fast path: stage the partial update directly. Pressure
2636        // checks, embedding generation, constraint validation all still
2637        // run — but the validator is the partial-aware variant that
2638        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2639        // properties not present in `touched`. Multi-key UNIQUE that
2640        // overlaps the touched set forces a fallback above via
2641        // `touched_needs_full_read`.
2642        let mut touched = touched;
2643        self.check_write_pressure().await?;
2644        self.check_transaction_memory(tx_l0)?;
2645        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2646            self.process_embeddings_for_labels(labels, &mut touched)
2647                .await?;
2648        }
2649        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2650            .await?;
2651
2652        {
2653            let l0 = self.resolve_l0(tx_l0);
2654            let mut l0_guard = l0.write();
2655            l0_guard.insert_vertex_partial(vid, touched, labels);
2656        }
2657
2658        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2659        metrics::counter!("uni_partial_writes_total").increment(1);
2660        self.update_metrics();
2661        if tx_l0.is_none() {
2662            self.check_flush().await?;
2663        }
2664        Ok(())
2665    }
2666
2667    /// Insert multiple vertices with batched operations.
2668    ///
2669    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2670    /// for bulk inserts with unique constraints.
2671    ///
2672    /// # Performance Improvements
2673    /// - Batch VID allocation: 1 call instead of N calls
2674    /// - Batch constraint validation: O(N) instead of O(N²)
2675    /// - Batch embedding generation: 1 API call per config instead of N calls
2676    /// - Transaction wrapping: Automatic flush deferral, atomicity
2677    ///
2678    /// # Arguments
2679    /// * `vids` - Pre-allocated VIDs for the vertices
2680    /// * `properties_batch` - Properties for each vertex
2681    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2682    ///
2683    /// # Errors
2684    /// Returns error if:
2685    /// - VID/properties length mismatch
2686    /// - Constraint violation detected
2687    /// - Embedding generation fails
2688    /// - Transaction commit fails
2689    ///
2690    /// # Atomicity
2691    /// If this method fails, all changes are rolled back (if transaction was started here).
2692    pub async fn insert_vertices_batch(
2693        &self,
2694        vids: Vec<Vid>,
2695        mut properties_batch: Vec<Properties>,
2696        labels: Vec<String>,
2697        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2698    ) -> Result<Vec<Properties>> {
2699        let start = std::time::Instant::now();
2700
2701        // Validate inputs
2702        if vids.len() != properties_batch.len() {
2703            return Err(anyhow!(
2704                "VID/properties size mismatch: {} vids, {} properties",
2705                vids.len(),
2706                properties_batch.len()
2707            ));
2708        }
2709
2710        if vids.is_empty() {
2711            return Ok(Vec::new());
2712        }
2713
2714        // Batch operations — writes go directly to the resolved L0.
2715        // Atomicity is guaranteed by the caller holding the writer lock.
2716        let result = async {
2717            self.check_write_pressure().await?;
2718            self.check_transaction_memory(tx_l0)?;
2719
2720            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2721            // freeze the pinned generation aside before merging so snapshot
2722            // readers stay isolated. No-op when unpinned or transactional.
2723            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2724
2725            // Batch embedding generation (1 API call per config)
2726            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2727                .await?;
2728
2729            // Batch constraint validation (O(N) instead of O(N²))
2730            let label = labels
2731                .first()
2732                .ok_or_else(|| anyhow!("No labels provided"))?;
2733            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2734                .await?;
2735
2736            // Batch prepare (CRDT merging if needed)
2737            // Check schema once: skip entirely if no CRDT properties for this label.
2738            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2739            // values to merge, so the per-vertex lookup is unnecessary in that case.
2740            let has_crdt_fields = {
2741                let schema = self.schema_manager.schema();
2742                schema
2743                    .properties
2744                    .get(label.as_str())
2745                    .is_some_and(|props_meta| {
2746                        props_meta.values().any(|meta| {
2747                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2748                        })
2749                    })
2750            };
2751
2752            if has_crdt_fields {
2753                // Layer-1 variant enforcement (G3): the batch path must reject a
2754                // declared-CRDT variant mismatch exactly as the single-vertex
2755                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2756                // written via batch import slips past write-time validation and
2757                // the OCC carve-out then masks the overwrite as a lost update.
2758                {
2759                    let schema = self.schema_manager.schema();
2760                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2761                        for props in &properties_batch {
2762                            Self::enforce_crdt_variants(props_meta, props)?;
2763                        }
2764                    }
2765                }
2766
2767                // Batch fetch existing CRDT values: collect VIDs that need merging,
2768                // then query once via PropertyManager instead of per-vertex lookups.
2769                let schema = self.schema_manager.schema();
2770                let crdt_keys: Vec<String> = schema
2771                    .properties
2772                    .get(label.as_str())
2773                    .map(|props_meta| {
2774                        props_meta
2775                            .iter()
2776                            .filter(|(_, meta)| {
2777                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2778                            })
2779                            .map(|(key, _)| key.clone())
2780                            .collect()
2781                    })
2782                    .unwrap_or_default();
2783
2784                if let Some(pm) = &self.property_manager {
2785                    let ctx = self.get_query_context(tx_l0).await;
2786                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2787                        for key in &crdt_keys {
2788                            if props.contains_key(key) {
2789                                let existing =
2790                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2791                                if !existing.is_null()
2792                                    && let Some(val) = props.get_mut(key)
2793                                {
2794                                    *val = pm.merge_crdt_values(&existing, val)?;
2795                                }
2796                            }
2797                        }
2798                    }
2799                }
2800            }
2801
2802            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2803            let target_l0 = self.resolve_l0(tx_l0);
2804
2805            let properties_result = properties_batch.clone();
2806            {
2807                let mut l0_guard = target_l0.write();
2808                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2809                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2810                }
2811            }
2812
2813            // Update metrics (batch increment)
2814            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2815            self.update_metrics();
2816
2817            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2818        }
2819        .await;
2820
2821        let props = result?;
2822
2823        if start.elapsed().as_millis() > 100 {
2824            log::warn!(
2825                "Slow insert_vertices_batch ({} vertices): {}ms",
2826                vids.len(),
2827                start.elapsed().as_millis()
2828            );
2829        }
2830
2831        Ok(props)
2832    }
2833
2834    /// Delete a vertex by VID.
2835    ///
2836    /// When `labels` is provided, uses them directly to populate L0 for
2837    /// correct tombstone flushing. Otherwise discovers labels from L0
2838    /// buffers and storage (which can be slow for many vertices).
2839    ///
2840    /// # Errors
2841    ///
2842    /// Returns an error if write pressure stalls, label lookup fails, or
2843    /// the L0 delete operation fails.
2844    #[instrument(skip(self, labels), level = "trace")]
2845    pub async fn delete_vertex(
2846        &self,
2847        vid: Vid,
2848        labels: Option<Vec<String>>,
2849        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2850    ) -> Result<()> {
2851        let start = std::time::Instant::now();
2852        self.check_write_pressure().await?;
2853        self.check_transaction_memory(tx_l0)?;
2854        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2855
2856        // Before deleting, ensure we have the vertex's labels stored in L0 so
2857        // the tombstone can be flushed to the correct label datasets. Discover
2858        // them up front (this may await storage) WITHOUT pinning the buffer we
2859        // will eventually mutate — for non-tx writes the live `current` buffer
2860        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2861        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2862        // reads that tolerate a racing rotate.
2863        let has_labels = {
2864            let l0_guard = self.resolve_l0(tx_l0);
2865            let guard = l0_guard.read();
2866            guard.vertex_labels.contains_key(&vid)
2867        };
2868
2869        let backfill_labels = if has_labels {
2870            None
2871        } else if let Some(provided) = labels {
2872            // Caller provided labels — skip the lookup entirely
2873            Some(provided)
2874        } else {
2875            // Discover labels from pending flush L0s, then storage
2876            let mut found = None;
2877            for pending_l0 in self.l0_manager.get_pending_flush() {
2878                let pending_guard = pending_l0.read();
2879                if let Some(l) = pending_guard.get_vertex_labels(vid) {
2880                    found = Some(l.to_vec());
2881                    break;
2882                }
2883            }
2884            if found.is_none() {
2885                found = self.find_vertex_labels_in_storage(vid).await?;
2886            }
2887            found
2888        };
2889
2890        // Test-only seam (no-op without the `failpoints` feature): pause a
2891        // non-transactional delete AFTER the awaited label discovery but BEFORE
2892        // it re-resolves the live buffer and writes the tombstone. A concurrent
2893        // flush can rotate+complete a buffer in this window; the fix re-resolves
2894        // `get_current()` and mutates it under `flush_lock`, so the tombstone
2895        // always lands in the live buffer (Bug #4 — silent lost delete across
2896        // L0 rotation).
2897        fail::fail_point!("nontx::after-capture");
2898
2899        // Apply the label backfill and the tombstone together. For a non-tx
2900        // write, hold `flush_lock` across the (synchronous) re-resolve + write
2901        // so a concurrent flush rotate (which takes `flush_lock` via
2902        // `begin_flush`) cannot install a fresh `current` between our resolve
2903        // and our write. For a tx write `resolve_l0` returns the tx-private
2904        // buffer (never the rotating `current`), so no `flush_lock` is needed
2905        // — and taking it there would risk re-entrancy with the commit path.
2906        if tx_l0.is_none() {
2907            let _flush_lock_guard = self.flush_lock.lock().await;
2908            let l0 = self.l0_manager.get_current();
2909            let mut guard = l0.write();
2910            if let Some(found_labels) = backfill_labels {
2911                guard.vertex_labels.insert(vid, found_labels);
2912            }
2913            guard.delete_vertex(vid)?;
2914        } else {
2915            let l0 = self.resolve_l0(tx_l0);
2916            let mut guard = l0.write();
2917            if let Some(found_labels) = backfill_labels {
2918                guard.vertex_labels.insert(vid, found_labels);
2919            }
2920            guard.delete_vertex(vid)?;
2921        }
2922        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2923        self.update_metrics();
2924
2925        if tx_l0.is_none() {
2926            self.check_flush().await?;
2927        }
2928        if start.elapsed().as_millis() > 100 {
2929            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2930        }
2931        Ok(())
2932    }
2933
2934    /// Find vertex labels from storage by querying the main vertices table.
2935    /// Returns the labels from the latest non-deleted version of the vertex.
2936    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2937        use crate::backend::types::ScanRequest;
2938        use arrow_array::Array;
2939        use arrow_array::cast::AsArray;
2940
2941        let backend = self.storage.backend();
2942        let table_name = MainVertexDataset::table_name();
2943
2944        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2945        if !backend.table_exists(table_name).await? {
2946            return Ok(None);
2947        }
2948
2949        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2950        let filter = format!("_vid = {}", vid.as_u64());
2951        let batches = backend
2952            .scan(
2953                ScanRequest::all(table_name)
2954                    .with_filter(filter)
2955                    .with_columns(vec![
2956                        "_vid".to_string(),
2957                        "labels".to_string(),
2958                        "_version".to_string(),
2959                        "_deleted".to_string(),
2960                    ]),
2961            )
2962            .await
2963            .unwrap_or_default();
2964
2965        // Find the row with the highest version number
2966        let mut max_version: Option<u64> = None;
2967        let mut labels: Option<Vec<String>> = None;
2968        let mut is_deleted = false;
2969
2970        for batch in batches {
2971            if batch.num_rows() == 0 {
2972                continue;
2973            }
2974
2975            let version_array = batch
2976                .column_by_name("_version")
2977                .unwrap()
2978                .as_primitive::<arrow_array::types::UInt64Type>();
2979
2980            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2981
2982            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2983
2984            for row_idx in 0..batch.num_rows() {
2985                let version = version_array.value(row_idx);
2986
2987                if max_version.is_none_or(|mv| version > mv) {
2988                    is_deleted = deleted_array.value(row_idx);
2989
2990                    let labels_list = labels_array.value(row_idx);
2991                    let string_array = labels_list.as_string::<i32>();
2992                    let vertex_labels: Vec<String> = (0..string_array.len())
2993                        .filter(|&i| !string_array.is_null(i))
2994                        .map(|i| string_array.value(i).to_string())
2995                        .collect();
2996
2997                    max_version = Some(version);
2998                    labels = Some(vertex_labels);
2999                }
3000            }
3001        }
3002
3003        // If the latest version is deleted, return None
3004        if is_deleted { Ok(None) } else { Ok(labels) }
3005    }
3006
3007    #[expect(clippy::too_many_arguments)]
3008    #[instrument(skip(self, props, touched_keys), level = "trace")]
3009    pub async fn insert_edge_partial_full(
3010        &self,
3011        src_vid: Vid,
3012        dst_vid: Vid,
3013        edge_type: u32,
3014        eid: Eid,
3015        props: Properties,
3016        edge_type_name: Option<String>,
3017        touched_keys: HashSet<String>,
3018        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3019    ) -> Result<()> {
3020        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3021        if !self.config.partial_lance_writes {
3022            return self
3023                .insert_edge(
3024                    src_vid,
3025                    dst_vid,
3026                    edge_type,
3027                    eid,
3028                    props,
3029                    edge_type_name,
3030                    tx_l0,
3031                )
3032                .await;
3033        }
3034
3035        let start = std::time::Instant::now();
3036        self.check_write_pressure().await?;
3037        self.check_transaction_memory(tx_l0)?;
3038        let mut props = props;
3039        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3040
3041        let l0 = self.resolve_l0(tx_l0);
3042        l0.write().insert_edge_partial_full(
3043            src_vid,
3044            dst_vid,
3045            edge_type,
3046            eid,
3047            props,
3048            edge_type_name,
3049            touched_keys,
3050        )?;
3051
3052        if tx_l0.is_none() {
3053            let version = l0.read().current_version;
3054            self.adjacency_manager
3055                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3056        }
3057
3058        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3059        metrics::counter!("uni_partial_writes_total").increment(1);
3060        self.update_metrics();
3061        if tx_l0.is_none() {
3062            self.check_flush().await?;
3063        }
3064        if start.elapsed().as_millis() > 100 {
3065            log::warn!(
3066                "Slow insert_edge_partial_full: {}ms",
3067                start.elapsed().as_millis()
3068            );
3069        }
3070        Ok(())
3071    }
3072
3073    #[expect(clippy::too_many_arguments)]
3074    pub async fn insert_edge(
3075        &self,
3076        src_vid: Vid,
3077        dst_vid: Vid,
3078        edge_type: u32,
3079        eid: Eid,
3080        mut properties: Properties,
3081        edge_type_name: Option<String>,
3082        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3083    ) -> Result<()> {
3084        let start = std::time::Instant::now();
3085        self.check_write_pressure().await?;
3086        self.check_transaction_memory(tx_l0)?;
3087        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3088        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3089            .await?;
3090
3091        let l0 = self.resolve_l0(tx_l0);
3092        l0.write()
3093            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3094
3095        // Dual-write to AdjacencyManager overlay (survives flush).
3096        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3097        if tx_l0.is_none() {
3098            let version = l0.read().current_version;
3099            self.adjacency_manager
3100                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3101        }
3102
3103        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3104        self.update_metrics();
3105
3106        if tx_l0.is_none() {
3107            self.check_flush().await?;
3108        }
3109        if start.elapsed().as_millis() > 100 {
3110            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3111        }
3112        Ok(())
3113    }
3114
3115    #[instrument(skip(self), level = "trace")]
3116    pub async fn delete_edge(
3117        &self,
3118        eid: Eid,
3119        src_vid: Vid,
3120        dst_vid: Vid,
3121        edge_type: u32,
3122        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3123    ) -> Result<()> {
3124        let start = std::time::Instant::now();
3125        self.check_write_pressure().await?;
3126        self.check_transaction_memory(tx_l0)?;
3127        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3128        let l0 = self.resolve_l0(tx_l0);
3129
3130        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3131
3132        // Dual-write tombstone to AdjacencyManager overlay.
3133        if tx_l0.is_none() {
3134            let version = l0.read().current_version;
3135            self.adjacency_manager
3136                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3137        }
3138        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3139        self.update_metrics();
3140
3141        if tx_l0.is_none() {
3142            self.check_flush().await?;
3143        }
3144        if start.elapsed().as_millis() > 100 {
3145            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3146        }
3147        Ok(())
3148    }
3149
3150    /// Decide whether a flush should be triggered based on mutation count
3151    /// or elapsed time since the last flush.
3152    ///
3153    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3154    /// reuse the decision while bypassing the lock-acquiring entry point
3155    /// (it already holds `flush_lock`).
3156    fn should_flush(&self) -> bool {
3157        let count = self.l0_manager.get_current().read().mutation_count;
3158        if count == 0 {
3159            return false;
3160        }
3161        if count >= self.config.auto_flush_threshold {
3162            return true;
3163        }
3164        if let Some(interval) = self.config.auto_flush_interval
3165            && self.last_flush_time.lock().elapsed() >= interval
3166            && count >= self.config.auto_flush_min_mutations
3167        {
3168            return true;
3169        }
3170        false
3171    }
3172
3173    /// Check if flush should be triggered based on mutation count or time elapsed.
3174    /// This method is called after each write operation and can also be called
3175    /// by a background task for time-based flushing.
3176    pub async fn check_flush(&self) -> Result<()> {
3177        if self.should_flush() {
3178            self.flush_to_l1(None).await?;
3179        }
3180        Ok(())
3181    }
3182
3183    /// Process embeddings for a vertex using labels passed directly.
3184    /// Use this when labels haven't been stored to L0 yet.
3185    async fn process_embeddings_for_labels(
3186        &self,
3187        labels: &[String],
3188        properties: &mut Properties,
3189    ) -> Result<()> {
3190        let label_name = labels.first().map(|s| s.as_str());
3191        self.process_embeddings_impl(label_name, properties).await
3192    }
3193
3194    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3195    /// vertex has an embedding config that hasn't been satisfied by the
3196    /// caller-provided properties, enqueue the VID in
3197    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3198    /// skips `process_embeddings_for_labels` and the embedding is computed
3199    /// in a single batched call at flush time via
3200    /// `drain_pending_embeddings`.
3201    ///
3202    /// Returns `false` (caller falls back to today's per-row eager embed)
3203    /// if any of:
3204    ///  - the flag is off,
3205    ///  - no label has an embedding config,
3206    ///  - the user already provided the target property (matches the
3207    ///    existing skip-if-present semantics at writer.rs:2727).
3208    ///
3209    /// Trade-off: when deferral is active, in-tx reads of the embedding
3210    /// column return only what was already in storage (or nothing for
3211    /// brand-new vertices). Existing tests that RETURN n.embedding in
3212    /// the same tx as a SET on the source column must run with the flag
3213    /// off; opt in only when no such reads happen between write and
3214    /// commit.
3215    fn try_defer_embedding(
3216        &self,
3217        labels: &[String],
3218        properties: &Properties,
3219        vid: Vid,
3220        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3221    ) -> bool {
3222        if !self.config.defer_embeddings {
3223            return false;
3224        }
3225        let Some(label) = labels.first() else {
3226            return false;
3227        };
3228
3229        let schema = self.schema_manager.schema();
3230        let mut has_unsatisfied_cfg = false;
3231        for idx in &schema.indexes {
3232            if let IndexDefinition::Vector(v_cfg) = idx
3233                && v_cfg.label == *label
3234                && v_cfg.embedding_config.is_some()
3235                && !properties.contains_key(&v_cfg.property)
3236            {
3237                has_unsatisfied_cfg = true;
3238                break;
3239            }
3240        }
3241        if !has_unsatisfied_cfg {
3242            return false;
3243        }
3244
3245        let l0 = self.resolve_l0(tx_l0);
3246        let mut guard = l0.write();
3247        guard.pending_embeddings.insert(vid, label.clone());
3248        true
3249    }
3250
3251    /// Drain `pending_embeddings` from the rotated old-L0 right before
3252    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3253    /// `process_embeddings_for_batch` call per label, and writes the
3254    /// resulting embedding vectors into each VID's `vertex_properties`
3255    /// map. After this returns, the flush proceeds against an L0 that
3256    /// looks no different from one whose embeddings were generated
3257    /// per-row at insert.
3258    ///
3259    /// Idempotent: a VID whose embedding was already materialized
3260    /// (e.g., by on-demand read paths in a future Phase B revision) is
3261    /// detected via `properties.contains_key(target_prop)` inside
3262    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3263    /// the drain is safe.
3264    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3265        let by_label: HashMap<String, Vec<Vid>> = {
3266            let guard = old_l0_arc.read();
3267            if guard.pending_embeddings.is_empty() {
3268                return Ok(());
3269            }
3270            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3271            for (vid, label) in &guard.pending_embeddings {
3272                m.entry(label.clone()).or_default().push(*vid);
3273            }
3274            m
3275        };
3276
3277        for (label, vids) in by_label {
3278            let mut properties_batch: Vec<Properties> = {
3279                let guard = old_l0_arc.read();
3280                vids.iter()
3281                    .map(|vid| {
3282                        guard
3283                            .vertex_properties
3284                            .get(vid)
3285                            .cloned()
3286                            .unwrap_or_default()
3287                    })
3288                    .collect()
3289            };
3290
3291            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3292                .await?;
3293
3294            let mut guard = old_l0_arc.write();
3295            for (vid, props) in vids.iter().zip(properties_batch) {
3296                let target = guard.vertex_properties.entry(*vid).or_default();
3297                for (k, v) in props {
3298                    target.insert(k, v);
3299                }
3300                guard.pending_embeddings.remove(vid);
3301            }
3302        }
3303        Ok(())
3304    }
3305
3306    /// Process embeddings for a batch of vertices efficiently.
3307    ///
3308    /// Groups vertices by embedding config and makes batched API calls to the
3309    /// embedding service instead of calling once per vertex.
3310    ///
3311    /// # Performance
3312    /// For N vertices with embedding config:
3313    /// - Old approach: N API calls to embedding service
3314    /// - New approach: 1 API call per embedding config (usually 1 total)
3315    async fn process_embeddings_for_batch(
3316        &self,
3317        labels: &[String],
3318        properties_batch: &mut [Properties],
3319    ) -> Result<()> {
3320        let label_name = labels.first().map(|s| s.as_str());
3321        let schema = self.schema_manager.schema();
3322
3323        if let Some(label) = label_name {
3324            // Find vector indexes with embedding config for this label
3325            let mut configs = Vec::new();
3326            for idx in &schema.indexes {
3327                if let IndexDefinition::Vector(v_config) = idx
3328                    && v_config.label == label
3329                    && let Some(emb_config) = &v_config.embedding_config
3330                {
3331                    configs.push((v_config.property.clone(), emb_config.clone()));
3332                }
3333            }
3334
3335            if configs.is_empty() {
3336                return Ok(());
3337            }
3338
3339            for (target_prop, emb_config) in configs {
3340                // Collect input texts from all vertices that need embeddings
3341                let mut input_texts: Vec<String> = Vec::new();
3342                let mut needs_embedding: Vec<usize> = Vec::new();
3343
3344                for (idx, properties) in properties_batch.iter().enumerate() {
3345                    // Skip if target property already exists
3346                    if properties.contains_key(&target_prop) {
3347                        continue;
3348                    }
3349
3350                    // Check if source properties exist
3351                    let mut inputs = Vec::new();
3352                    for src_prop in &emb_config.source_properties {
3353                        if let Some(val) = properties.get(src_prop)
3354                            && let Some(s) = val.as_str()
3355                        {
3356                            inputs.push(s.to_string());
3357                        }
3358                    }
3359
3360                    if !inputs.is_empty() {
3361                        let input_text = inputs.join(" ");
3362                        let input_text = match &emb_config.document_prefix {
3363                            Some(prefix) => format!("{prefix}{input_text}"),
3364                            None => input_text,
3365                        };
3366                        input_texts.push(input_text);
3367                        needs_embedding.push(idx);
3368                    }
3369                }
3370
3371                if input_texts.is_empty() {
3372                    continue;
3373                }
3374
3375                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3376                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3377                })?;
3378                let embedder = runtime.embedding(&emb_config.alias).await?;
3379
3380                // Batch generate embeddings (single API call)
3381                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3382                let embeddings = embedder.embed(input_refs).await?;
3383
3384                // Distribute results back to properties
3385                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3386                    if let Some(vec) = embeddings.get(embedding_idx) {
3387                        let vals: Vec<Value> =
3388                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
3389                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3390                    }
3391                }
3392            }
3393        }
3394
3395        Ok(())
3396    }
3397
3398    async fn process_embeddings_impl(
3399        &self,
3400        label_name: Option<&str>,
3401        properties: &mut Properties,
3402    ) -> Result<()> {
3403        let schema = self.schema_manager.schema();
3404
3405        if let Some(label) = label_name {
3406            // Find vector indexes with embedding config for this label
3407            let mut configs = Vec::new();
3408            for idx in &schema.indexes {
3409                if let IndexDefinition::Vector(v_config) = idx
3410                    && v_config.label == label
3411                    && let Some(emb_config) = &v_config.embedding_config
3412                {
3413                    configs.push((v_config.property.clone(), emb_config.clone()));
3414                }
3415            }
3416
3417            if configs.is_empty() {
3418                log::info!("No embedding config found for label {}", label);
3419            }
3420
3421            for (target_prop, emb_config) in configs {
3422                // If target property already exists, skip (assume user provided it)
3423                if properties.contains_key(&target_prop) {
3424                    continue;
3425                }
3426
3427                // Check if source properties exist
3428                let mut inputs = Vec::new();
3429                for src_prop in &emb_config.source_properties {
3430                    if let Some(val) = properties.get(src_prop)
3431                        && let Some(s) = val.as_str()
3432                    {
3433                        inputs.push(s.to_string());
3434                    }
3435                }
3436
3437                if inputs.is_empty() {
3438                    continue;
3439                }
3440
3441                let input_text = inputs.join(" ");
3442                let input_text = match &emb_config.document_prefix {
3443                    Some(prefix) => format!("{prefix}{input_text}"),
3444                    None => input_text,
3445                };
3446
3447                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3448                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3449                })?;
3450                let embedder = runtime.embedding(&emb_config.alias).await?;
3451
3452                // Generate
3453                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3454                if let Some(vec) = embeddings.first() {
3455                    // Store as array of floats
3456                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3457                    properties.insert(target_prop.clone(), Value::List(vals));
3458                }
3459            }
3460        }
3461        Ok(())
3462    }
3463
3464    /// Flushes the current in-memory L0 buffer to L1 storage.
3465    ///
3466    /// # Lock Ordering
3467    ///
3468    /// To prevent deadlocks, locks must be acquired in the following order:
3469    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3470    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3471    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3472    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3473    /// 5. `Index` / `Storage` locks (during actual flush)
3474    ///
3475    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3476    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3477    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3478    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3479        // Drain any in-flight async flushes first. `flush_to_l1` is a
3480        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3481        // setup, shutdown paths) rely on it as "all writes are now
3482        // durably in Lance". Without the drain, an async stream from
3483        // a recent commit might still be writing to Lance when
3484        // `flush_to_l1` returns, leaving a window where forks branch
3485        // off pre-write Lance state and lose data.
3486        if let Some(coord) = self.flush_coordinator.as_ref() {
3487            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3488        }
3489        let _flush_lock_guard = self.flush_lock.lock().await;
3490        self.flush_inline_under_lock(name).await
3491    }
3492
3493    /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3494    ///
3495    /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3496    /// flush, and then — still holding the lock — reads the allocator
3497    /// high-water marks and each existing candidate dataset's Lance
3498    /// version. Capturing under the held lock is what makes the fork point
3499    /// atomic: no concurrent commit can advance the allocator and no
3500    /// concurrent flush can advance a dataset tip between the flush and the
3501    /// reads. See [`ForkPoint`].
3502    ///
3503    /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3504    /// names with no `.lance` directory on disk are skipped (returned map
3505    /// has no entry for them), matching the fork branch loop's existence
3506    /// check.
3507    ///
3508    /// # Errors
3509    /// Propagates flush failures from `flush_inline_under_lock` and any
3510    /// per-dataset version read failure from `lance_branch::current_version`.
3511    ///
3512    /// # Deadlocks
3513    /// Must not be called by a task already holding `flush_lock` (e.g.
3514    /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3515    /// Fork creation never holds the lock, so the sole call site is safe.
3516    pub async fn flush_and_capture_fork_point(
3517        &self,
3518        candidate_dataset_names: &[String],
3519    ) -> Result<ForkPoint> {
3520        if let Some(coord) = self.flush_coordinator.as_ref() {
3521            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3522        }
3523        let _flush_lock_guard = self.flush_lock.lock().await;
3524        self.flush_inline_under_lock(None).await?;
3525
3526        // Still under `flush_lock`: capture the allocator HWM, the MVCC
3527        // version HWM, and every existing dataset's Lance version so
3528        // nothing can interleave.
3529        let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3530        // The parent's current L0 version is the largest `_version` any
3531        // inherited row can carry (flushed or in-memory). A fork bootstraps
3532        // its version floor to this so a fork tx read still sees inherited
3533        // rows. Cheap read lock; no buffer clone.
3534        let version_hwm = self.l0_manager.get_current().read().current_version;
3535
3536        let base = self.storage.base_uri();
3537        let mut dataset_versions = BTreeMap::new();
3538        for name in candidate_dataset_names {
3539            let uri = join_lance_uri(base, name);
3540            if !lance_path_exists(&uri) {
3541                continue;
3542            }
3543            let version = crate::backend::lance_branch::current_version(&uri).await?;
3544            dataset_versions.insert(name.clone(), version);
3545        }
3546
3547        Ok(ForkPoint {
3548            vid_hwm,
3549            eid_hwm,
3550            dataset_versions,
3551            version_hwm,
3552        })
3553    }
3554
3555    /// Async-flush entry point: rotate under `flush_lock`, release the
3556    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3557    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3558    /// that resolves when finalize completes.
3559    ///
3560    /// Errors if `config.async_flush_enabled = false` (the coordinator
3561    /// is `None` in that case — see `flush_coordinator` field doc).
3562    pub async fn flush_to_l1_async(
3563        self: &Arc<Self>,
3564        name: Option<String>,
3565    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3566        let coord = self
3567            .flush_coordinator
3568            .as_ref()
3569            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3570            .clone();
3571        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3572        //    introduce a permit-while-holding-flush-lock convoy.
3573        let permit = coord.acquire_permit().await?;
3574        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3575        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3576        //    rotate (the `?` below) must consume neither: the finalizer
3577        //    advances in strictly consecutive seq order and only decrements
3578        //    pending on finalize, so a leaked seq/pending would wedge it
3579        //    forever. The seq is allocated under `flush_lock`, immediately
3580        //    after the rotate and before the guard drops, so concurrent rotates
3581        //    keep seq order == rotation order, and the seq is unused until
3582        //    submit. On the `?` error path the permit drops, freeing the slot.
3583        let (
3584            RotateOutput {
3585                old_l0_arc,
3586                wal_lsn,
3587                current_version,
3588                flush_in_progress_guard,
3589            },
3590            seq,
3591        ) = {
3592            let _flush_lock_guard = self.flush_lock.lock().await;
3593            let rotate_out = self.flush_l0_rotate().await?;
3594            let seq = coord.next_rotate_seq();
3595            coord.note_pending();
3596            (rotate_out, seq)
3597        };
3598        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3599        //    cached_manifest snapshot at this moment.
3600        let parent_manifest = self.cached_manifest.lock().clone();
3601        let rotated = RotatedFlush {
3602            seq,
3603            old_l0_arc: old_l0_arc.clone(),
3604            wal_lsn,
3605            current_version,
3606            name: name.clone(),
3607            parent_manifest,
3608            permit,
3609            flush_in_progress_guard,
3610        };
3611        // 4. Spawn the stream phase via the coordinator. The closure
3612        //    captures Arc<Writer> transiently — drops when stream
3613        //    completes (bounded, ~50-500 ms).
3614        let writer = self.clone();
3615        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3616            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3617            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3618                new_manifest: outcome.manifest,
3619                snapshot_id: outcome.snapshot_id,
3620            })
3621        });
3622        Ok(ticket)
3623    }
3624
3625    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3626    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3627    /// its place), and hand off the WAL to the new L0.
3628    ///
3629    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3630    /// is responsible). The returned [`RotateOutput`] carries everything
3631    /// the subsequent stream + finalize phases need; in particular the
3632    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3633    /// alive for the full flush lifetime — including any future async
3634    /// path where stream runs on a spawned task.
3635    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3636        // Acquire the in-progress counter BEFORE any heavy work. The
3637        // guard lives on RotateOutput; dropping RotateOutput drops the
3638        // guard, so the counter goes back to zero exactly when the flush
3639        // is fully done.
3640        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3641
3642        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3643        // current L0 is still active and mutations are retained in
3644        // memory until restart/retry.
3645        let wal_for_truncate = {
3646            let current_l0 = self.l0_manager.get_current();
3647            let l0_guard = current_l0.read();
3648            l0_guard.wal.clone()
3649        };
3650        // Test-only seam (no-op without the `failpoints` feature): inject a
3651        // WAL-flush failure here to drive the "failed async rotate wedges the
3652        // finalizer" regression (Bug #3). When configured to "return" it makes
3653        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3654        fail::fail_point!("flush::rotate-fail", |_| {
3655            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3656        });
3657        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3658            w.flush().await?
3659        } else {
3660            0
3661        };
3662
3663        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3664        // pending_flush until complete_flush is called by finalize.
3665        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3666        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3667
3668        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3669        // and current_version to the new L0.
3670        let current_version;
3671        {
3672            let mut old_l0_guard = old_l0_arc.write();
3673            current_version = old_l0_guard.current_version;
3674            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3675            let wal = old_l0_guard.wal.take();
3676            let new_l0_arc = self.l0_manager.get_current();
3677            let mut new_l0_guard = new_l0_arc.write();
3678            new_l0_guard.wal = wal;
3679            new_l0_guard.current_version = current_version;
3680        }
3681
3682        Ok(RotateOutput {
3683            old_l0_arc,
3684            wal_lsn,
3685            current_version,
3686            flush_in_progress_guard,
3687        })
3688    }
3689
3690    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3691    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3692    /// pending_flush by Phase B); writes append-only Lance datasets; does
3693    /// NOT call save_snapshot / set_latest_snapshot — those are
3694    /// finalize's job, so the manifest doesn't get published until the
3695    /// next phase.
3696    ///
3697    /// Today takes `&self`; in a follow-up commit this becomes a
3698    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3699    /// run on a spawned task while concurrent commits proceed.
3700    async fn flush_stream_l1(
3701        &self,
3702        old_l0_arc: Arc<RwLock<L0Buffer>>,
3703        wal_lsn: u64,
3704        current_version: u64,
3705        name: Option<String>,
3706    ) -> Result<FlushOutcome> {
3707        // Test-only seam (no-op without the `failpoints` feature): the rotate
3708        // (begin_flush) already moved the to-be-flushed buffer onto
3709        // pending_flush and installed a fresh empty current buffer, but the
3710        // rotated rows are NOT yet durable in Lance. Pausing here holds that
3711        // window open to drive the unique-constraint-hole regression
3712        // (Bug #9 Mechanism A).
3713        fail::fail_point!("flush::after-rotate-before-lance");
3714
3715        // Phase B: materialize any deferred embeddings before column
3716        // extraction. No-op when `defer_embeddings` is off (the set will
3717        // be empty). On-demand reads of the embedding column are a TODO
3718        // for a future revision (see UniConfig::defer_embeddings docs).
3719        self.drain_pending_embeddings(&old_l0_arc).await?;
3720
3721        let schema = self.schema_manager.schema();
3722        // 2. Acquire Read lock on Old L0 for flushing
3723        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3724        // (Vid, labels, properties, deleted, version)
3725        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3726        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3727        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3728        // (vid, full L0 properties map, version, set of keys to update).
3729        // Only the keys in the HashSet are emitted to the partial source;
3730        // the full props map is retained so the per-row column extractor
3731        // can read each touched key's value.
3732        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3733        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3734        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3735        // partial source with just `_vid`, `_deleted=true`, `_version`,
3736        // `_updated_at`. Skips the wide-row Append payload that adds
3737        // nothing on a soft-delete.
3738        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3739        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3740        // Collect vertex timestamps from L0 for flushing to storage
3741        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3742        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3743        // Track tombstones missing labels for storage query fallback
3744        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3745
3746        {
3747            let old_l0 = old_l0_arc.read();
3748
3749            // 1. Collect all edges and tombstones from L0
3750            for edge in old_l0.graph.edges() {
3751                let properties = old_l0
3752                    .edge_properties
3753                    .get(&edge.eid)
3754                    .cloned()
3755                    .unwrap_or_default();
3756                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3757
3758                // Get timestamps from L0 buffer (populated during insert)
3759                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3760                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3761
3762                entries_by_type
3763                    .entry(edge.edge_type)
3764                    .or_default()
3765                    .push(L1Entry {
3766                        src_vid: edge.src_vid,
3767                        dst_vid: edge.dst_vid,
3768                        eid: edge.eid,
3769                        op: Op::Insert,
3770                        version,
3771                        properties,
3772                        created_at,
3773                        updated_at,
3774                    });
3775            }
3776
3777            // From tombstones
3778            for tombstone in old_l0.tombstones.values() {
3779                let version = old_l0
3780                    .edge_versions
3781                    .get(&tombstone.eid)
3782                    .copied()
3783                    .unwrap_or(0);
3784                // Get timestamps - for deletes, updated_at reflects deletion time
3785                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3786                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3787
3788                entries_by_type
3789                    .entry(tombstone.edge_type)
3790                    .or_default()
3791                    .push(L1Entry {
3792                        src_vid: tombstone.src_vid,
3793                        dst_vid: tombstone.dst_vid,
3794                        eid: tombstone.eid,
3795                        op: Op::Delete,
3796                        version,
3797                        properties: HashMap::new(),
3798                        created_at,
3799                        updated_at,
3800                    });
3801            }
3802
3803            // 1b. Collect vertices by label (using vertex_labels from L0)
3804            //
3805            // Helper: fan-out a single vertex entry into per-label buckets.
3806            // Each per-label table row carries the full label set so multi-label
3807            // info is preserved after flush.
3808            let push_vertex_to_labels =
3809                |vid: Vid,
3810                 all_labels: &[String],
3811                 props: Properties,
3812                 deleted: bool,
3813                 version: u64,
3814                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3815                    for label in all_labels {
3816                        if let Some(label_id) = schema.label_id_by_name(label) {
3817                            out.entry(label_id).or_default().push((
3818                                vid,
3819                                all_labels.to_vec(),
3820                                props.clone(),
3821                                deleted,
3822                                version,
3823                            ));
3824                        }
3825                    }
3826                };
3827
3828            for (vid, props) in &old_l0.vertex_properties {
3829                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3830                // Collect timestamps for this vertex
3831                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3832                    vertex_created_at.insert(*vid, ts);
3833                }
3834                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3835                    vertex_updated_at.insert(*vid, ts);
3836                }
3837                if let Some(labels) = old_l0.vertex_labels.get(vid) {
3838                    // Partial-write routing: when this VID was last
3839                    // touched via `insert_vertex_partial` AND the
3840                    // partial_lance_writes flag is on, send only the
3841                    // touched columns to a MergeInsert batch. Otherwise
3842                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3843                    // — or flag off) use the existing full-row Append.
3844                    let is_partial = self.config.partial_lance_writes
3845                        && old_l0.vertex_partial_keys.contains_key(vid);
3846                    if is_partial {
3847                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3848                            for label in labels {
3849                                if let Some(label_id) = schema.label_id_by_name(label) {
3850                                    partial_by_label.entry(label_id).or_default().push((
3851                                        *vid,
3852                                        props.clone(),
3853                                        version,
3854                                        touched.clone(),
3855                                    ));
3856                                }
3857                            }
3858                        }
3859                    } else {
3860                        push_vertex_to_labels(
3861                            *vid,
3862                            labels,
3863                            props.clone(),
3864                            false,
3865                            version,
3866                            &mut vertices_by_label,
3867                        );
3868                    }
3869                }
3870            }
3871            for &vid in &old_l0.vertex_tombstones {
3872                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3873                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3874                    vertex_updated_at.insert(vid, ts);
3875                }
3876                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3877                    // Round-12 §B: tombstones flush via Lance MergeInsert
3878                    // (just `_vid`, `_deleted=true`, `_version`,
3879                    // `_updated_at`) — skipping the wide-row Append.
3880                    // Unconditional (no `partial_lance_writes` gating);
3881                    // tombstone Append carries no useful payload.
3882                    for label in labels {
3883                        if let Some(label_id) = schema.label_id_by_name(label) {
3884                            tombstones_by_label
3885                                .entry(label_id)
3886                                .or_default()
3887                                .push((vid, version));
3888                        }
3889                    }
3890                } else {
3891                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
3892                    orphaned_tombstones.push((vid, version));
3893                }
3894            }
3895        } // Drop read lock
3896
3897        // Resolve orphaned tombstones (missing labels) from storage
3898        if !orphaned_tombstones.is_empty() {
3899            tracing::warn!(
3900                count = orphaned_tombstones.len(),
3901                "Tombstones missing labels in L0, querying storage as fallback"
3902            );
3903            for (vid, version) in orphaned_tombstones {
3904                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3905                    && !labels.is_empty()
3906                {
3907                    for label in &labels {
3908                        if let Some(label_id) = schema.label_id_by_name(label) {
3909                            // Round-12 §B: route through partial tombstone too.
3910                            tombstones_by_label
3911                                .entry(label_id)
3912                                .or_default()
3913                                .push((vid, version));
3914                        }
3915                    }
3916                }
3917            }
3918        }
3919
3920        // 1. Load previous snapshot from cache, or fall back to storage.
3921        //
3922        // Use clone() not take(): for the async path, multiple
3923        // concurrent streams may run; if we take() here, a sibling
3924        // stream sees cached_manifest = None and seeds from
3925        // load_latest_snapshot (stale), losing the chain. clone()
3926        // preserves the parent. Finalize writes back the new manifest
3927        // unconditionally.
3928        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3929            cached
3930        } else {
3931            self.storage
3932                .snapshot_manager()
3933                .load_latest_snapshot()
3934                .await?
3935                .unwrap_or_else(|| {
3936                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3937                })
3938        };
3939
3940        // Update snapshot metadata
3941        // Save parent snapshot ID before generating new one (for lineage tracking)
3942        let parent_id = manifest.snapshot_id.clone();
3943        manifest.parent_snapshot = Some(parent_id);
3944        manifest.snapshot_id = Uuid::new_v4().to_string();
3945        manifest.name = name;
3946        manifest.created_at = Utc::now();
3947        manifest.version_high_water_mark = current_version;
3948        manifest.wal_high_water_mark = wal_lsn;
3949        let snapshot_id = manifest.snapshot_id.clone();
3950
3951        tracing::Span::current().record("snapshot_id", &snapshot_id);
3952
3953        // 2. Write main unified tables FIRST (before deltas).
3954        //    Ensures the dual-write invariant: by the time an EID appears in a
3955        //    delta table, it already exists in main_edges. This prevents the
3956        //    compaction debug_assert from firing when compaction interleaves
3957        //    with flush at async yield points.
3958        //
3959        // 2.1 Main edges table
3960        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3961            let _old_l0 = old_l0_arc.read();
3962            let mut main_edges: Vec<(
3963                uni_common::core::id::Eid,
3964                Vid,
3965                Vid,
3966                String,
3967                Properties,
3968                bool,
3969                u64,
3970            )> = Vec::new();
3971            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3972            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3973
3974            for (&edge_type_id, entries) in entries_by_type.iter() {
3975                for entry in entries {
3976                    let edge_type_name = self
3977                        .storage
3978                        .schema_manager()
3979                        .edge_type_name_by_id_unified(edge_type_id)
3980                        .unwrap_or_else(|| "unknown".to_string());
3981
3982                    let deleted = matches!(entry.op, Op::Delete);
3983                    main_edges.push((
3984                        entry.eid,
3985                        entry.src_vid,
3986                        entry.dst_vid,
3987                        edge_type_name,
3988                        entry.properties.clone(),
3989                        deleted,
3990                        entry.version,
3991                    ));
3992
3993                    if let Some(ts) = entry.created_at {
3994                        edge_created_at_map.insert(entry.eid, ts);
3995                    }
3996                    if let Some(ts) = entry.updated_at {
3997                        edge_updated_at_map.insert(entry.eid, ts);
3998                    }
3999                }
4000            }
4001
4002            (main_edges, edge_created_at_map, edge_updated_at_map)
4003        };
4004
4005        if !main_edges.is_empty() {
4006            let main_edge_batch = MainEdgeDataset::build_record_batch(
4007                &main_edges,
4008                Some(&edge_created_at_map),
4009                Some(&edge_updated_at_map),
4010            )?;
4011            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4012            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4013        }
4014
4015        // 2.2 Main vertices table
4016        let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4017            let old_l0 = old_l0_arc.read();
4018            let mut vertices = Vec::new();
4019
4020            // Live vertices: full-row Append on the main table (the
4021            // props_json blob is required for global ID lookups). For
4022            // partial-row VIDs (vertex_partial_keys non-empty), the
4023            // main table still needs the full props for the
4024            // ext_id-uniqueness path; we keep the Append here. The
4025            // per-label Lance write IS partial via MergeInsert.
4026            for (vid, props) in &old_l0.vertex_properties {
4027                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4028                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4029                vertices.push((*vid, labels, props.clone(), false, version));
4030            }
4031
4032            // Tombstones: collected into `main_vertex_tombstones` for
4033            // the MergeInsert path below; skipping the wide-row Append.
4034            for &vid in &old_l0.vertex_tombstones {
4035                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4036                main_vertex_tombstones.push((vid, version));
4037            }
4038
4039            vertices
4040        };
4041
4042        // M8: durable label-only mutations across flush windows.
4043        //
4044        // `SET n:Label` / `REMOVE n:Label` mark the vid in
4045        // `vertex_label_overwrites` and update `vertex_labels`, but for a
4046        // vid flushed in a PRIOR window they never re-add it to
4047        // `vertex_properties`. The loops above key off `vertex_properties`,
4048        // so such a relabel would be silently lost: absent from the main
4049        // table, the per-label datasets, and the VidLabelsIndex (and so
4050        // `rebuild_vid_labels_index` reads stale labels after a restart).
4051        // The same-window create+relabel case already works because the
4052        // create put the vid in `vertex_properties`.
4053        //
4054        // Re-derive each overwrite-only vid by fetching its persisted props
4055        // and labels, then route it into `main_vertices` (main table +
4056        // index), the new per-label datasets, and a tombstone in any
4057        // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4058        // per-label table directly, so the old-label tombstone is required.
4059        let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4060            let old_l0 = old_l0_arc.read();
4061            old_l0
4062                .vertex_label_overwrites
4063                .iter()
4064                .filter(|vid| {
4065                    !old_l0.vertex_properties.contains_key(*vid)
4066                        && !old_l0.vertex_tombstones.contains(*vid)
4067                })
4068                .map(|vid| {
4069                    let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4070                    let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4071                    (*vid, labels, version)
4072                })
4073                .collect()
4074        };
4075        for (vid, new_labels, version) in overwrite_only {
4076            // Persisted props of the prior-window row — required so the
4077            // re-Appended main row does not blank the vertex's properties.
4078            let Some(props) = MainVertexDataset::find_props_by_vid(
4079                self.storage.backend(),
4080                vid,
4081                self.storage.version_high_water_mark(),
4082            )
4083            .await?
4084            else {
4085                tracing::warn!(
4086                    vid = vid.as_u64(),
4087                    "label-only mutation for a vid with no persisted main row; skipping flush \
4088                     of its relabel"
4089                );
4090                continue;
4091            };
4092            // Labels the vid carried BEFORE this relabel; the storage read
4093            // reflects pre-flush state. Any label no longer present must be
4094            // tombstoned in its per-label dataset.
4095            let old_labels = self
4096                .find_vertex_labels_in_storage(vid)
4097                .await?
4098                .unwrap_or_default();
4099
4100            main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4101            for label in &new_labels {
4102                if let Some(label_id) = schema.label_id_by_name(label) {
4103                    vertices_by_label.entry(label_id).or_default().push((
4104                        vid,
4105                        new_labels.clone(),
4106                        props.clone(),
4107                        false,
4108                        version,
4109                    ));
4110                }
4111            }
4112            for label in &old_labels {
4113                if !new_labels.contains(label)
4114                    && let Some(label_id) = schema.label_id_by_name(label)
4115                {
4116                    tombstones_by_label
4117                        .entry(label_id)
4118                        .or_default()
4119                        .push((vid, version));
4120                }
4121            }
4122        }
4123
4124        if !main_vertices.is_empty() {
4125            let main_vertex_batch = MainVertexDataset::build_record_batch(
4126                &main_vertices,
4127                Some(&vertex_created_at),
4128                Some(&vertex_updated_at),
4129            )?;
4130            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4131        }
4132        // Round-12 §B: tombstones via MergeInsert on the main vertices
4133        // table. Independent of `vertex_properties` length.
4134        if !main_vertex_tombstones.is_empty() {
4135            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4136                &main_vertex_tombstones,
4137                Some(&vertex_updated_at),
4138            )?;
4139            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4140                .await?;
4141        }
4142        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4143            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4144        }
4145
4146        // Keep the VidLabelsIndex current for every flushed vertex. This is the
4147        // single place that sees all vertices: the per-label fan-out below skips
4148        // undeclared (schemaless) labels, so updating the index there would miss
4149        // them. Traversal-time label predicates read this index to resolve
4150        // labels for vertices that live only in Lance — notably on a fork, whose
4151        // data is flushed to Lance before branching. (GitHub #99)
4152        for (vid, labels, _props, _deleted, _version) in &main_vertices {
4153            self.storage.update_vid_labels_index(*vid, labels.clone());
4154        }
4155        for (vid, _version) in &main_vertex_tombstones {
4156            self.storage.remove_from_vid_labels_index(*vid);
4157        }
4158
4159        // 3. For each edge type, write FWD and BWD delta runs
4160        for (&edge_type_id, entries) in entries_by_type.iter() {
4161            // Get edge type name from unified lookup (handles both schema'd and schemaless)
4162            let edge_type_name = self
4163                .storage
4164                .schema_manager()
4165                .edge_type_name_by_id_unified(edge_type_id)
4166                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4167
4168            // FWD Run (sorted by src_vid)
4169            // Round-12 §A: split entries into full-row Append and
4170            // partial MergeInsert routes based on `edge_partial_keys`.
4171            // Edges in `edge_partial_keys` were last written via
4172            // `insert_edge_partial_full`; the per-edge-type delta
4173            // tables receive only the touched schema columns plus
4174            // (when any overflow key was touched) the regenerated
4175            // `overflow_json` blob. Untouched columns retain their
4176            // previous-version value via Lance MergeInsert.
4177            let partial_eids: std::collections::HashSet<Eid> = {
4178                let old_l0 = old_l0_arc.read();
4179                entries
4180                    .iter()
4181                    .filter(|e| {
4182                        self.config.partial_lance_writes
4183                            && old_l0.edge_partial_keys.contains_key(&e.eid)
4184                    })
4185                    .map(|e| e.eid)
4186                    .collect()
4187            };
4188            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4189                let old_l0 = old_l0_arc.read();
4190                partial_eids
4191                    .iter()
4192                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4193                    .collect()
4194            };
4195            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4196                .clone()
4197                .into_iter()
4198                .partition(|e| !partial_eids.contains(&e.eid));
4199
4200            let backend = self.storage.backend();
4201
4202            // FWD run (sorted by src_vid)
4203            let mut fwd_full = full_entries.clone();
4204            fwd_full.sort_by_key(|e| e.src_vid);
4205            let mut fwd_partial = partial_entries.clone();
4206            fwd_partial.sort_by_key(|e| e.src_vid);
4207            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4208            if !fwd_full.is_empty() {
4209                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4210                fwd_ds.write_run(backend, fwd_batch).await?;
4211            }
4212            if !fwd_partial.is_empty() {
4213                let touched_union: std::collections::HashSet<String> = fwd_partial
4214                    .iter()
4215                    .flat_map(|e| {
4216                        touched_union_by_eid
4217                            .get(&e.eid)
4218                            .cloned()
4219                            .unwrap_or_default()
4220                            .into_iter()
4221                    })
4222                    .collect();
4223                let fwd_partial_batch =
4224                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4225                fwd_ds
4226                    .merge_insert_partial_run(backend, fwd_partial_batch)
4227                    .await?;
4228            }
4229            fwd_ds.ensure_eid_index(backend).await?;
4230
4231            // BWD Run (sorted by dst_vid)
4232            let mut bwd_full = full_entries.clone();
4233            bwd_full.sort_by_key(|e| e.dst_vid);
4234            let mut bwd_partial = partial_entries.clone();
4235            bwd_partial.sort_by_key(|e| e.dst_vid);
4236            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4237            if !bwd_full.is_empty() {
4238                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4239                bwd_ds.write_run(backend, bwd_batch).await?;
4240            }
4241            if !bwd_partial.is_empty() {
4242                let touched_union: std::collections::HashSet<String> = bwd_partial
4243                    .iter()
4244                    .flat_map(|e| {
4245                        touched_union_by_eid
4246                            .get(&e.eid)
4247                            .cloned()
4248                            .unwrap_or_default()
4249                            .into_iter()
4250                    })
4251                    .collect();
4252                let bwd_partial_batch =
4253                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4254                bwd_ds
4255                    .merge_insert_partial_run(backend, bwd_partial_batch)
4256                    .await?;
4257            }
4258            bwd_ds.ensure_eid_index(backend).await?;
4259
4260            // Update Manifest
4261            let current_snap =
4262                manifest
4263                    .edges
4264                    .entry(edge_type_name.to_string())
4265                    .or_insert(EdgeSnapshot {
4266                        version: 0,
4267                        count: 0,
4268                        lance_version: 0,
4269                    });
4270            current_snap.version += 1;
4271            current_snap.count += entries.len() as u64;
4272            // LanceDB tables don't expose Lance version directly
4273            current_snap.lance_version = 0;
4274
4275            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4276            // already has these edges via dual-write in insert_edge/delete_edge.
4277        }
4278
4279        // 4. Per-label vertex table writes
4280        // Iterate all labels that have either full-row OR partial-write
4281        // data pending. A label may appear in only one of the two maps
4282        // (e.g., all updates on this label were partial-only).
4283        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4284            .keys()
4285            .chain(partial_by_label.keys())
4286            .chain(tombstones_by_label.keys())
4287            .copied()
4288            .collect();
4289        for label_id in all_label_ids {
4290            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4291            let label_name = schema
4292                .label_name_by_id(label_id)
4293                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4294
4295            let ds = self.storage.vertex_dataset(label_name)?;
4296
4297            // Collect inverted index updates before consuming vertices
4298            // Maps: cfg.property -> (added, removed)
4299            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4300            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4301
4302            for idx in &schema.indexes {
4303                if let IndexDefinition::Inverted(cfg) = idx
4304                    && cfg.label == label_name
4305                {
4306                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4307                    let mut removed: HashSet<Vid> = HashSet::new();
4308
4309                    for (vid, _labels, props, deleted, _version) in &vertices {
4310                        if *deleted {
4311                            removed.insert(*vid);
4312                        } else if let Some(prop_value) = props.get(&cfg.property) {
4313                            // Extract terms from the property value (List<String>)
4314                            if let Some(arr) = prop_value.as_array() {
4315                                let terms: Vec<String> = arr
4316                                    .iter()
4317                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4318                                    .collect();
4319                                if !terms.is_empty() {
4320                                    added.insert(*vid, terms);
4321                                }
4322                            }
4323                        }
4324                    }
4325                    // Round-12 §B: tombstones no longer in `vertices`;
4326                    // pull them from `tombstones_by_label` for inverted
4327                    // index removal.
4328                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4329                        for (vid, _) in tomb_rows {
4330                            removed.insert(*vid);
4331                        }
4332                    }
4333
4334                    if !added.is_empty() || !removed.is_empty() {
4335                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4336                    }
4337                }
4338            }
4339
4340            let mut v_data = Vec::new();
4341            let mut d_data = Vec::new();
4342            let mut ver_data = Vec::new();
4343            for (vid, labels, props, deleted, version) in vertices {
4344                v_data.push((vid, labels, props));
4345                d_data.push(deleted);
4346                ver_data.push(version);
4347            }
4348
4349            let backend = self.storage.backend();
4350
4351            // Skip the full-row Append entirely if this label only has
4352            // partial-write rows pending.
4353            if !v_data.is_empty() {
4354                let batch = ds.build_record_batch_with_timestamps(
4355                    &v_data,
4356                    &d_data,
4357                    &ver_data,
4358                    &schema,
4359                    Some(&vertex_created_at),
4360                    Some(&vertex_updated_at),
4361                )?;
4362                ds.write_batch(backend, batch, &schema).await?;
4363            }
4364
4365            // Partial-column batch (Lance MergeInsert path). The flag
4366            // gates whether the routing classified any VIDs as partial;
4367            // outside the flag this collection is always empty so the
4368            // call below is a cheap no-op.
4369            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4370                && !partial_rows.is_empty()
4371            {
4372                let touched_union: std::collections::HashSet<String> = partial_rows
4373                    .iter()
4374                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4375                    .collect();
4376                let pairs: Vec<(Vid, Properties)> = partial_rows
4377                    .iter()
4378                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4379                    .collect();
4380                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4381                let partial_batch = ds.build_partial_record_batch(
4382                    &pairs,
4383                    &versions,
4384                    &touched_union,
4385                    &schema,
4386                    Some(&vertex_updated_at),
4387                )?;
4388                if partial_batch.num_rows() > 0 {
4389                    ds.merge_insert_batch(backend, partial_batch).await?;
4390                }
4391            }
4392
4393            // Tombstone batch (Round-12 §B): always MergeInsert with
4394            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4395            // No partial_lance_writes gating — tombstones never carry
4396            // useful property payload to write. Captured tombstone vids
4397            // also drive `remove_from_vid_labels_index` below.
4398            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4399            if !tombstone_rows.is_empty() {
4400                let tomb_batch =
4401                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4402                if tomb_batch.num_rows() > 0 {
4403                    ds.merge_insert_batch(backend, tomb_batch).await?;
4404                }
4405            }
4406
4407            ds.ensure_default_indexes(backend).await?;
4408
4409            // VidLabelsIndex maintenance is centralized at the main-vertex
4410            // flush above (it sees both schema'd and schemaless vertices).
4411
4412            // Update Manifest
4413            let current_snap =
4414                manifest
4415                    .vertices
4416                    .entry(label_name.to_string())
4417                    .or_insert(LabelSnapshot {
4418                        version: 0,
4419                        count: 0,
4420                        lance_version: 0,
4421                    });
4422            current_snap.version += 1;
4423            current_snap.count += v_data.len() as u64;
4424            // LanceDB tables don't expose Lance version directly
4425            current_snap.lance_version = 0;
4426
4427            // Invalidate table cache to ensure next read picks up new version
4428            self.storage.invalidate_table_cache(label_name);
4429
4430            // Apply inverted index updates incrementally
4431            #[cfg(feature = "lance-backend")]
4432            for idx in &schema.indexes {
4433                if let IndexDefinition::Inverted(cfg) = idx
4434                    && cfg.label == label_name
4435                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4436                {
4437                    self.storage
4438                        .index_manager()
4439                        .update_inverted_index_incremental(cfg, added, removed)
4440                        .await?;
4441                }
4442            }
4443
4444            // Update UID index with new vertex mappings
4445            // Collect (UniId, Vid) mappings from non-deleted vertices
4446            #[cfg(feature = "lance-backend")]
4447            {
4448                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4449                for (vid, _labels, props) in &v_data {
4450                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4451                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4452                        label_name, ext_id, props,
4453                    );
4454                    uid_mappings.push((uid, *vid));
4455                }
4456
4457                if !uid_mappings.is_empty()
4458                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4459                {
4460                    // Stamp mappings with this flush's MVCC version so a later
4461                    // re-create of the same UID deterministically outranks the
4462                    // stale mapping (review C3).
4463                    uid_index
4464                        .write_mapping_versioned(&uid_mappings, current_version)
4465                        .await?;
4466                }
4467            }
4468        }
4469        Ok(FlushOutcome {
4470            manifest,
4471            snapshot_id,
4472        })
4473    }
4474
4475    /// Composition entry that assumes the caller already holds `flush_lock`.
4476    /// Runs rotate + stream + finalize_locked in sequence. Used by
4477    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4478    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4479    /// holds the lock from the commit critical section).
4480    #[instrument(
4481        skip(self),
4482        fields(snapshot_id, mutations_count, size_bytes),
4483        level = "info"
4484    )]
4485    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4486        let start = std::time::Instant::now();
4487
4488        let (initial_size, initial_count) = {
4489            let l0_arc = self.l0_manager.get_current();
4490            let l0 = l0_arc.read();
4491            (l0.estimated_size, l0.mutation_count)
4492        };
4493        tracing::Span::current().record("size_bytes", initial_size);
4494        tracing::Span::current().record("mutations_count", initial_count);
4495
4496        debug!("Starting L0 flush to L1");
4497
4498        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4499        // FlushInProgressGuard lives on RotateOutput and stays alive for
4500        // the full flush — including the finalize_locked call below.
4501        let RotateOutput {
4502            old_l0_arc,
4503            wal_lsn,
4504            current_version,
4505            flush_in_progress_guard: _flush_guard,
4506        } = self.flush_l0_rotate().await?;
4507
4508        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4509        // G (Lance writes). Builds the manifest but does NOT publish it.
4510        let FlushOutcome {
4511            manifest,
4512            snapshot_id,
4513        } = self
4514            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4515            .await?;
4516
4517        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4518        // property cache clear, last_flush_time, metrics, l1_runs++,
4519        // compaction trigger, index-rebuild scheduling, fork tick.
4520        self.flush_finalize_locked(
4521            old_l0_arc,
4522            wal_lsn,
4523            manifest,
4524            snapshot_id,
4525            initial_size,
4526            initial_count,
4527            start,
4528        )
4529        .await
4530    }
4531
4532    /// Phases H..S of the flush: publish the manifest and run all
4533    /// post-publish bookkeeping. Assumes the caller already holds
4534    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4535    /// lock-acquiring variant used by the async finalize path.
4536    #[allow(clippy::too_many_arguments)]
4537    async fn flush_finalize_locked(
4538        &self,
4539        old_l0_arc: Arc<RwLock<L0Buffer>>,
4540        wal_lsn: u64,
4541        manifest: SnapshotManifest,
4542        snapshot_id: String,
4543        initial_size: usize,
4544        initial_count: usize,
4545        start: std::time::Instant,
4546    ) -> Result<String> {
4547        Self::flush_finalize_body(
4548            &self.shared_ctx(),
4549            old_l0_arc,
4550            wal_lsn,
4551            manifest,
4552            snapshot_id,
4553            initial_size,
4554            initial_count,
4555            start,
4556        )
4557        .await
4558    }
4559
4560    /// Phases H..S of the flush, lock-acquiring variant. Used by the
4561    /// async-flush finalizer task (running on a spawned tokio task),
4562    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4563    /// `flush_lock` to serialize the publish boundary, then runs the
4564    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4565    #[allow(clippy::too_many_arguments)]
4566    pub(crate) async fn flush_finalize_now(
4567        shared: SharedFlushCtx,
4568        old_l0_arc: Arc<RwLock<L0Buffer>>,
4569        wal_lsn: u64,
4570        manifest: SnapshotManifest,
4571        snapshot_id: String,
4572        initial_size: usize,
4573        initial_count: usize,
4574        start: std::time::Instant,
4575    ) -> Result<String> {
4576        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4577        Self::flush_finalize_body(
4578            &shared,
4579            old_l0_arc,
4580            wal_lsn,
4581            manifest,
4582            snapshot_id,
4583            initial_size,
4584            initial_count,
4585            start,
4586        )
4587        .await
4588    }
4589
    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`:
    /// publishes the manifest built by the stream phase and runs the
    /// post-publish bookkeeping.
    ///
    /// Static over `SharedFlushCtx`; the caller is responsible for
    /// holding `flush_lock`. Step order is load-bearing (see the per-step
    /// comments):
    /// - the manifest is published and fsynced before the WAL is truncated;
    /// - the property cache is cleared before the buffer leaves the L0 chain;
    /// - `complete_flush` runs before WAL truncation so `min_pending_wal_lsn`
    ///   is accurate.
    ///
    /// # Parameters
    /// - `old_l0_arc`: the rotated L0 buffer whose contents were streamed to
    ///   L1. It is removed from the pending-flush set by `complete_flush`.
    /// - `wal_lsn`: upper bound for WAL truncation, clamped down to
    ///   `min_pending_wal_lsn()` when other buffers are still pending.
    /// - `manifest`: manifest from the stream phase. Its `parent_snapshot`
    ///   may be re-linked before publishing.
    /// - `snapshot_id`: logged, and returned on success.
    /// - `initial_size` / `initial_count`: pre-flush L0 size and mutation
    ///   count. Used here only for the log line and metrics.
    /// - `start`: flush start time, used for the duration histogram.
    ///
    /// # Errors
    /// Returns an error if saving or pointing to the snapshot, fsyncing it,
    /// truncating the WAL, or acquiring `compaction_status` fails. Every
    /// failure after step H is returned AFTER the snapshot is already
    /// published. NOTE(review): callers should not read such an error as
    /// "nothing was published".
    #[allow(clippy::too_many_arguments)]
    async fn flush_finalize_body(
        shared: &SharedFlushCtx,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        mut manifest: SnapshotManifest,
        snapshot_id: String,
        initial_size: usize,
        initial_count: usize,
        start: std::time::Instant,
    ) -> Result<String> {
        // Parent-snapshot fixup. The stream phase built `manifest` with
        // parent_snapshot set from cached_manifest at stream time. If
        // OTHER flushes (sync or async) have finalized since then,
        // cached_manifest has advanced. Re-link this manifest to the
        // current cached chain so we don't orphan their data when we
        // overwrite cached_manifest below. When nothing is cached yet
        // (`None`), the stream-time parent is left unchanged.
        let current_parent_id = shared
            .cached_manifest
            .lock()
            .as_ref()
            .map(|m| m.snapshot_id.clone());
        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
            manifest.parent_snapshot = current_parent_id;
            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
        }

        // H. Publish manifest (body first, then pointer — recovery is
        // idempotent if we crash between the two).
        // A fork writer must publish to a fork-scoped namespace, never the
        // global `catalog/latest` that the primary reopen reads (review C1).
        debug_assert_eq!(
            shared.fork_id.is_some(),
            shared.storage.snapshot_manager().is_fork_scoped(),
            "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
        );
        shared
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        shared
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
        // wrote the manifest body and the `catalog/latest` pointer through the
        // object store, which does NOT fsync. WAL truncation (K, below) removes
        // the only other durable copy of this flush's data, so a crash after K
        // but before the OS flushed those writes would lose the snapshot —
        // recovery could not resolve `latest`. Make them durable now (local-fs
        // only; remote stores provide their own durability on `put`).
        crate::snapshot::manager::fsync_snapshot_pointer(
            shared.storage.local_fs_root().as_deref(),
            shared.fork_id.as_ref(),
            &manifest.snapshot_id,
        )
        .map_err(|e| {
            anyhow!(
                "fsync snapshot {} before WAL truncate: {}",
                manifest.snapshot_id,
                e
            )
        })?;

        // I. Cache manifest for next flush to avoid re-reading from object store.
        // This also becomes the parent for the next finalize's fixup above.
        *shared.cached_manifest.lock() = Some(manifest.clone());

        // L. Invalidate the property cache BEFORE removing the flushed buffer
        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
        // first closes the non-monotonic-read window: once the buffer leaves
        // the L0 chain at J a freshly-written value would otherwise miss the
        // chain and fall through to a stale cache entry. By the time finalize
        // runs the streamed rows are already durable in L1, so a post-clear
        // read falls through to fresh storage instead. The finalizer holds
        // `flush_lock` throughout, so reordering L ahead of J is safe.
        // (Step letters are therefore not in alphabetical order here.)
        if let Some(ref pm) = shared.property_manager {
            pm.clear_cache().await;
        }

        // J. Complete flush: remove old L0 from pending_flush. MUST happen
        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
        shared.l0_manager.complete_flush(&old_l0_arc);

        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
        // WAL truncation (K). The property cache is already cleared (L moved
        // ahead of J above), so a read in this window falls through to fresh
        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
        // read after flush finalize).
        // The failpoint name ("...-before-cache-clear") predates moving L
        // ahead of J and no longer describes its position. It is presumably
        // kept so existing tests still match; do not rename it without
        // updating them.
        fail::fail_point!("flush::after-complete-before-cache-clear");

        // K. Truncate WAL up to the safe LSN: the smaller of this flush's
        // `wal_lsn` and the lowest LSN any still-pending buffer needs.
        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
        if let Some(w) = wal_handle {
            let safe_lsn = shared
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // M. Reset last flush time for time-based auto-flush.
        *shared.last_flush_time.lock() = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // P. Increment flush generation counter for write throttling.
        {
            let mut status = uni_common::sync::acquire_mutex(
                &shared.storage.compaction_status,
                "compaction_status",
            )?;
            status.l1_runs += 1;
        }

        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
        // At most one compaction task runs at a time: a new one is spawned
        // only if the previously stored handle is absent or finished.
        // NOTE(review): `compact()` is called synchronously inside the spawned
        // task, so it occupies a runtime worker for its whole duration. If it
        // is CPU-heavy, consider `spawn_blocking`.
        let am = shared.adjacency_manager.clone();
        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
            let previous_still_running = {
                let guard = shared.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };
            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *shared.compaction_handle.write() = Some(handle);
            }
        }

        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
        if shared.auto_rebuild_enabled
            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
        {
            Self::schedule_index_rebuilds_if_needed_static(
                &manifest,
                rebuild_mgr.clone(),
                shared.schema_manager.clone(),
                shared.index_rebuild_config.clone(),
            );
        }

        // S. Emit fork-fragment observability after a successful forked flush.
        Self::tick_fork_fragment_observability_static(
            shared.fork_id,
            shared.fork_flush_count.clone(),
            shared.fork_fragment_warn_fired.clone(),
            shared.fork_fragment_warn_threshold,
        );

        Ok(snapshot_id)
    }
4760
4761    /// Increment fork-flush bookkeeping and fire the fragment warn
4762    /// once if the threshold is crossed.
4763    ///
4764    /// Each flush typically appends ~1 fragment per touched dataset on
4765    /// the fork's branches; without compaction (deferred to Phase 5)
4766    /// long-lived heavy-write forks degrade. The flush count is a
4767    /// proxy for actual fragment growth — reading
4768    /// `Dataset::manifest().fragments.len()` per dataset would add a
4769    /// per-flush object-store roundtrip on the hot commit path, which
4770    /// is too costly for a purely observational guard rail.
4771    ///
4772    /// No-op for primary writers (`fork_id == None`).
4773    #[allow(dead_code)] // called by tests; production path uses _static
4774    pub(crate) fn tick_fork_fragment_observability(&self) {
4775        Self::tick_fork_fragment_observability_static(
4776            self.fork_id,
4777            self.fork_flush_count.clone(),
4778            self.fork_fragment_warn_fired.clone(),
4779            self.config.fork_fragment_warn_threshold,
4780        );
4781    }
4782
4783    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4784    /// Used by the async-flush finalize path, where we hold a
4785    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4786    pub(crate) fn tick_fork_fragment_observability_static(
4787        fork_id: Option<ForkId>,
4788        fork_flush_count: Arc<AtomicU64>,
4789        fork_fragment_warn_fired: Arc<AtomicBool>,
4790        warn_threshold: usize,
4791    ) {
4792        let Some(fork_id) = fork_id else { return };
4793        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4794        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4795        let fork_label = fork_id.to_string();
4796        metrics::gauge!(
4797            "uni_fork_l1_flushes",
4798            "fork" => fork_label.clone(),
4799        )
4800        .set(new_count as f64);
4801        let threshold = warn_threshold as u64;
4802        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4803            && threshold > 0
4804            && new_count >= threshold
4805        {
4806            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4807            tracing::warn!(
4808                fork = %fork_label,
4809                flush_count = new_count,
4810                threshold,
4811                "fork has exceeded the L1 flush-count threshold; \
4812                 fork compaction is deferred to Phase 5 — consider \
4813                 drop+recreate or promotion to bound fragment growth"
4814            );
4815        }
4816    }
4817
4818    /// Check rebuild thresholds and schedule background index rebuilds for
4819    /// labels that exceed growth or age limits. Marks affected indexes as
4820    /// `Stale` and spawns an async task to schedule the rebuild.
4821    #[allow(dead_code)] // production path uses _static; kept as the
4822    // documented instance entry point.
4823    fn schedule_index_rebuilds_if_needed(
4824        &self,
4825        manifest: &SnapshotManifest,
4826        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4827    ) {
4828        Self::schedule_index_rebuilds_if_needed_static(
4829            manifest,
4830            rebuild_mgr,
4831            self.schema_manager.clone(),
4832            self.config.index_rebuild.clone(),
4833        );
4834    }
4835
4836    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4837    /// Used by the async-flush finalize path, where we hold the
4838    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4839    pub(crate) fn schedule_index_rebuilds_if_needed_static(
4840        manifest: &SnapshotManifest,
4841        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4842        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4843        index_rebuild_config: uni_common::config::IndexRebuildConfig,
4844    ) {
4845        let checker =
4846            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4847        let schema = schema_manager.schema();
4848        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4849
4850        if labels.is_empty() {
4851            return;
4852        }
4853
4854        // Mark affected indexes as Stale
4855        for label in &labels {
4856            for idx in &schema.indexes {
4857                if idx.label() == label {
4858                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4859                        m.status = uni_common::core::schema::IndexStatus::Stale;
4860                    });
4861                }
4862            }
4863        }
4864
4865        tokio::spawn(async move {
4866            if let Err(e) = rebuild_mgr.schedule(labels).await {
4867                tracing::warn!("Failed to schedule index rebuild: {e}");
4868            }
4869        });
4870    }
4871}
4872
/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
/// its single-task finalizer loop. Unit struct on purpose: it must NOT
/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
/// for finalize travels in via `SharedFlushCtx`.
///
/// On a successful flush stream, `finalize` delegates to
/// `Writer::flush_finalize_now`. On a failed stream, `finalize_failure`
/// logs the error, increments `uni_flush_failures_total`, and returns the
/// error unchanged. Both paths drop the rotated flush's `permit`.
pub(crate) struct WriterFinalizer;
4879
4880impl FinalizeFn for WriterFinalizer {
4881    fn finalize<'a>(
4882        &'a self,
4883        rotated: RotatedFlush,
4884        outcome: AsyncFlushOutcome,
4885        shared: SharedFlushCtx,
4886    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4887        Box::pin(async move {
4888            // Read initial_size / initial_count from the rotated L0 so
4889            // we don't have to plumb them through the coordinator
4890            // submission. The buffer is still alive in pending_flush
4891            // until `complete_flush` (J) below pops it.
4892            let (initial_size, initial_count) = {
4893                let l0 = rotated.old_l0_arc.read();
4894                (l0.estimated_size, l0.mutation_count)
4895            };
4896            let result = Writer::flush_finalize_now(
4897                shared,
4898                rotated.old_l0_arc.clone(),
4899                rotated.wal_lsn,
4900                outcome.new_manifest,
4901                outcome.snapshot_id,
4902                initial_size,
4903                initial_count,
4904                std::time::Instant::now(),
4905            )
4906            .await;
4907            // `rotated` (permit + flush_in_progress_guard) drops here.
4908            drop(rotated.permit);
4909            result
4910        })
4911    }
4912
4913    fn finalize_failure<'a>(
4914        &'a self,
4915        rotated: RotatedFlush,
4916        err: anyhow::Error,
4917        _shared: SharedFlushCtx,
4918    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4919        Box::pin(async move {
4920            tracing::warn!(
4921                error = %err,
4922                seq = rotated.seq,
4923                "async flush stream failed; old L0 remains in pending_flush, \
4924                 WAL retains its data, recovery via WAL replay on restart"
4925            );
4926            metrics::counter!("uni_flush_failures_total").increment(1);
4927            // Permit + guard drop here so back-pressure releases even on
4928            // failure.
4929            drop(rotated.permit);
4930            err
4931        })
4932    }
4933}
4934
4935#[cfg(test)]
4936mod tests {
4937    use super::*;
4938    use tempfile::tempdir;
4939
    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// Stages two vertices and one edge in a transaction L0 and commits it.
    /// It then checks that:
    /// - the main WAL gained new entries;
    /// - among the newly appended entries, every vertex insert precedes the
    ///   edge insert;
    /// - the committed data is visible in the main L0.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Get WAL before commit.
        // `count_before` is the baseline: only entries appended after it are
        // attributed to the commit below.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first.
        // The writer is wrapped in `Arc` first; presumably
        // `commit_transaction_l0` requires an `Arc<Writer>` receiver
        // (confirm against its signature).
        let writer = Arc::new(writer);
        writer.commit_transaction_l0(tx_l0).await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        // Single ordered pass: a vertex insert after our edge, or any edge
        // insert before both vertices, fails the test immediately.
        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0.
        // NOTE(review): `l0` was captured before the commit, so this assumes
        // the commit merges into that same buffer rather than rotating it —
        // confirm.
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
5082
    /// Test that abandoning a transaction without committing leaves the main
    /// L0 untouched.
    ///
    /// Despite its name, this test does not inject a WAL failure. It only
    /// simulates the rollback that would follow one, by dropping the
    /// transaction L0 without calling commit. It then checks that:
    /// - the main L0's vertex count is unchanged;
    /// - the baseline vertex written outside the transaction is still present;
    /// - the transaction's vertex never reached the main L0.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Insert baseline data (outside transaction)
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
                None,
            )
            .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Capture main L0 state before rollback
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        // Rollback transaction (simulating what would happen after WAL flush failure)
        // No commit is attempted; dropping the transaction L0 is the whole
        // "rollback".
        drop(tx_l0);

        // Verify main L0 is unchanged
        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        // Baseline should still be present
        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        // Transaction data should NOT be in main L0
        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }
5177
5178    /// Test that batch insert with shared labels does not clone labels per vertex.
5179    /// This verifies fix for issue #161 (redundant label cloning).
5180    #[tokio::test]
5181    async fn test_batch_insert_shared_labels() -> Result<()> {
5182        use crate::storage::manager::StorageManager;
5183        use object_store::local::LocalFileSystem;
5184        use object_store::path::Path as ObjectStorePath;
5185        use uni_common::core::schema::SchemaManager;
5186
5187        let dir = tempdir()?;
5188        let path = dir.path().to_str().unwrap();
5189        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5190        let schema_path = ObjectStorePath::from("schema.json");
5191
5192        let schema_manager =
5193            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5194        let _label_id = schema_manager.add_label("Person")?;
5195        schema_manager.save().await?;
5196
5197        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5198
5199        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5200
5201        // Shared labels - should not be cloned per vertex
5202        let labels = &["Person".to_string()];
5203
5204        // Insert batch of vertices with same labels
5205        let mut vids = Vec::new();
5206        for i in 0..100 {
5207            let vid = writer.next_vid().await?;
5208            let mut props = std::collections::HashMap::new();
5209            props.insert("id".to_string(), Value::Int(i));
5210            writer
5211                .insert_vertex_with_labels(vid, props, labels, None)
5212                .await?;
5213            vids.push(vid);
5214        }
5215
5216        // Verify all vertices have the correct labels
5217        let l0 = writer.l0_manager.get_current();
5218        for vid in vids {
5219            let l0_guard = l0.read();
5220            let vertex_labels = l0_guard.vertex_labels.get(&vid);
5221            assert!(vertex_labels.is_some(), "Vertex should have labels");
5222            assert_eq!(
5223                vertex_labels.unwrap(),
5224                &vec!["Person".to_string()],
5225                "Labels should match"
5226            );
5227        }
5228
5229        Ok(())
5230    }
5231
5232    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5233    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5234    #[tokio::test]
5235    async fn test_estimated_size_tracks_mutations() -> Result<()> {
5236        use crate::storage::manager::StorageManager;
5237        use object_store::local::LocalFileSystem;
5238        use object_store::path::Path as ObjectStorePath;
5239        use uni_common::core::schema::SchemaManager;
5240
5241        let dir = tempdir()?;
5242        let path = dir.path().to_str().unwrap();
5243        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5244        let schema_path = ObjectStorePath::from("schema.json");
5245
5246        let schema_manager =
5247            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5248        let _label_id = schema_manager.add_label("Test")?;
5249        schema_manager.save().await?;
5250
5251        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5252
5253        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5254
5255        let l0 = writer.l0_manager.get_current();
5256
5257        // Initial state should be empty
5258        let initial_estimated = l0.read().estimated_size;
5259        let initial_actual = l0.read().size_bytes();
5260        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5261        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5262
5263        // Insert vertices with properties
5264        let mut vids = Vec::new();
5265        for i in 0..10 {
5266            let vid = writer.next_vid().await?;
5267            let mut props = std::collections::HashMap::new();
5268            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5269            props.insert("index".to_string(), Value::Int(i));
5270            writer
5271                .insert_vertex_with_labels(vid, props, &[], None)
5272                .await?;
5273            vids.push(vid);
5274        }
5275
5276        // Verify estimated_size grew
5277        let after_vertices_estimated = l0.read().estimated_size;
5278        let after_vertices_actual = l0.read().size_bytes();
5279        assert!(
5280            after_vertices_estimated > 0,
5281            "estimated_size should grow after insertions"
5282        );
5283
5284        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5285        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5286        assert!(
5287            (0.5..=2.0).contains(&ratio),
5288            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5289            after_vertices_estimated,
5290            after_vertices_actual,
5291            ratio
5292        );
5293
5294        // Insert edges with a simple edge type
5295        let edge_type = 1u32;
5296        for i in 0..9 {
5297            let eid = writer.next_eid(edge_type).await?;
5298            writer
5299                .insert_edge(
5300                    vids[i],
5301                    vids[i + 1],
5302                    edge_type,
5303                    eid,
5304                    std::collections::HashMap::new(),
5305                    Some("NEXT".to_string()),
5306                    None,
5307                )
5308                .await?;
5309        }
5310
5311        // Verify estimated_size grew further
5312        let after_edges_estimated = l0.read().estimated_size;
5313        let after_edges_actual = l0.read().size_bytes();
5314        assert!(
5315            after_edges_estimated > after_vertices_estimated,
5316            "estimated_size should grow after edge insertions"
5317        );
5318
5319        // Verify still within reasonable bounds
5320        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5321        assert!(
5322            (0.5..=2.0).contains(&ratio),
5323            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5324            after_edges_estimated,
5325            after_edges_actual,
5326            ratio
5327        );
5328
5329        Ok(())
5330    }
5331
5332    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5333    #[tokio::test]
5334    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5335        use crate::runtime::wal::WriteAheadLog;
5336        use crate::storage::manager::StorageManager;
5337        use object_store::local::LocalFileSystem;
5338        use object_store::path::Path as ObjectStorePath;
5339        use uni_common::core::schema::SchemaManager;
5340
5341        let dir = tempdir()?;
5342        let path = dir.path().to_str().unwrap();
5343        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5344        let schema_path = ObjectStorePath::from("schema.json");
5345
5346        let schema_manager =
5347            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5348        schema_manager.save().await?;
5349
5350        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5351
5352        let wal_path = ObjectStorePath::from("wal");
5353        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5354
5355        let writer = Writer::new_with_config(
5356            storage.clone(),
5357            schema_manager.clone(),
5358            1,
5359            UniConfig::default(),
5360            Some(wal.clone()),
5361            None,
5362        )
5363        .await?;
5364
5365        // Flush with no mutations — should succeed cleanly
5366        let lsn = writer.flush_wal().await?;
5367        // LSN should be 0 or 1 (no real mutations flushed)
5368        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5369
5370        Ok(())
5371    }
5372
5373    /// Test that transaction data does not leak into main L0 without commit.
5374    #[tokio::test]
5375    async fn test_transaction_isolation_without_commit() -> Result<()> {
5376        use crate::runtime::wal::WriteAheadLog;
5377        use crate::storage::manager::StorageManager;
5378        use object_store::local::LocalFileSystem;
5379        use object_store::path::Path as ObjectStorePath;
5380        use uni_common::core::schema::SchemaManager;
5381
5382        let dir = tempdir()?;
5383        let path = dir.path().to_str().unwrap();
5384        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5385        let schema_path = ObjectStorePath::from("schema.json");
5386
5387        let schema_manager =
5388            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5389        let _label_id = schema_manager.add_label("Person")?;
5390        schema_manager.save().await?;
5391
5392        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5393
5394        let wal_path = ObjectStorePath::from("wal");
5395        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5396
5397        let writer = Writer::new_with_config(
5398            storage.clone(),
5399            schema_manager.clone(),
5400            1,
5401            UniConfig::default(),
5402            Some(wal),
5403            None,
5404        )
5405        .await?;
5406
5407        // Create transaction L0
5408        let tx_l0 = writer.create_transaction_l0();
5409
5410        // Insert vertex into transaction L0
5411        let vid = writer.next_vid().await?;
5412        writer
5413            .insert_vertex_with_labels(
5414                vid,
5415                [("name".to_string(), Value::String("Ghost".to_string()))]
5416                    .into_iter()
5417                    .collect(),
5418                &["Person".to_string()],
5419                Some(&tx_l0),
5420            )
5421            .await?;
5422
5423        // Verify data is in transaction L0
5424        assert!(
5425            tx_l0.read().vertex_properties.contains_key(&vid),
5426            "Transaction L0 should contain the vertex"
5427        );
5428
5429        // Verify data is NOT in main L0
5430        let main_l0 = writer.l0_manager.get_current();
5431        assert!(
5432            !main_l0.read().vertex_properties.contains_key(&vid),
5433            "Main L0 should NOT contain uncommitted transaction data"
5434        );
5435
5436        // Drop transaction without committing — data should be lost
5437        drop(tx_l0);
5438
5439        // Main L0 still should not have it
5440        assert!(
5441            !main_l0.read().vertex_properties.contains_key(&vid),
5442            "Main L0 should remain clean after dropped transaction"
5443        );
5444
5445        Ok(())
5446    }
5447
5448    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5449    /// the flush count crosses the configured threshold and stays
5450    /// silent on subsequent flushes for the lifetime of the writer.
5451    /// Primary writers (`fork_id == None`) never fire it.
5452    ///
5453    /// Tested directly against `tick_fork_fragment_observability` so
5454    /// the contract is locked in independently of the broader
5455    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5456    /// on Day 10's on-the-fly schema overlay growth).
5457    #[tokio::test]
5458    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5459        use crate::storage::manager::StorageManager;
5460        use object_store::local::LocalFileSystem;
5461        use object_store::path::Path as ObjectStorePath;
5462        use uni_common::core::fork::ForkId;
5463        use uni_common::core::schema::SchemaManager;
5464
5465        let dir = tempdir()?;
5466        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5467        let schema_path = ObjectStorePath::from("schema.json");
5468        let schema_manager =
5469            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5470        let storage = Arc::new(
5471            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5472        );
5473
5474        let config = UniConfig {
5475            fork_fragment_warn_threshold: 3,
5476            ..Default::default()
5477        };
5478        let mut writer =
5479            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5480
5481        // Primary path: never fires.
5482        for _ in 0..10 {
5483            writer.tick_fork_fragment_observability();
5484        }
5485        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5486        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5487
5488        // Fork path: tag and tick. Below threshold → no fire.
5489        writer.fork_id = Some(ForkId::new());
5490        writer.tick_fork_fragment_observability();
5491        writer.tick_fork_fragment_observability();
5492        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5493        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5494
5495        // Crossing threshold → fires once.
5496        writer.tick_fork_fragment_observability();
5497        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5498        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5499
5500        // Subsequent ticks bump the gauge but do not re-fire.
5501        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5502        for _ in 0..5 {
5503            writer.tick_fork_fragment_observability();
5504        }
5505        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5506        assert_eq!(
5507            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5508            fired_after
5509        );
5510
5511        Ok(())
5512    }
5513
5514    /// The hot-path mutators must not write to any `Writer` struct field.
5515    /// Phase 2 of the refactor
5516    /// gave them `&self` receivers, which the compiler enforces against
5517    /// direct `self.x = y` assignment — but interior-mutable writes
5518    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5519    /// every potentially-writable field, calls each hot-path mutator, and
5520    /// asserts no field changed.
5521    ///
5522    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5523    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5524    /// are intentionally out of scope here.
5525    #[tokio::test]
5526    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5527        use crate::storage::manager::StorageManager;
5528        use object_store::local::LocalFileSystem;
5529        use object_store::path::Path as ObjectStorePath;
5530        use uni_common::core::schema::SchemaManager;
5531
5532        let dir = tempdir()?;
5533        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5534        let schema_path = ObjectStorePath::from("schema.json");
5535        let schema_manager =
5536            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5537        schema_manager.add_label("Person")?;
5538        schema_manager.save().await?;
5539        let storage = Arc::new(
5540            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5541        );
5542
5543        let writer =
5544            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5545                .await?;
5546
5547        /// Captures every `Writer` field that *could* be written by a
5548        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5549        /// construction field). Arc'd substructures (`l0_manager`,
5550        /// `storage`, etc.) are intentionally not checked — they are
5551        /// re-pointed only at construction.
5552        #[derive(Debug, PartialEq)]
5553        struct Snapshot {
5554            last_flush_time: std::time::Instant,
5555            cached_manifest_some: bool,
5556            fork_flush_count: u64,
5557            fork_fragment_warn_fired: bool,
5558            xervo_runtime_some: bool,
5559            index_rebuild_manager_some: bool,
5560            fork_id: Option<ForkId>,
5561        }
5562
5563        fn snap(w: &Writer) -> Snapshot {
5564            Snapshot {
5565                last_flush_time: *w.last_flush_time.lock(),
5566                cached_manifest_some: w.cached_manifest.lock().is_some(),
5567                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5568                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5569                xervo_runtime_some: w.xervo_runtime.get().is_some(),
5570                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5571                fork_id: w.fork_id,
5572            }
5573        }
5574
5575        // 1. insert_vertex_with_labels
5576        let before = snap(&writer);
5577        let vid = writer.next_vid().await?;
5578        writer
5579            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5580            .await?;
5581        assert_eq!(
5582            snap(&writer),
5583            before,
5584            "insert_vertex_with_labels mutated a Writer field"
5585        );
5586
5587        // 2. insert_vertices_batch
5588        let before = snap(&writer);
5589        let vids = writer.allocate_vids(2).await?;
5590        writer
5591            .insert_vertices_batch(
5592                vids,
5593                vec![Properties::new(), Properties::new()],
5594                vec!["Person".into()],
5595                None,
5596            )
5597            .await?;
5598        assert_eq!(
5599            snap(&writer),
5600            before,
5601            "insert_vertices_batch mutated a Writer field"
5602        );
5603
5604        // 3. delete_vertex
5605        let before = snap(&writer);
5606        writer.delete_vertex(vid, None, None).await?;
5607        assert_eq!(
5608            snap(&writer),
5609            before,
5610            "delete_vertex mutated a Writer field"
5611        );
5612
5613        // (insert_edge / delete_edge are skipped here: their fixture cost is
5614        // disproportionate to the audit's marginal value, and the same
5615        // structural argument plus the compiler-enforced `&self` covers them.)
5616
5617        Ok(())
5618    }
5619}