Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the [`Writer`] mutation path.
///
/// `Default` yields `max_mutations = 10_000` and
/// `partial_lance_writes = false`.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation budget (default `10_000`).
    ///
    /// NOTE(review): how this budget is enforced is not visible from this
    /// struct — confirm against its readers.
    pub max_mutations: usize,
    /// Opt-in partial-column MergeInsert path for SET-only flushes.
    ///
    /// `true`: `Writer::insert_vertex_partial` records the touched property
    /// keys in `L0Buffer::vertex_partial_keys`, and the flush sends those
    /// VIDs through Lance `MergeInsertBuilder` with a subset-of-schema
    /// source. Unchanged columns, including wide ones such as embeddings,
    /// are then neither read nor rewritten.
    ///
    /// `false` (the default for the first release): `insert_vertex_partial`
    /// falls back to the read-modify-write `insert_vertex_with_labels` path.
    /// This stays bit-for-bit equivalent with prior releases. The plan is to
    /// flip the default to `true` once telemetry on the issue #72 ingest
    /// workload confirms the win.
    ///
    /// Soundness probe:
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    fn default() -> Self {
        // Partial Lance writes stay opt-in until telemetry justifies them.
        let partial_lance_writes = false;
        let max_mutations = 10_000;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
/// Output of [`Writer::flush_l0_rotate`]: the state the later flush phases
/// need once the L0 buffer to be flushed has been rotated out.
///
/// Struct fields drop in declaration order, so `flush_in_progress_guard`
/// (declared last) is released after the other fields.
struct RotateOutput {
    /// The L0 buffer to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured by the rotate step.
    wal_lsn: u64,
    /// `current_version` captured by the rotate step.
    current_version: u64,
    /// In-progress latch. Its lifetime is meant to span the full flush,
    /// including the future async stream phase that runs on a spawned task.
    flush_in_progress_guard: FlushInProgressGuard,
}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93    let mut out = Properties::new();
94    for k in keys {
95        if let Some(v) = props.get(k) {
96            out.insert(k.clone(), v.clone());
97        }
98    }
99    out
100}
101
/// Output of [`Writer::flush_stream_l1`]: the built, but not yet published,
/// snapshot manifest and its id.
///
/// Publishing is left to the finalize step. Finalize is responsible for
/// `save_snapshot`, `set_latest_snapshot`, and the `cached_manifest` update.
struct FlushOutcome {
    /// Manifest built by the stream phase; not yet saved or marked latest.
    manifest: SnapshotManifest,
    /// Identifier of the snapshot described by `manifest`.
    snapshot_id: String,
}
110
/// The store's write path.
///
/// Holds the L0 buffer manager, the storage and schema handles, the ID
/// allocator, flush-coordination state, and the SSI/OCC bookkeeping used at
/// commit time: the commit sequence, the committed write-set registry, and
/// the `FOR UPDATE` row locks. Most mutable state is `Arc`-wrapped so the
/// same instances can be handed to the async-flush pipeline through
/// [`SharedFlushCtx`] (see `Writer::shared_ctx`) without capturing an
/// `Arc<Writer>`.
pub struct Writer {
    /// L0 buffer manager. `get_current()` yields the buffer that
    /// non-transactional mutations target.
    pub l0_manager: Arc<L0Manager>,
    /// Storage manager. It also supplies `adjacency_manager` and the object
    /// store used for the default ID allocator.
    pub storage: Arc<StorageManager>,
    /// Schema manager, shared with the property manager and the flush context.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// VID/EID allocator.
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration (async flush, SSI, compaction, index rebuild, …).
    pub config: UniConfig,
    /// Optional embedding runtime. It is a `OnceLock` so the initializer can
    /// run on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read it through
    /// [`Writer::xervo_runtime`]. The field itself is private so callers stay
    /// oblivious to the `OnceLock` representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush.
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of the last flush, initialized at construction.
    /// Interior-mutable so `&self` callers can update it. It is uncontended in
    /// practice because all writes happen inside the single-flusher critical
    /// section. It is `Arc`-wrapped so it can travel into the
    /// `SharedFlushCtx` that the async-flush coordinator passes to spawned
    /// stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races).
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild
    /// scheduling. It is a `OnceLock` for the same reason as `xervo_runtime`,
    /// and is `Arc`-wrapped so the async-flush finalize path can read it from
    /// a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading it
    /// from the object store on every `flush_to_l1` call. Wrapped in a
    /// `Mutex` for `&self` access; uncontended because all access is inside
    /// the single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves; `None` for the primary's
    /// writer. Set by [`crate::fork::writer_factory::new_for_fork`]. Read in
    /// `flush_to_l1` to emit fork-tagged metrics and to fire the
    /// fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    ///
    /// It is a proxy for L1 fragment growth on the fork's branches. Each
    /// flush typically appends about one fragment per touched dataset, so the
    /// count tracks the order of magnitude of fragment accumulation. Reading
    /// the actual `Dataset::manifest().fragments.len()` per flush would add a
    /// per-dataset object-store roundtrip on the hot commit path; the proxy
    /// keeps the guard rail purely observational (Phase 5 introduces fork
    /// compaction proper).
    ///
    /// Only meaningful when `fork_id.is_some()`. `Relaxed` ordering is
    /// sufficient because the value is observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the configured
    /// threshold. One-shot per writer lifetime. `Relaxed` ordering is
    /// sufficient because the value is observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely exclusive flush path.
    ///
    /// Acquired by the [`Writer::flush_to_l1`] entry, and by
    /// `commit_transaction_l0` across its WAL-append + L0-merge window.
    /// Replaces the outer `Arc<RwLock<Writer>>` for flush exclusion once
    /// Phase 4 drops it. `Arc`-wrapped so the async-flush coordinator's
    /// finalize path can re-acquire it from a spawned task via
    /// `SharedFlushCtx`.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. It owns the back-pressure
    /// semaphore, the rotate-order sequence, the single-finalizer task, and
    /// the pending-flush counter.
    ///
    /// `None` when `config.async_flush_enabled` is `false`; it is only built
    /// when the feature is on. The coordinator's finalizer task captures a
    /// `SharedFlushCtx` that includes `Arc<StorageManager>`. On a fork-scoped
    /// Writer, that also pins the fork's `ForkScope` via
    /// `storage.fork_scope`, so the holder count would never drop. Building
    /// it only on demand avoids that side effect for all existing sync-flush
    /// paths. When async flush graduates from opt-in to default (Commit 12),
    /// `drop_fork` (Commit 8) handles the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI).
    ///
    /// Incremented once per successful commit under `flush_lock`. A
    /// transaction captures the current value at begin as its read sequence
    /// (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    commit_sequence: Arc<AtomicU64>,
    /// Bounded log of recently committed write-sets, used for OCC conflict
    /// detection. Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (the SSI escape hatch),
    /// keyed by canonical (label, key-props) bytes.
    ///
    /// A transaction holds the lock from MATCH until commit/rollback. This
    /// serializes concurrent `FOR UPDATE` writers on the same key and avoids
    /// optimistic abort-retry on hot keys.
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
206
/// How many recently committed write-sets the OCC `CommitRegistry` retains.
///
/// When a transaction's history falls outside this window (an under-run),
/// validation cannot be precise and the commit is conservatively aborted.
/// The window is sized so that this is rare in practice, and each retained
/// entry is only a small set of touched ids.
const OCC_REGISTRY_CAPACITY: usize = 4_096;
211
212impl Writer {
213    pub async fn new(
214        storage: Arc<StorageManager>,
215        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
216        start_version: u64,
217    ) -> Result<Self> {
218        Self::new_with_config(
219            storage,
220            schema_manager,
221            start_version,
222            UniConfig::default(),
223            None,
224            None,
225        )
226        .await
227    }
228
229    pub async fn new_with_config(
230        storage: Arc<StorageManager>,
231        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232        start_version: u64,
233        config: UniConfig,
234        wal: Option<Arc<WriteAheadLog>>,
235        allocator: Option<Arc<IdAllocator>>,
236    ) -> Result<Self> {
237        let allocator = if let Some(a) = allocator {
238            a
239        } else {
240            let store = storage.store();
241            let path = object_store::path::Path::from("id_allocator.json");
242            Arc::new(IdAllocator::new(store, path, 1000).await?)
243        };
244
245        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247        let property_manager = Some(Arc::new(PropertyManager::new(
248            storage.clone(),
249            schema_manager.clone(),
250            1000,
251        )));
252
253        let adjacency_manager = storage.adjacency_manager();
254
255        // Hoist the Arc'd fields so we can both stash them on Writer and
256        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
257        // captures. Single-source-of-truth for each piece of mutable
258        // shared state.
259        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260        let cached_manifest = Arc::new(PlMutex::new(None));
261        let fork_flush_count = Arc::new(AtomicU64::new(0));
262        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264        let compaction_handle = Arc::new(RwLock::new(None));
265        let index_rebuild_manager: Arc<
266            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267        > = Arc::new(OnceLock::new());
268
269        let flush_coordinator = if config.async_flush_enabled {
270            let shared = SharedFlushCtx {
271                storage: storage.clone(),
272                l0_manager: l0_manager.clone(),
273                adjacency_manager: adjacency_manager.clone(),
274                property_manager: property_manager.clone(),
275                schema_manager: schema_manager.clone(),
276                cached_manifest: cached_manifest.clone(),
277                last_flush_time: last_flush_time.clone(),
278                fork_id: None,
279                fork_flush_count: fork_flush_count.clone(),
280                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282                flush_lock: flush_lock.clone(),
283                index_rebuild_manager: index_rebuild_manager.clone(),
284                compaction_handle: compaction_handle.clone(),
285                compaction_config: config.compaction.clone(),
286                index_rebuild_config: config.index_rebuild.clone(),
287                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288            };
289            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290            Some(Arc::new(FlushCoordinator::new(
291                config.max_pending_flushes,
292                shared,
293                finalize_fn,
294            )))
295        } else {
296            None
297        };
298
299        let commit_sequence = Arc::new(AtomicU64::new(0));
300        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301            OCC_REGISTRY_CAPACITY,
302        )));
303        let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305        Ok(Self {
306            l0_manager,
307            storage,
308            schema_manager,
309            allocator,
310            config,
311            xervo_runtime: OnceLock::new(),
312            property_manager,
313            adjacency_manager,
314            last_flush_time,
315            compaction_handle,
316            index_rebuild_manager,
317            cached_manifest,
318            fork_id: None,
319            fork_flush_count,
320            fork_fragment_warn_fired,
321            flush_lock,
322            flush_coordinator,
323            commit_sequence,
324            committed_writes,
325            for_update_locks,
326        })
327    }
328
329    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
330    /// creating it on first use. The caller `.lock_owned().await`s the returned
331    /// mutex and holds the guard for the transaction's lifetime.
332    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333        self.for_update_locks
334            .entry(key.to_vec())
335            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336            .clone()
337    }
338
339    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
340    /// holds anymore, so the map does not grow without bound across the keyspace.
341    ///
342    /// Called when a transaction ends, **after** its guards have been dropped.
343    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
344    /// the same lock `row_lock_handle` takes to clone an entry — so the check
345    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
346    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
347    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
348    /// after we remove). Either way no two transactions ever lock different
349    /// `Mutex` instances for the same key.
350    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351        for key in keys {
352            self.for_update_locks
353                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354        }
355    }
356
357    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
358    /// tests that the map does not leak entries across transactions (G5).
359    pub fn for_update_lock_count(&self) -> usize {
360        self.for_update_locks.len()
361    }
362
363    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
364    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
365    /// baseline advances to lock-acquisition time (read-latest under the lock).
366    pub fn current_commit_sequence(&self) -> u64 {
367        self.commit_sequence.load(Ordering::Relaxed)
368    }
369
370    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
371    /// Used by the async-flush stream/finalize paths to pass into spawned
372    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
373    /// with `flush_coordinator -> FinalizeFn -> Writer`).
374    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375        SharedFlushCtx {
376            storage: self.storage.clone(),
377            l0_manager: self.l0_manager.clone(),
378            adjacency_manager: self.adjacency_manager.clone(),
379            property_manager: self.property_manager.clone(),
380            schema_manager: self.schema_manager.clone(),
381            cached_manifest: self.cached_manifest.clone(),
382            last_flush_time: self.last_flush_time.clone(),
383            fork_id: self.fork_id,
384            fork_flush_count: self.fork_flush_count.clone(),
385            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387            flush_lock: self.flush_lock.clone(),
388            index_rebuild_manager: self.index_rebuild_manager.clone(),
389            compaction_handle: self.compaction_handle.clone(),
390            compaction_config: self.config.compaction.clone(),
391            index_rebuild_config: self.config.index_rebuild.clone(),
392            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393        }
394    }
395
396    /// Borrow the flush coordinator if async flush is enabled.
397    /// Returns `None` when `config.async_flush_enabled = false`.
398    /// External callers (`drop_fork`) use this to drain pending streams.
399    pub fn flush_coordinator(
400        &self,
401    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
402        self.flush_coordinator.as_ref()
403    }
404
405    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
406    ///
407    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
408    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
409    pub fn set_index_rebuild_manager(
410        &self,
411        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412    ) -> Result<()> {
413        self.index_rebuild_manager
414            .set(manager)
415            .map_err(|_| anyhow!("index_rebuild_manager already set"))
416    }
417
418    /// Replay WAL mutations into the current L0 buffer.
419    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420        let l0 = self.l0_manager.get_current();
421        let wal = l0.read().wal.clone();
422
423        if let Some(wal) = wal {
424            wal.initialize().await?;
425            let mutations = wal.replay_since(wal_high_water_mark).await?;
426            let count = mutations.len();
427
428            if count > 0 {
429                log::info!(
430                    "Replaying {} mutations from WAL (LSN > {})",
431                    count,
432                    wal_high_water_mark
433                );
434                let mut l0_guard = l0.write();
435                l0_guard.replay_mutations(mutations)?;
436            }
437
438            Ok(count)
439        } else {
440            Ok(0)
441        }
442    }
443
444    /// Allocates the next VID (pure auto-increment).
445    pub async fn next_vid(&self) -> Result<Vid> {
446        self.allocator.allocate_vid().await
447    }
448
449    /// Allocates multiple VIDs at once for bulk operations.
450    /// This is more efficient than calling next_vid() in a loop.
451    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
452        self.allocator.allocate_vids(count).await
453    }
454
455    /// Allocates the next EID (pure auto-increment).
456    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
457        self.allocator.allocate_eid().await
458    }
459
460    /// Allocates multiple EIDs at once for bulk operations.
461    /// This is more efficient than calling next_eid() in a loop.
462    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
463        self.allocator.allocate_eids(count).await
464    }
465
466    /// Install the embedding runtime exactly once. Receiver is `&self` so it
467    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
468    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
469        self.xervo_runtime
470            .set(runtime)
471            .map_err(|_| anyhow!("xervo_runtime already set"))
472    }
473
474    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
475        self.xervo_runtime.get().cloned()
476    }
477
478    /// Create a new empty L0 buffer for transaction-scoped mutations.
479    ///
480    /// Only reads the current version — no exclusive lock required on Writer.
481    /// The returned buffer has no WAL reference; mutations are logged at
482    /// commit time via [`Self::commit_transaction_l0`].
483    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
484        let current_version = self.l0_manager.get_current().read().current_version;
485        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
486        let buf = L0Buffer::new(current_version, None);
487        // SSI: stamp the OCC read sequence at begin so commit can detect any
488        // transaction that committed since. Gated on the runtime `ssi_enabled`
489        // toggle — when off, `occ_read_set` stays `None` and every downstream
490        // read-set recording / commit validation self-gates to a no-op.
491        let buf = if self.config.ssi_enabled {
492            let mut buf = buf;
493            buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
494            // The read path records observed ids here for SSI antidependency
495            // detection; commit consults it.
496            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
497                crate::runtime::l0::OccReadSet::default(),
498            )));
499            buf
500        } else {
501            buf
502        };
503        Arc::new(RwLock::new(buf))
504    }
505
506    /// Resolve the target L0 buffer for a mutation.
507    ///
508    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
509    /// When `None`, it targets the global L0 from the manager.
510    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
511        tx_l0
512            .cloned()
513            .unwrap_or_else(|| self.l0_manager.get_current())
514    }
515
516    fn update_metrics(&self) {
517        let l0 = self.l0_manager.get_current();
518        let size = l0.read().estimated_size;
519        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
520    }
521
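    /// Commit an externally-owned transaction L0 buffer into main L0.
    ///
    /// Writes the transaction's mutations to the WAL, flushes, merges them
    /// into main L0, and replays edges into the AdjacencyManager.
    ///
    /// Returns `(wal_lsn, flush_pending)`:
    /// * `wal_lsn` is the WAL LSN of the commit, or 0 when no WAL is
    ///   configured.
    /// * `flush_pending == true` means the post-commit `should_flush()`
    ///   predicate fired but no flush ran. The caller is expected to spawn a
    ///   background `flush_to_l1`. This is the shape used when
    ///   `UniConfig::async_flush_enabled` is set, so commits don't block on
    ///   L1-streaming I/O.
    ///
    /// # Errors
    /// With `config.ssi_enabled`, validation can fail in two ways, both
    /// before anything is written to the WAL:
    /// * `UniError::SerializationConflict` for an OCC (write-write,
    ///   read-write, truncated-history) or CRDT carve-out conflict;
    /// * `UniError::ConstraintConflict` when a unique key was already
    ///   committed by a concurrent transaction.
    ///
    /// WAL append failures are also propagated.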
534    pub async fn commit_transaction_l0(
535        self: &Arc<Self>,
536        tx_l0_arc: Arc<RwLock<L0Buffer>>,
537    ) -> Result<(u64, bool)> {
538        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
539        // Two concurrent commits serialize here; in Phase 3 the outer
540        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
541        // acquisition is uncontended. Phase 4 drops the outer lock and
542        // this becomes the load-bearing serialization point.
543        let _flush_lock_guard = self.flush_lock.lock().await;
544
545        // Crash-recovery seam: simulate process death immediately after winning
546        // the commit serialization point but before any durable work. No-op
547        // unless built with `--features failpoints`. (See ssi_resilience tests.)
548        fail::fail_point!("commit::after-flush-lock");
549
550        // SSI: optimistic conflict detection. This MUST run before any WAL
551        // write — `flush_wal()` below is the durable commit point and the WAL
552        // has no abort marker, so aborting after it would resurrect this
553        // transaction on crash recovery. The write-set is reused for
554        // registration after a successful merge.
555        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
556        // and `occ_write_set` is `None`, so the post-merge registration below
557        // is skipped — reproducing last-writer-wins exactly.
558        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
559            let tx_l0 = tx_l0_arc.read();
560            let read_seq = tx_l0.occ_read_seq;
561            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
562            if !write_set.is_empty() {
563                // Telemetry: one validation per non-empty (writing) commit. The
564                // ratio of conflicts to validations is the headline abort rate.
565                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
566                // Read-set is consulted only for writing transactions, so a
567                // read-only commit (empty write-set) runs at snapshot isolation.
568                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
569                if let Some(conflict) =
570                    self.committed_writes
571                        .lock()
572                        .check(read_seq, &write_set, read_guard.as_deref())
573                {
574                    use crate::runtime::occ::Conflict;
575                    match &conflict {
576                        Conflict::WriteWrite { .. } => metrics::counter!(
577                            "uni_ssi_serialization_conflicts_total",
578                            "kind" => "write_write",
579                        )
580                        .increment(1),
581                        Conflict::ReadWrite { .. } => metrics::counter!(
582                            "uni_ssi_serialization_conflicts_total",
583                            "kind" => "read_write",
584                        )
585                        .increment(1),
586                        Conflict::HistoryTruncated { .. } => {
587                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
588                        }
589                    }
590                    return Err(anyhow::Error::new(
591                        uni_common::UniError::SerializationConflict {
592                            message: conflict.to_string(),
593                        },
594                    ));
595                }
596            }
597
598            // Validate against the committed main L0 under `flush_lock`:
599            // serializable MERGE uniqueness + CRDT carve-out soundness.
600            {
601                let main_l0 = self.l0_manager.get_current();
602                let main_l0 = main_l0.read();
603
604                // SSI / serializable MERGE: abort if a concurrent transaction has
605                // already committed a row with one of this transaction's unique
606                // keys. Commits serialize here, so this closes the race window
607                // left by the per-insert check. (Empty index → no iterations.)
608                for (key, vid) in &tx_l0.constraint_index {
609                    if main_l0.has_constraint_key(key, *vid) {
610                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
611                        return Err(anyhow::Error::new(
612                            uni_common::UniError::ConstraintConflict {
613                                message: "unique key already committed by a concurrent \
614                                          transaction"
615                                    .to_string(),
616                            },
617                        ));
618                    }
619                }
620
621                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
622                // write-set assuming its merge commutes. If main L0 holds a
623                // *different* CRDT variant for the same property, the merge would
624                // silently overwrite it — abort instead of losing the update.
625                if let Some(conflict) =
626                    crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &main_l0)
627                {
628                    metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
629                    return Err(anyhow::Error::new(
630                        uni_common::UniError::SerializationConflict {
631                            message: conflict.to_string(),
632                        },
633                    ));
634                }
635            }
636            Some(write_set)
637        } else {
638            None
639        };
640
641        // Crash-recovery seam: SSI validation has passed; the transaction is
642        // about to become durable. A crash here must leave NO trace (validation
643        // happens before the WAL is touched). No-op unless `failpoints`.
644        fail::fail_point!("commit::after-validate");
645
646        // 1. Write transaction mutations to WAL BEFORE merging into main L0
647        // This ensures durability before visibility.
648        {
649            let tx_l0 = tx_l0_arc.read();
650            let main_l0_arc = self.l0_manager.get_current();
651            let main_l0 = main_l0_arc.read();
652
653            // If WAL exists, write mutations to it for durability
654            if let Some(wal) = main_l0.wal.as_ref() {
655                // Order: vertices first, then edges (to ensure src/dst exist on replay)
656
657                // Vertex insertions
658                for (vid, properties) in &tx_l0.vertex_properties {
659                    if !tx_l0.vertex_tombstones.contains(vid) {
660                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
661                        wal.append(&crate::runtime::wal::Mutation::InsertVertex {
662                            vid: *vid,
663                            properties: properties.clone(),
664                            labels,
665                        })?;
666                    }
667                }
668
669                // Vertex deletions
670                for vid in &tx_l0.vertex_tombstones {
671                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
672                    wal.append(&crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
673                }
674
675                // Label-only mutations (SET n:Label / REMOVE n:Label). After
676                // vertex inserts (so the vertex exists on replay), before edges,
677                // and skipping vertices deleted in this same commit.
678                for vid in &tx_l0.vertex_label_overwrites {
679                    if tx_l0.vertex_tombstones.contains(vid) {
680                        continue;
681                    }
682                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
683                    wal.append(&crate::runtime::wal::Mutation::SetVertexLabels {
684                        vid: *vid,
685                        labels,
686                    })?;
687                }
688
689                // Crash-recovery seam: vertices appended, edges not yet. Tests
690                // assert that a crash here (before `flush_wal`) recovers NOTHING
691                // — the durable commit point is the flush below, not append.
692                fail::fail_point!("commit::mid-wal");
693
694                // Edge insertions and deletions from edge_endpoints
695                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
696                    if tx_l0.tombstones.contains_key(eid) {
697                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
698                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
699                            eid: *eid,
700                            src_vid: *src_vid,
701                            dst_vid: *dst_vid,
702                            edge_type: *edge_type,
703                            version,
704                        })?;
705                    } else {
706                        let properties =
707                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
708                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
709                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
710                        wal.append(&crate::runtime::wal::Mutation::InsertEdge {
711                            src_vid: *src_vid,
712                            dst_vid: *dst_vid,
713                            edge_type: *edge_type,
714                            eid: *eid,
715                            version,
716                            properties,
717                            edge_type_name,
718                        })?;
719                    }
720                }
721
722                // Tombstones for edges that only exist in the global L0 (not in
723                // this transaction's edge_endpoints).  Without this, deletes of
724                // pre-existing edges would be silently lost.
725                for (eid, tombstone) in &tx_l0.tombstones {
726                    if !tx_l0.edge_endpoints.contains_key(eid) {
727                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
728                        wal.append(&crate::runtime::wal::Mutation::DeleteEdge {
729                            eid: *eid,
730                            src_vid: tombstone.src_vid,
731                            dst_vid: tombstone.dst_vid,
732                            edge_type: tombstone.edge_type,
733                            version,
734                        })?;
735                    }
736                }
737            }
738        }
739
740        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
741        let wal_lsn = self.flush_wal().await?;
742
743        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
744        // A crash here must RECOVER the transaction on replay (it is committed),
745        // even though it was never made visible in-process. No-op unless `failpoints`.
746        fail::fail_point!("commit::after-wal-flush");
747
748        // Component C1: if an outstanding snapshot pins the current generation,
749        // clone it aside (lazy copy-on-write) before merging, so the pinning
750        // transaction's reads stay isolated from this commit. No-op — and zero
751        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
752        // so this cannot race a flush rotate or another commit's merge; the merge
753        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
754        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
755        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
756        // always false when SSI is off and this is a zero-cost no-op.
757        if self.l0_manager.is_current_pinned() {
758            self.l0_manager.freeze_current_for_snapshot();
759            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
760        }
761
762        // 3. Merge into main L0 and make visible
763        {
764            let tx_l0 = tx_l0_arc.read();
765            let main_l0_arc = self.l0_manager.get_current();
766            let mut main_l0 = main_l0_arc.write();
767            main_l0.merge(&tx_l0)?;
768
769            // Replay transaction edges into the AdjacencyManager overlay
770            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
771                let edge_version = tx_l0
772                    .edge_versions
773                    .get(eid)
774                    .copied()
775                    .unwrap_or(main_l0.current_version);
776                if tx_l0.tombstones.contains_key(eid) {
777                    self.adjacency_manager
778                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
779                } else {
780                    self.adjacency_manager
781                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
782                }
783            }
784
785            // Replay tombstones for edges that only exist in the global L0
786            // (not in this transaction's edge_endpoints).
787            for (eid, tombstone) in &tx_l0.tombstones {
788                if !tx_l0.edge_endpoints.contains_key(eid) {
789                    let edge_version = tx_l0
790                        .edge_versions
791                        .get(eid)
792                        .copied()
793                        .unwrap_or(main_l0.current_version);
794                    self.adjacency_manager.add_tombstone(
795                        *eid,
796                        tombstone.src_vid,
797                        tombstone.dst_vid,
798                        tombstone.edge_type,
799                        edge_version,
800                    );
801                }
802            }
803        }
804
805        // Crash-recovery seam: durable AND merged, but the in-memory commit
806        // registry has not recorded this write-set yet. A crash here is
807        // indistinguishable from one at `after-wal-flush` on reopen (the
808        // registry is in-memory and rebuilt empty); the tx still recovers.
809        fail::fail_point!("commit::after-merge");
810
811        // SSI: register this commit's write-set under a fresh commit sequence so
812        // later transactions detect conflicts against it. Still under
813        // `flush_lock`, before the async-flush branch can drop the guard.
814        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
815        if let Some(write_set) = occ_write_set
816            && !write_set.is_empty()
817        {
818            let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
819            self.committed_writes.lock().record(seq, write_set);
820        }
821
822        self.update_metrics();
823
824        // 4. Best-effort post-commit auto-flush.
825        //
826        // Two paths:
827        // - async_flush_enabled = false (default): inline under our
828        //   existing flush_lock guard via flush_inline_under_lock.
829        // - async_flush_enabled = true: rotate inline, drop flush_lock,
830        //   then submit the stream phase to the coordinator. Gated on
831        //   `pending_flush_count() < max_pending_flushes` so we don't
832        //   stack up rotations beyond the configured pipeline depth.
833        //   `try_acquire_permit` is non-blocking: if we lose the race
834        //   for the last permit, we just skip this trigger (the next
835        //   commit retries).
836        let mut flush_pending = false;
837        if self.should_flush() {
838            if self.config.async_flush_enabled
839                && let Some(coord) = self.flush_coordinator.as_ref()
840                && coord.pending_flush_count() < self.config.max_pending_flushes
841            {
842                match coord.try_acquire_permit() {
843                    Some(permit) => {
844                        let seq = coord.next_rotate_seq();
845                        coord.note_pending();
846                        match self.flush_l0_rotate().await {
847                            Ok(rotate_out) => {
848                                // Release flush_lock BEFORE the spawn so concurrent
849                                // commits can proceed while the stream runs.
850                                drop(_flush_lock_guard);
851                                let parent_manifest = self.cached_manifest.lock().clone();
852                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
853                                    seq,
854                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
855                                    wal_lsn: rotate_out.wal_lsn,
856                                    current_version: rotate_out.current_version,
857                                    name: None,
858                                    parent_manifest,
859                                    permit,
860                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
861                                };
862                                let writer = self.clone();
863                                let _ticket = coord.submit_for_stream(
864                                    rotated,
865                                    move |old_l0, wal, ver, n| async move {
866                                        let outcome =
867                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
868                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
869                                            new_manifest: outcome.manifest,
870                                            snapshot_id: outcome.snapshot_id,
871                                        })
872                                    },
873                                );
874                                flush_pending = true;
875                                // Early return — flush_lock already dropped.
876                                return Ok((wal_lsn, flush_pending));
877                            }
878                            Err(e) => {
879                                tracing::warn!("Async rotate failed (non-critical): {}", e);
880                                // permit drops here, freeing the slot
881                            }
882                        }
883                    }
884                    None => {
885                        // Race: someone else grabbed the last permit. Skip;
886                        // next commit will retry should_flush().
887                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
888                    }
889                }
890            } else if let Err(e) = self.flush_inline_under_lock(None).await {
891                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
892            }
893        }
894
895        Ok((wal_lsn, flush_pending))
896    }
897
898    /// Flush the WAL buffer to durable storage.
899    ///
900    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
901    pub async fn flush_wal(&self) -> Result<u64> {
902        let l0 = self.l0_manager.get_current();
903        let wal = l0.read().wal.clone();
904
905        match wal {
906            Some(wal) => Ok(wal.flush().await?),
907            None => Ok(0),
908        }
909    }
910
911    /// Record property removals in the active L0 mutation stats.
912    ///
913    /// Routes to the transaction L0 if provided, otherwise to the main L0.
914    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
915        if count == 0 {
916            return;
917        }
918        let l0 = self.resolve_l0(tx_l0);
919        l0.write().mutation_stats.properties_removed += count;
920    }
921
922    /// Validates vertex constraints for the given properties.
923    /// In the new design, label is passed as a parameter since VID no longer embeds label.
924    async fn validate_vertex_constraints_for_label(
925        &self,
926        vid: Vid,
927        properties: &Properties,
928        label: &str,
929        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
930    ) -> Result<()> {
931        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
932            .await
933    }
934
935    /// Partial-update sibling: validates only constraints touching keys
936    /// present in `properties` (the touched set). NOT NULL is checked
937    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
938    /// skipped when any referenced key is absent (the caller is
939    /// expected to have routed to the full-row path in that case via
940    /// `touched_needs_full_read`).
941    async fn validate_vertex_constraints_for_label_partial(
942        &self,
943        vid: Vid,
944        properties: &Properties,
945        label: &str,
946        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
947    ) -> Result<()> {
948        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
949            .await
950    }
951
952    async fn validate_vertex_constraints_for_label_impl(
953        &self,
954        vid: Vid,
955        properties: &Properties,
956        label: &str,
957        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
958        partial: bool,
959    ) -> Result<()> {
960        let schema = self.schema_manager.schema();
961
962        {
963            // 1. Check NOT NULL constraints (from Property definitions).
964            //    Under partial-update mode, skip properties NOT in
965            //    `properties` — they retain their previous (already-
966            //    validated) value.
967            if let Some(props_meta) = schema.properties.get(label) {
968                for (prop_name, meta) in props_meta {
969                    if !meta.nullable {
970                        let present = properties.get(prop_name);
971                        if partial && present.is_none() {
972                            continue;
973                        }
974                        if present.is_none_or(|v| v.is_null()) {
975                            log::warn!(
976                                "Constraint violation: Property '{}' cannot be null for label '{}'",
977                                prop_name,
978                                label
979                            );
980                            return Err(anyhow!(
981                                "Constraint violation: Property '{}' cannot be null",
982                                prop_name
983                            ));
984                        }
985                    }
986                }
987            }
988
989            // 2. Check Explicit Constraints (Unique, Check, etc.)
990            for constraint in &schema.constraints {
991                if !constraint.enabled {
992                    continue;
993                }
994                match &constraint.target {
995                    ConstraintTarget::Label(l) if l == label => {}
996                    _ => continue,
997                }
998
999                match &constraint.constraint_type {
1000                    ConstraintType::Unique {
1001                        properties: unique_props,
1002                    } => {
1003                        // Support single and multi-property unique constraints
1004                        if !unique_props.is_empty() {
1005                            let mut key_values = Vec::new();
1006                            let mut missing = false;
1007                            for prop in unique_props {
1008                                if let Some(val) = properties.get(prop) {
1009                                    key_values.push((prop.clone(), val.clone()));
1010                                } else {
1011                                    missing = true; // Can't enforce if property missing (partial update?)
1012                                    // For INSERT, missing means null?
1013                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1014                                    // For now, only check if ALL keys are present
1015                                }
1016                            }
1017
1018                            if !missing {
1019                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1020                                    .await?;
1021                            }
1022                        }
1023                    }
1024                    ConstraintType::Exists { property } => {
1025                        if properties.get(property).is_none_or(|v| v.is_null()) {
1026                            log::warn!(
1027                                "Constraint violation: Property '{}' must exist for label '{}'",
1028                                property,
1029                                label
1030                            );
1031                            return Err(anyhow!(
1032                                "Constraint violation: Property '{}' must exist",
1033                                property
1034                            ));
1035                        }
1036                    }
1037                    ConstraintType::Check { expression } => {
1038                        if !self.evaluate_check_constraint(expression, properties)? {
1039                            return Err(anyhow!(
1040                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1041                                constraint.name,
1042                                expression
1043                            ));
1044                        }
1045                    }
1046                    _ => {
1047                        return Err(anyhow!("Unsupported constraint type"));
1048                    }
1049                }
1050            }
1051        }
1052        Ok(())
1053    }
1054
1055    /// Validates vertex constraints for a vertex with the given labels.
1056    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1057    /// Unknown labels (not in schema) are skipped.
1058    async fn validate_vertex_constraints(
1059        &self,
1060        vid: Vid,
1061        properties: &Properties,
1062        labels: &[String],
1063        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1064    ) -> Result<()> {
1065        let schema = self.schema_manager.schema();
1066
1067        // Validate constraints only for known labels
1068        for label in labels {
1069            // Skip unknown labels (schemaless support)
1070            if schema.get_label_case_insensitive(label).is_none() {
1071                continue;
1072            }
1073            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1074                .await?;
1075        }
1076
1077        // Check global ext_id uniqueness if ext_id is provided
1078        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1079            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1080        }
1081
1082        Ok(())
1083    }
1084
1085    /// Partial sibling of `validate_vertex_constraints` — validates only
1086    /// constraints touching keys present in `properties`. Used by
1087    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1088    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1089    async fn validate_vertex_constraints_partial(
1090        &self,
1091        vid: Vid,
1092        touched: &Properties,
1093        labels: &[String],
1094        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1095    ) -> Result<()> {
1096        let schema = self.schema_manager.schema();
1097        for label in labels {
1098            if schema.get_label_case_insensitive(label).is_none() {
1099                continue;
1100            }
1101            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1102                .await?;
1103        }
1104        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1105            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1106        }
1107        Ok(())
1108    }
1109
1110    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1111    ///
1112    /// Used to build a constraint key index from L0 buffers for batch validation.
1113    fn collect_constraint_keys_from_properties<'a>(
1114        properties_iter: impl Iterator<Item = &'a Properties>,
1115        label: &str,
1116        constraints: &[uni_common::core::schema::Constraint],
1117        existing_keys: &mut HashMap<String, HashSet<String>>,
1118        existing_extids: &mut HashSet<String>,
1119    ) {
1120        for props in properties_iter {
1121            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1122                existing_extids.insert(ext_id.to_string());
1123            }
1124
1125            for constraint in constraints {
1126                if !constraint.enabled {
1127                    continue;
1128                }
1129                if let ConstraintTarget::Label(l) = &constraint.target {
1130                    if l != label {
1131                        continue;
1132                    }
1133                } else {
1134                    continue;
1135                }
1136
1137                if let ConstraintType::Unique {
1138                    properties: unique_props,
1139                } = &constraint.constraint_type
1140                {
1141                    let mut key_parts = Vec::new();
1142                    let mut all_present = true;
1143                    for prop in unique_props {
1144                        if let Some(val) = props.get(prop) {
1145                            key_parts.push(format!("{}:{}", prop, val));
1146                        } else {
1147                            all_present = false;
1148                            break;
1149                        }
1150                    }
1151                    if all_present {
1152                        let key = key_parts.join("|");
1153                        existing_keys
1154                            .entry(constraint.name.clone())
1155                            .or_default()
1156                            .insert(key);
1157                    }
1158                }
1159            }
1160        }
1161    }
1162
1163    /// Validates constraints for a batch of vertices efficiently.
1164    ///
1165    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1166    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1167    ///
1168    /// # Arguments
1169    /// * `vids` - VIDs of vertices being inserted
1170    /// * `properties_batch` - Properties for each vertex
1171    /// * `label` - Label for all vertices (assumes single label for now)
1172    ///
1173    /// # Performance
1174    /// For N vertices with unique constraints:
1175    /// - Old approach: O(N²) - scan L0 buffer N times
1176    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1177    async fn validate_vertex_batch_constraints(
1178        &self,
1179        vids: &[Vid],
1180        properties_batch: &[Properties],
1181        label: &str,
1182        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1183    ) -> Result<()> {
1184        if vids.len() != properties_batch.len() {
1185            return Err(anyhow!("VID/properties length mismatch"));
1186        }
1187
1188        let schema = self.schema_manager.schema();
1189
1190        // 1. Validate NOT NULL constraints for each vertex
1191        if let Some(props_meta) = schema.properties.get(label) {
1192            for (idx, properties) in properties_batch.iter().enumerate() {
1193                for (prop_name, meta) in props_meta {
1194                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1195                        return Err(anyhow!(
1196                            "Constraint violation at index {}: Property '{}' cannot be null",
1197                            idx,
1198                            prop_name
1199                        ));
1200                    }
1201                }
1202            }
1203        }
1204
1205        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1206        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1207        let mut existing_extids: HashSet<String> = HashSet::new();
1208
1209        // Scan current L0 buffer
1210        {
1211            let l0 = self.l0_manager.get_current();
1212            let l0_guard = l0.read();
1213            Self::collect_constraint_keys_from_properties(
1214                l0_guard.vertex_properties.values(),
1215                label,
1216                &schema.constraints,
1217                &mut existing_keys,
1218                &mut existing_extids,
1219            );
1220        }
1221
1222        // Scan transaction L0 if present
1223        if let Some(tx_l0) = tx_l0 {
1224            let tx_l0_guard = tx_l0.read();
1225            Self::collect_constraint_keys_from_properties(
1226                tx_l0_guard.vertex_properties.values(),
1227                label,
1228                &schema.constraints,
1229                &mut existing_keys,
1230                &mut existing_extids,
1231            );
1232        }
1233
1234        // 3. Check batch vertices against index AND check for duplicates within batch
1235        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1236        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1237
1238        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1239            // Check ext_id uniqueness
1240            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1241                if existing_extids.contains(ext_id) {
1242                    return Err(anyhow!(
1243                        "Constraint violation at index {}: ext_id '{}' already exists",
1244                        idx,
1245                        ext_id
1246                    ));
1247                }
1248                if let Some(first_idx) = batch_extids.get(ext_id) {
1249                    return Err(anyhow!(
1250                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1251                        ext_id,
1252                        first_idx,
1253                        idx
1254                    ));
1255                }
1256                batch_extids.insert(ext_id.to_string(), idx);
1257            }
1258
1259            // Check unique constraints
1260            for constraint in &schema.constraints {
1261                if !constraint.enabled {
1262                    continue;
1263                }
1264                if let ConstraintTarget::Label(l) = &constraint.target {
1265                    if l != label {
1266                        continue;
1267                    }
1268                } else {
1269                    continue;
1270                }
1271
1272                match &constraint.constraint_type {
1273                    ConstraintType::Unique {
1274                        properties: unique_props,
1275                    } => {
1276                        let mut key_parts = Vec::new();
1277                        let mut all_present = true;
1278                        for prop in unique_props {
1279                            if let Some(val) = properties.get(prop) {
1280                                key_parts.push(format!("{}:{}", prop, val));
1281                            } else {
1282                                all_present = false;
1283                                break;
1284                            }
1285                        }
1286
1287                        if all_present {
1288                            let key = key_parts.join("|");
1289
1290                            // Check against existing L0 keys
1291                            if let Some(keys) = existing_keys.get(&constraint.name)
1292                                && keys.contains(&key)
1293                            {
1294                                return Err(anyhow!(
1295                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1296                                    idx,
1297                                    label,
1298                                    constraint.name
1299                                ));
1300                            }
1301
1302                            // Check for duplicates within batch
1303                            let batch_constraint_keys =
1304                                batch_keys.entry(constraint.name.clone()).or_default();
1305                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1306                                return Err(anyhow!(
1307                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1308                                    key,
1309                                    first_idx,
1310                                    idx
1311                                ));
1312                            }
1313                            batch_constraint_keys.insert(key, idx);
1314                        }
1315                    }
1316                    ConstraintType::Exists { property }
1317                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1318                    {
1319                        return Err(anyhow!(
1320                            "Constraint violation at index {}: Property '{}' must exist",
1321                            idx,
1322                            property
1323                        ));
1324                    }
1325                    ConstraintType::Check { expression }
1326                        if !self.evaluate_check_constraint(expression, properties)? =>
1327                    {
1328                        return Err(anyhow!(
1329                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1330                            idx,
1331                            constraint.name
1332                        ));
1333                    }
1334                    _ => {}
1335                }
1336            }
1337        }
1338
1339        // 4. Check storage for unique constraints (can batch this into a single query)
1340        for constraint in &schema.constraints {
1341            if !constraint.enabled {
1342                continue;
1343            }
1344            if let ConstraintTarget::Label(l) = &constraint.target {
1345                if l != label {
1346                    continue;
1347                }
1348            } else {
1349                continue;
1350            }
1351
1352            if let ConstraintType::Unique {
1353                properties: unique_props,
1354            } = &constraint.constraint_type
1355            {
1356                // Build compound OR filter for all batch vertices
1357                let mut or_filters = Vec::new();
1358                for properties in properties_batch.iter() {
1359                    let mut and_parts = Vec::new();
1360                    let mut all_present = true;
1361                    for prop in unique_props {
1362                        if let Some(val) = properties.get(prop) {
1363                            let val_str = match val {
1364                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1365                                Value::Int(n) => n.to_string(),
1366                                Value::Float(f) => f.to_string(),
1367                                Value::Bool(b) => b.to_string(),
1368                                _ => {
1369                                    all_present = false;
1370                                    break;
1371                                }
1372                            };
1373                            and_parts.push(format!("{} = {}", prop, val_str));
1374                        } else {
1375                            all_present = false;
1376                            break;
1377                        }
1378                    }
1379                    if all_present {
1380                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1381                    }
1382                }
1383
1384                #[cfg(feature = "lance-backend")]
1385                if !or_filters.is_empty() {
1386                    let vid_list: Vec<String> =
1387                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1388                    let filter = format!(
1389                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1390                        or_filters.join(" OR "),
1391                        vid_list.join(", ")
1392                    );
1393
1394                    if let Ok(ds) = self.storage.vertex_dataset(label)
1395                        && let Ok(lance_ds) = ds.open_raw().await
1396                    {
1397                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1398                        if count > 0 {
1399                            return Err(anyhow!(
1400                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1401                                label,
1402                                constraint.name
1403                            ));
1404                        }
1405                    }
1406                }
1407            }
1408        }
1409
1410        Ok(())
1411    }
1412
1413    /// Checks that ext_id is globally unique across all vertices.
1414    ///
1415    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1416    /// to ensure no other vertex uses this ext_id.
1417    ///
1418    /// # Errors
1419    ///
1420    /// Returns error if another vertex with the same ext_id exists.
1421    async fn check_extid_globally_unique(
1422        &self,
1423        ext_id: &str,
1424        current_vid: Vid,
1425        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1426    ) -> Result<()> {
1427        // Check L0 buffers: current, transaction, and pending flush
1428        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1429            let mut buffers = vec![self.l0_manager.get_current()];
1430            if let Some(tx_l0) = tx_l0 {
1431                buffers.push(tx_l0.clone());
1432            }
1433            buffers.extend(self.l0_manager.get_pending_flush());
1434            buffers
1435        };
1436
1437        for l0 in &l0_buffers_to_check {
1438            if let Some(vid) =
1439                Self::find_extid_in_properties(&l0.read().vertex_properties, ext_id, current_vid)
1440            {
1441                return Err(anyhow!(
1442                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1443                    ext_id,
1444                    vid
1445                ));
1446            }
1447        }
1448
1449        // Check main vertices table (if it exists)
1450        // Pass None for global uniqueness check (not snapshot-isolated)
1451        let backend = self.storage.backend();
1452        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1453            && found_vid != current_vid
1454        {
1455            return Err(anyhow!(
1456                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1457                ext_id,
1458                found_vid
1459            ));
1460        }
1461
1462        Ok(())
1463    }
1464
1465    /// Search vertex properties for a duplicate ext_id, excluding `current_vid`.
1466    fn find_extid_in_properties(
1467        vertex_properties: &HashMap<Vid, Properties>,
1468        ext_id: &str,
1469        current_vid: Vid,
1470    ) -> Option<Vid> {
1471        vertex_properties.iter().find_map(|(&vid, props)| {
1472            if vid != current_vid && props.get("ext_id").and_then(|v| v.as_str()) == Some(ext_id) {
1473                Some(vid)
1474            } else {
1475                None
1476            }
1477        })
1478    }
1479
1480    /// Helper to get vertex labels from L0 buffer.
1481    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1482        let l0 = self.l0_manager.get_current();
1483        let l0_guard = l0.read();
1484        // Check if vertex is tombstoned (deleted) - if so, return None
1485        if l0_guard.vertex_tombstones.contains(&vid) {
1486            return None;
1487        }
1488        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1489    }
1490
1491    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1492    /// This is the proper way to read vertex labels after a flush, as it checks both
1493    /// in-memory buffers and persisted storage.
1494    pub async fn get_vertex_labels(
1495        &self,
1496        vid: Vid,
1497        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1498    ) -> Option<Vec<String>> {
1499        // 1. Check current L0
1500        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1501            return Some(labels);
1502        }
1503
1504        // 2. Check transaction L0 if present
1505        if let Some(tx_l0) = tx_l0 {
1506            let guard = tx_l0.read();
1507            if guard.vertex_tombstones.contains(&vid) {
1508                return None;
1509            }
1510            if let Some(labels) = guard.get_vertex_labels(vid) {
1511                return Some(labels.to_vec());
1512            }
1513        }
1514
1515        // 3. Check pending flush L0s
1516        for pending_l0 in self.l0_manager.get_pending_flush() {
1517            let guard = pending_l0.read();
1518            if guard.vertex_tombstones.contains(&vid) {
1519                return None;
1520            }
1521            if let Some(labels) = guard.get_vertex_labels(vid) {
1522                return Some(labels.to_vec());
1523            }
1524        }
1525
1526        // 4. Check storage
1527        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1528    }
1529
1530    /// Helper to get edge type from L0 buffer.
1531    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1532        let l0 = self.l0_manager.get_current();
1533        let l0_guard = l0.read();
1534        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1535    }
1536
1537    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1538    /// Falls back to the transaction L0 if available.
1539    pub fn get_edge_type_id_from_l0(
1540        &self,
1541        eid: Eid,
1542        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1543    ) -> Option<u32> {
1544        // Check transaction L0 first
1545        if let Some(tx_l0) = tx_l0 {
1546            let guard = tx_l0.read();
1547            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1548                return Some(etype);
1549            }
1550        }
1551        // Fall back to main L0
1552        let l0 = self.l0_manager.get_current();
1553        let l0_guard = l0.read();
1554        l0_guard
1555            .get_edge_endpoint_full(eid)
1556            .map(|(_, _, etype)| etype)
1557    }
1558
1559    /// Set the type name for an edge (used for schemaless edge types).
1560    /// This is called during CREATE for edge types not found in the schema.
1561    pub fn set_edge_type(
1562        &self,
1563        eid: Eid,
1564        type_name: String,
1565        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1566    ) {
1567        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1568    }
1569
1570    /// Evaluate a simple CHECK constraint expression.
1571    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1572    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1573        let parts: Vec<&str> = expression.split_whitespace().collect();
1574        if parts.len() != 3 {
1575            // For now, only support "prop op val"
1576            // Fallback to true if too complex to avoid breaking, but warn
1577            log::warn!(
1578                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1579                expression
1580            );
1581            return Ok(true);
1582        }
1583
1584        let prop_part = parts[0].trim_start_matches('(');
1585        // Handle "variable.property" format - take the part after the dot
1586        let prop_name = if let Some(idx) = prop_part.find('.') {
1587            &prop_part[idx + 1..]
1588        } else {
1589            prop_part
1590        };
1591
1592        let op = parts[1];
1593        let val_str = parts[2].trim_end_matches(')');
1594
1595        let prop_val = match properties.get(prop_name) {
1596            Some(v) => v,
1597            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1598        };
1599
1600        // Parse value string (handle quotes for strings)
1601        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1602            || (val_str.starts_with('"') && val_str.ends_with('"'))
1603        {
1604            Value::String(val_str[1..val_str.len() - 1].to_string())
1605        } else if let Ok(n) = val_str.parse::<i64>() {
1606            Value::Int(n)
1607        } else if let Ok(n) = val_str.parse::<f64>() {
1608            Value::Float(n)
1609        } else if let Ok(b) = val_str.parse::<bool>() {
1610            Value::Bool(b)
1611        } else {
1612            // Check for internal format wrappers if they somehow leaked through
1613            if val_str.starts_with("Number(") && val_str.ends_with(')') {
1614                let n_str = &val_str[7..val_str.len() - 1];
1615                if let Ok(n) = n_str.parse::<i64>() {
1616                    Value::Int(n)
1617                } else if let Ok(n) = n_str.parse::<f64>() {
1618                    Value::Float(n)
1619                } else {
1620                    Value::String(val_str.to_string())
1621                }
1622            } else {
1623                Value::String(val_str.to_string())
1624            }
1625        };
1626
1627        match op {
1628            "=" | "==" => Ok(prop_val == &target_val),
1629            "!=" | "<>" => Ok(prop_val != &target_val),
1630            ">" => self
1631                .compare_values(prop_val, &target_val)
1632                .map(|o| o.is_gt()),
1633            "<" => self
1634                .compare_values(prop_val, &target_val)
1635                .map(|o| o.is_lt()),
1636            ">=" => self
1637                .compare_values(prop_val, &target_val)
1638                .map(|o| o.is_ge()),
1639            "<=" => self
1640                .compare_values(prop_val, &target_val)
1641                .map(|o| o.is_le()),
1642            _ => {
1643                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1644                Ok(true)
1645            }
1646        }
1647    }
1648
1649    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1650        use std::cmp::Ordering;
1651
1652        fn cmp_f64(x: f64, y: f64) -> Ordering {
1653            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1654        }
1655
1656        match (a, b) {
1657            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1658            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1659            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1660            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1661            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1662            _ => Err(anyhow!(
1663                "Cannot compare incompatible types: {:?} vs {:?}",
1664                a,
1665                b
1666            )),
1667        }
1668    }
1669
1670    async fn check_unique_constraint_multi(
1671        &self,
1672        label: &str,
1673        key_values: &[(String, Value)],
1674        current_vid: Vid,
1675        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1676    ) -> Result<()> {
1677        // Serialize constraint key once for O(1) lookups
1678        let key = serialize_constraint_key(label, key_values);
1679
1680        // 1. Check L0 (in-memory) using O(1) constraint index
1681        {
1682            let l0 = self.l0_manager.get_current();
1683            let l0_guard = l0.read();
1684            if l0_guard.has_constraint_key(&key, current_vid) {
1685                return Err(anyhow!(
1686                    "Constraint violation: Duplicate composite key for label '{}'",
1687                    label
1688                ));
1689            }
1690        }
1691
1692        // Check Transaction L0
1693        if let Some(tx_l0) = tx_l0 {
1694            let tx_l0_guard = tx_l0.read();
1695            if tx_l0_guard.has_constraint_key(&key, current_vid) {
1696                return Err(anyhow!(
1697                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1698                    label
1699                ));
1700            }
1701        }
1702
1703        // 2. Check Storage (L1/L2)
1704        let filters: Vec<String> = key_values
1705            .iter()
1706            .map(|(prop, val)| {
1707                let val_str = match val {
1708                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1709                    Value::Int(n) => n.to_string(),
1710                    Value::Float(f) => f.to_string(),
1711                    Value::Bool(b) => b.to_string(),
1712                    _ => "NULL".to_string(),
1713                };
1714                format!("{} = {}", prop, val_str)
1715            })
1716            .collect();
1717
1718        let mut filter = filters.join(" AND ");
1719        filter.push_str(&format!(
1720            " AND _deleted = false AND _vid != {}",
1721            current_vid.as_u64()
1722        ));
1723
1724        #[cfg(feature = "lance-backend")]
1725        if let Ok(ds) = self.storage.vertex_dataset(label)
1726            && let Ok(lance_ds) = ds.open_raw().await
1727        {
1728            let count = lance_ds.count_rows(Some(filter.clone())).await?;
1729            if count > 0 {
1730                return Err(anyhow!(
1731                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
1732                    label,
1733                    filter
1734                ));
1735            }
1736        }
1737
1738        Ok(())
1739    }
1740
1741    async fn check_write_pressure(&self) -> Result<()> {
1742        let status = self
1743            .storage
1744            .compaction_status()
1745            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
1746        let l1_runs = status.l1_runs;
1747        let throttle = &self.config.throttle;
1748
1749        if l1_runs >= throttle.hard_limit {
1750            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
1751            // Simple polling for now
1752            while self
1753                .storage
1754                .compaction_status()
1755                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
1756                .l1_runs
1757                >= throttle.hard_limit
1758            {
1759                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1760            }
1761        } else if l1_runs >= throttle.soft_limit {
1762            let excess = l1_runs - throttle.soft_limit;
1763            // Cap multiplier to avoid overflow
1764            let excess = std::cmp::min(excess, 31);
1765            let multiplier = 2_u32.pow(excess as u32);
1766            let delay = throttle.base_delay * multiplier;
1767            tokio::time::sleep(delay).await;
1768        }
1769        Ok(())
1770    }
1771
1772    /// Check transaction memory limit to prevent OOM.
1773    /// No-op when no transaction is active.
1774    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
1775        if let Some(tx_l0) = tx_l0 {
1776            let size = tx_l0.read().estimated_size;
1777            if size > self.config.max_transaction_memory {
1778                return Err(anyhow!(
1779                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
1780                     Roll back or commit the current transaction.",
1781                    size,
1782                    self.config.max_transaction_memory
1783                ));
1784            }
1785        }
1786        Ok(())
1787    }
1788
1789    async fn get_query_context(
1790        &self,
1791        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1792    ) -> Option<QueryContext> {
1793        Some(QueryContext::new_with_pending(
1794            self.l0_manager.get_current(),
1795            tx_l0.cloned(),
1796            self.l0_manager.get_pending_flush(),
1797        ))
1798    }
1799
1800    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
1801    /// write paths.
1802    ///
1803    /// Rejects a declared CRDT property written as a parsed CRDT value
1804    /// (`Value::Map`) whose variant differs from the schema's declared variant.
1805    /// A mismatch would make the commit-time merge silently overwrite instead of
1806    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
1807    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
1808    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
1809    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
1810    /// are never carved out and stay conflictable.
1811    fn enforce_crdt_variants(
1812        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
1813        properties: &Properties,
1814    ) -> Result<()> {
1815        for (key, value) in properties {
1816            let Some(meta) = props_meta.get(key) else {
1817                continue;
1818            };
1819            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
1820                continue;
1821            };
1822            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
1823                && crdt.type_name() != expected.type_name()
1824            {
1825                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
1826                    message: format!(
1827                        "CRDT property '{key}' must be written as a {} value",
1828                        expected.type_name()
1829                    ),
1830                }));
1831            }
1832        }
1833        Ok(())
1834    }
1835
1836    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
1837    ///
1838    /// When `label` is provided, uses it directly to look up property metadata.
1839    /// Otherwise falls back to discovering the label from L0 buffers and storage.
1840    ///
1841    /// # Errors
1842    ///
1843    /// Returns an error if CRDT property merging fails.
1844    async fn prepare_vertex_upsert(
1845        &self,
1846        vid: Vid,
1847        properties: &mut Properties,
1848        label: Option<&str>,
1849        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1850    ) -> Result<()> {
1851        let Some(pm) = &self.property_manager else {
1852            return Ok(());
1853        };
1854
1855        let schema = self.schema_manager.schema();
1856
1857        // Resolve label: use provided label or discover from L0/storage
1858        let discovered_labels;
1859        let label_name = if let Some(l) = label {
1860            Some(l)
1861        } else {
1862            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
1863            discovered_labels
1864                .as_ref()
1865                .and_then(|l| l.first().map(|s| s.as_str()))
1866        };
1867
1868        let Some(label_str) = label_name else {
1869            return Ok(());
1870        };
1871        let Some(props_meta) = schema.properties.get(label_str) else {
1872            return Ok(());
1873        };
1874
1875        // Identify CRDT properties in the insert data
1876        let crdt_keys: Vec<String> = properties
1877            .keys()
1878            .filter(|key| {
1879                props_meta.get(*key).is_some_and(|meta| {
1880                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1881                })
1882            })
1883            .cloned()
1884            .collect();
1885
1886        if crdt_keys.is_empty() {
1887            return Ok(());
1888        }
1889
1890        // Enforce that each declared CRDT property written as a parsed CRDT value
1891        // (`Value::Map`) carries its declared variant. A mismatched variant makes
1892        // `merge_crdt_properties` overwrite rather than merge at commit, and the
1893        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
1894        // would hide that as a silent lost update — reject it at the source.
1895        //
1896        // Only the `Map` form is checked: it is exactly the form the carve-out
1897        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
1898        // string (the Cypher form) or a non-CRDT value is never carved out — it
1899        // stays conflictable — so it poses no carve-out soundness risk and is left
1900        // to the existing merge/parse path. This is the declared-property half of
1901        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
1902        Self::enforce_crdt_variants(props_meta, properties)?;
1903
1904        let ctx = self.get_query_context(tx_l0).await;
1905        for key in crdt_keys {
1906            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
1907            if !existing.is_null()
1908                && let Some(val) = properties.get_mut(&key)
1909            {
1910                *val = pm.merge_crdt_values(&existing, val)?;
1911            }
1912        }
1913
1914        Ok(())
1915    }
1916
1917    async fn prepare_edge_upsert(
1918        &self,
1919        eid: Eid,
1920        properties: &mut Properties,
1921        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1922    ) -> Result<()> {
1923        if let Some(pm) = &self.property_manager {
1924            let schema = self.schema_manager.schema();
1925            // Get edge type from L0 buffer instead of from EID
1926            let type_name = self.get_edge_type_from_l0(eid);
1927
1928            if let Some(ref t_name) = type_name
1929                && let Some(props_meta) = schema.properties.get(t_name)
1930            {
1931                let mut crdt_keys = Vec::new();
1932                for (key, _) in properties.iter() {
1933                    if let Some(meta) = props_meta.get(key)
1934                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
1935                    {
1936                        crdt_keys.push(key.clone());
1937                    }
1938                }
1939
1940                if !crdt_keys.is_empty() {
1941                    let ctx = self.get_query_context(tx_l0).await;
1942                    for key in crdt_keys {
1943                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
1944
1945                        if !existing.is_null()
1946                            && let Some(val) = properties.get_mut(&key)
1947                        {
1948                            *val = pm.merge_crdt_values(&existing, val)?;
1949                        }
1950                    }
1951                }
1952            }
1953        }
1954        Ok(())
1955    }
1956
1957    #[instrument(skip(self, properties), level = "trace")]
1958    pub async fn insert_vertex(
1959        &self,
1960        vid: Vid,
1961        properties: Properties,
1962        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1963    ) -> Result<()> {
1964        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
1965            .await?;
1966        Ok(())
1967    }
1968
1969    /// Component C1 (G4): before a non-transactional mutation merges into main
1970    /// L0, if an outstanding snapshot pins the current generation, freeze it
1971    /// aside so snapshots taken *before* this write stay isolated from it.
1972    ///
1973    /// `flush_lock` (acquired and released here) serializes the freeze against
1974    /// concurrent commit-time freezes/merges, matching the atomicity the tx
1975    /// commit path gets. No-op for transactional writes (their freeze happens at
1976    /// commit) and — the common case — when nothing is pinned, where it costs one
1977    /// atomic load. Freezes at most once per pinned generation: the freeze
1978    /// installs a fresh unpinned `current`, so later writes in the same bulk
1979    /// import see no pin and merge in place, and the snapshot keeps reading the
1980    /// frozen pre-import buffer.
1981    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1982        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
1983        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
1984        // always false (one atomic load) when SSI is off.
1985        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
1986            let _flush_lock_guard = self.flush_lock.lock().await;
1987            // Re-check under the lock: a concurrent commit may have frozen first.
1988            if self.l0_manager.is_current_pinned() {
1989                self.l0_manager.freeze_current_for_snapshot();
1990                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1991            }
1992        }
1993    }
1994
1995    #[instrument(skip(self, properties, labels), level = "trace")]
1996    pub async fn insert_vertex_with_labels(
1997        &self,
1998        vid: Vid,
1999        mut properties: Properties,
2000        labels: &[String],
2001        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2002    ) -> Result<Properties> {
2003        let start = std::time::Instant::now();
2004        self.check_write_pressure().await?;
2005        self.check_transaction_memory(tx_l0)?;
2006
2007        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2008        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2009        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2010        // taken before this write stay isolated from it.
2011        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2012
2013        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2014            self.process_embeddings_for_labels(labels, &mut properties)
2015                .await?;
2016        }
2017        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2018            .await?;
2019        self.prepare_vertex_upsert(
2020            vid,
2021            &mut properties,
2022            labels.first().map(|s| s.as_str()),
2023            tx_l0,
2024        )
2025        .await?;
2026
2027        // Clone properties and labels before moving into L0 to return them and populate constraint index
2028        let properties_copy = properties.clone();
2029        let labels_copy = labels.to_vec();
2030
2031        {
2032            let l0 = self.resolve_l0(tx_l0);
2033            let mut l0_guard = l0.write();
2034            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2035
2036            // Populate constraint index for O(1) duplicate detection
2037            let schema = self.schema_manager.schema();
2038            for label in &labels_copy {
2039                if schema.get_label_case_insensitive(label).is_none() {
2040                    if self.config.strict_schema {
2041                        return Err(anyhow::anyhow!(
2042                            "Label '{}' is not defined in the schema \
2043                             (strict_schema is enabled).",
2044                            label
2045                        ));
2046                    }
2047                    continue; // Schemaless: skip unknown labels.
2048                }
2049
2050                // For each unique constraint on this label, insert into constraint index
2051                for constraint in &schema.constraints {
2052                    if !constraint.enabled {
2053                        continue;
2054                    }
2055                    if let ConstraintTarget::Label(l) = &constraint.target {
2056                        if l != label {
2057                            continue;
2058                        }
2059                    } else {
2060                        continue;
2061                    }
2062
2063                    if let ConstraintType::Unique {
2064                        properties: unique_props,
2065                    } = &constraint.constraint_type
2066                    {
2067                        let mut key_values = Vec::new();
2068                        let mut all_present = true;
2069                        for prop in unique_props {
2070                            if let Some(val) = properties_copy.get(prop) {
2071                                key_values.push((prop.clone(), val.clone()));
2072                            } else {
2073                                all_present = false;
2074                                break;
2075                            }
2076                        }
2077
2078                        if all_present {
2079                            let key = serialize_constraint_key(label, &key_values);
2080                            l0_guard.insert_constraint_key(key, vid);
2081                        }
2082                    }
2083                }
2084            }
2085        }
2086
2087        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2088        self.update_metrics();
2089
2090        if tx_l0.is_none() {
2091            self.check_flush().await?;
2092        }
2093        if start.elapsed().as_millis() > 100 {
2094            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2095        }
2096        Ok(properties_copy)
2097    }
2098
2099    /// True iff routing this partial write through MergeInsert would
2100    /// miss a constraint check. Specifically: a multi-key UNIQUE
2101    /// constraint where the touched-set doesn't cover all member keys
2102    /// requires the unchanged keys from the existing row to compute
2103    /// the composite. Conservative: also returns true if any touched
2104    /// key is `ext_id` (uniqueness checked globally — handled in the
2105    /// full-row path).
2106    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2107        if touched.contains_key("ext_id") {
2108            return true;
2109        }
2110        let schema = self.schema_manager.schema();
2111        for label in labels {
2112            if schema.get_label_case_insensitive(label).is_none() {
2113                continue;
2114            }
2115            for constraint in &schema.constraints {
2116                if !constraint.enabled {
2117                    continue;
2118                }
2119                if let ConstraintTarget::Label(l) = &constraint.target {
2120                    if !l.eq_ignore_ascii_case(label) {
2121                        continue;
2122                    }
2123                } else {
2124                    continue;
2125                }
2126                if let ConstraintType::Unique {
2127                    properties: unique_props,
2128                } = &constraint.constraint_type
2129                {
2130                    if unique_props.len() < 2 {
2131                        continue; // single-key UNIQUE — partial path sees the key
2132                    }
2133                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2134                        return true;
2135                    }
2136                }
2137            }
2138        }
2139        false
2140    }
2141
2142    /// Insert a vertex's FULL property row plus a touched-keys hint so
2143    /// the flush emits ONLY those columns via Lance MergeInsert.
2144    ///
2145    /// Caller must have read the full row (via PropertyManager) and
2146    /// applied SET-touched values on top before calling — same input
2147    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2148    /// is the set of property keys this SET statement actually
2149    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2150    /// flush filters the MergeInsert source schema down to those keys.
2151    /// When `UniConfig::partial_lance_writes == false`, falls through
2152    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2153    /// equivalence with prior releases.
2154    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2155    pub async fn insert_vertex_partial_full(
2156        &self,
2157        vid: Vid,
2158        mut props: Properties,
2159        touched_keys: HashSet<String>,
2160        labels: &[String],
2161        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2162    ) -> Result<()> {
2163        if !self.config.partial_lance_writes
2164            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2165        {
2166            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2167                .await?;
2168            return Ok(());
2169        }
2170
2171        self.check_write_pressure().await?;
2172        self.check_transaction_memory(tx_l0)?;
2173        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2174            self.process_embeddings_for_labels(labels, &mut props)
2175                .await?;
2176        }
2177        // Full-row validation runs because we have the complete map;
2178        // no need for the partial-only validator.
2179        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2180            .await?;
2181        {
2182            let l0 = self.resolve_l0(tx_l0);
2183            let mut l0_guard = l0.write();
2184            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2185        }
2186        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2187        metrics::counter!("uni_partial_writes_total").increment(1);
2188        self.update_metrics();
2189        if tx_l0.is_none() {
2190            self.check_flush().await?;
2191        }
2192        Ok(())
2193    }
2194
2195    /// Insert a vertex's *partial* property set without first reading the
2196    /// full row.
2197    ///
2198    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2199    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2200    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2201    /// schema source — preserving untouched columns (e.g., embeddings)
2202    /// byte-equal in Lance with no read at the caller and no write of
2203    /// those columns.
2204    ///
2205    /// When the flag is `false`, this falls back to the existing
2206    /// `insert_vertex_with_labels` path after merging `touched` with
2207    /// the current properties from L0/storage. The caller can therefore
2208    /// use this entry point unconditionally; the optimization activates
2209    /// only when the flag is on.
2210    #[instrument(skip(self, touched, labels), level = "trace")]
2211    pub async fn insert_vertex_partial(
2212        &self,
2213        vid: Vid,
2214        touched: Properties,
2215        labels: &[String],
2216        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2217    ) -> Result<()> {
2218        let needs_full_read =
2219            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2220        if needs_full_read {
2221            // Flag-off fallback (or constraint-driven fallback): merge
2222            // `touched` with the current full property snapshot from
2223            // L0/storage and route through the existing path. Preserves
2224            // bit-for-bit equivalence with the pre-Round-11 release.
2225            let existing = if let Some(pm) = &self.property_manager {
2226                pm.get_all_vertex_props_with_ctx(vid, None)
2227                    .await
2228                    .unwrap_or_default()
2229                    .unwrap_or_default()
2230            } else {
2231                Properties::new()
2232            };
2233            let mut merged = existing;
2234            for (k, v) in touched {
2235                merged.insert(k, v);
2236            }
2237            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2238                .await?;
2239            return Ok(());
2240        }
2241
2242        // Flag-on fast path: stage the partial update directly. Pressure
2243        // checks, embedding generation, constraint validation all still
2244        // run — but the validator is the partial-aware variant that
2245        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2246        // properties not present in `touched`. Multi-key UNIQUE that
2247        // overlaps the touched set forces a fallback above via
2248        // `touched_needs_full_read`.
2249        let mut touched = touched;
2250        self.check_write_pressure().await?;
2251        self.check_transaction_memory(tx_l0)?;
2252        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2253            self.process_embeddings_for_labels(labels, &mut touched)
2254                .await?;
2255        }
2256        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2257            .await?;
2258
2259        {
2260            let l0 = self.resolve_l0(tx_l0);
2261            let mut l0_guard = l0.write();
2262            l0_guard.insert_vertex_partial(vid, touched, labels);
2263        }
2264
2265        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2266        metrics::counter!("uni_partial_writes_total").increment(1);
2267        self.update_metrics();
2268        if tx_l0.is_none() {
2269            self.check_flush().await?;
2270        }
2271        Ok(())
2272    }
2273
2274    /// Insert multiple vertices with batched operations.
2275    ///
2276    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2277    /// for bulk inserts with unique constraints.
2278    ///
2279    /// # Performance Improvements
2280    /// - Batch VID allocation: 1 call instead of N calls
2281    /// - Batch constraint validation: O(N) instead of O(N²)
2282    /// - Batch embedding generation: 1 API call per config instead of N calls
2283    /// - Transaction wrapping: Automatic flush deferral, atomicity
2284    ///
2285    /// # Arguments
2286    /// * `vids` - Pre-allocated VIDs for the vertices
2287    /// * `properties_batch` - Properties for each vertex
2288    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2289    ///
2290    /// # Errors
2291    /// Returns error if:
2292    /// - VID/properties length mismatch
2293    /// - Constraint violation detected
2294    /// - Embedding generation fails
2295    /// - Transaction commit fails
2296    ///
2297    /// # Atomicity
2298    /// If this method fails, all changes are rolled back (if transaction was started here).
2299    pub async fn insert_vertices_batch(
2300        &self,
2301        vids: Vec<Vid>,
2302        mut properties_batch: Vec<Properties>,
2303        labels: Vec<String>,
2304        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2305    ) -> Result<Vec<Properties>> {
2306        let start = std::time::Instant::now();
2307
2308        // Validate inputs
2309        if vids.len() != properties_batch.len() {
2310            return Err(anyhow!(
2311                "VID/properties size mismatch: {} vids, {} properties",
2312                vids.len(),
2313                properties_batch.len()
2314            ));
2315        }
2316
2317        if vids.is_empty() {
2318            return Ok(Vec::new());
2319        }
2320
2321        // Batch operations — writes go directly to the resolved L0.
2322        // Atomicity is guaranteed by the caller holding the writer lock.
2323        let result = async {
2324            self.check_write_pressure().await?;
2325            self.check_transaction_memory(tx_l0)?;
2326
2327            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2328            // freeze the pinned generation aside before merging so snapshot
2329            // readers stay isolated. No-op when unpinned or transactional.
2330            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2331
2332            // Batch embedding generation (1 API call per config)
2333            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2334                .await?;
2335
2336            // Batch constraint validation (O(N) instead of O(N²))
2337            let label = labels
2338                .first()
2339                .ok_or_else(|| anyhow!("No labels provided"))?;
2340            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2341                .await?;
2342
2343            // Batch prepare (CRDT merging if needed)
2344            // Check schema once: skip entirely if no CRDT properties for this label.
2345            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2346            // values to merge, so the per-vertex lookup is unnecessary in that case.
2347            let has_crdt_fields = {
2348                let schema = self.schema_manager.schema();
2349                schema
2350                    .properties
2351                    .get(label.as_str())
2352                    .is_some_and(|props_meta| {
2353                        props_meta.values().any(|meta| {
2354                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2355                        })
2356                    })
2357            };
2358
2359            if has_crdt_fields {
2360                // Layer-1 variant enforcement (G3): the batch path must reject a
2361                // declared-CRDT variant mismatch exactly as the single-vertex
2362                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2363                // written via batch import slips past write-time validation and
2364                // the OCC carve-out then masks the overwrite as a lost update.
2365                {
2366                    let schema = self.schema_manager.schema();
2367                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2368                        for props in &properties_batch {
2369                            Self::enforce_crdt_variants(props_meta, props)?;
2370                        }
2371                    }
2372                }
2373
2374                // Batch fetch existing CRDT values: collect VIDs that need merging,
2375                // then query once via PropertyManager instead of per-vertex lookups.
2376                let schema = self.schema_manager.schema();
2377                let crdt_keys: Vec<String> = schema
2378                    .properties
2379                    .get(label.as_str())
2380                    .map(|props_meta| {
2381                        props_meta
2382                            .iter()
2383                            .filter(|(_, meta)| {
2384                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2385                            })
2386                            .map(|(key, _)| key.clone())
2387                            .collect()
2388                    })
2389                    .unwrap_or_default();
2390
2391                if let Some(pm) = &self.property_manager {
2392                    let ctx = self.get_query_context(tx_l0).await;
2393                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2394                        for key in &crdt_keys {
2395                            if props.contains_key(key) {
2396                                let existing =
2397                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2398                                if !existing.is_null()
2399                                    && let Some(val) = props.get_mut(key)
2400                                {
2401                                    *val = pm.merge_crdt_values(&existing, val)?;
2402                                }
2403                            }
2404                        }
2405                    }
2406                }
2407            }
2408
2409            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2410            let target_l0 = self.resolve_l0(tx_l0);
2411
2412            let properties_result = properties_batch.clone();
2413            {
2414                let mut l0_guard = target_l0.write();
2415                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2416                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2417                }
2418            }
2419
2420            // Update metrics (batch increment)
2421            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2422            self.update_metrics();
2423
2424            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2425        }
2426        .await;
2427
2428        let props = result?;
2429
2430        if start.elapsed().as_millis() > 100 {
2431            log::warn!(
2432                "Slow insert_vertices_batch ({} vertices): {}ms",
2433                vids.len(),
2434                start.elapsed().as_millis()
2435            );
2436        }
2437
2438        Ok(props)
2439    }
2440
2441    /// Delete a vertex by VID.
2442    ///
2443    /// When `labels` is provided, uses them directly to populate L0 for
2444    /// correct tombstone flushing. Otherwise discovers labels from L0
2445    /// buffers and storage (which can be slow for many vertices).
2446    ///
2447    /// # Errors
2448    ///
2449    /// Returns an error if write pressure stalls, label lookup fails, or
2450    /// the L0 delete operation fails.
2451    #[instrument(skip(self, labels), level = "trace")]
2452    pub async fn delete_vertex(
2453        &self,
2454        vid: Vid,
2455        labels: Option<Vec<String>>,
2456        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2457    ) -> Result<()> {
2458        let start = std::time::Instant::now();
2459        self.check_write_pressure().await?;
2460        self.check_transaction_memory(tx_l0)?;
2461        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2462        let l0 = self.resolve_l0(tx_l0);
2463
2464        // Before deleting, ensure we have the vertex's labels stored in L0
2465        // so the tombstone can be properly flushed to the correct label datasets.
2466        let has_labels = {
2467            let l0_guard = l0.read();
2468            l0_guard.vertex_labels.contains_key(&vid)
2469        };
2470
2471        if !has_labels {
2472            let resolved_labels = if let Some(provided) = labels {
2473                // Caller provided labels — skip the lookup entirely
2474                Some(provided)
2475            } else {
2476                // Discover labels from pending flush L0s, then storage
2477                let mut found = None;
2478                for pending_l0 in self.l0_manager.get_pending_flush() {
2479                    let pending_guard = pending_l0.read();
2480                    if let Some(l) = pending_guard.get_vertex_labels(vid) {
2481                        found = Some(l.to_vec());
2482                        break;
2483                    }
2484                }
2485                if found.is_none() {
2486                    found = self.find_vertex_labels_in_storage(vid).await?;
2487                }
2488                found
2489            };
2490
2491            if let Some(found_labels) = resolved_labels {
2492                let mut l0_guard = l0.write();
2493                l0_guard.vertex_labels.insert(vid, found_labels);
2494            }
2495        }
2496
2497        l0.write().delete_vertex(vid)?;
2498        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2499        self.update_metrics();
2500
2501        if tx_l0.is_none() {
2502            self.check_flush().await?;
2503        }
2504        if start.elapsed().as_millis() > 100 {
2505            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2506        }
2507        Ok(())
2508    }
2509
2510    /// Find vertex labels from storage by querying the main vertices table.
2511    /// Returns the labels from the latest non-deleted version of the vertex.
2512    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2513        use crate::backend::types::ScanRequest;
2514        use arrow_array::Array;
2515        use arrow_array::cast::AsArray;
2516
2517        let backend = self.storage.backend();
2518        let table_name = MainVertexDataset::table_name();
2519
2520        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2521        if !backend.table_exists(table_name).await? {
2522            return Ok(None);
2523        }
2524
2525        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2526        let filter = format!("_vid = {}", vid.as_u64());
2527        let batches = backend
2528            .scan(
2529                ScanRequest::all(table_name)
2530                    .with_filter(filter)
2531                    .with_columns(vec![
2532                        "_vid".to_string(),
2533                        "labels".to_string(),
2534                        "_version".to_string(),
2535                        "_deleted".to_string(),
2536                    ]),
2537            )
2538            .await
2539            .unwrap_or_default();
2540
2541        // Find the row with the highest version number
2542        let mut max_version: Option<u64> = None;
2543        let mut labels: Option<Vec<String>> = None;
2544        let mut is_deleted = false;
2545
2546        for batch in batches {
2547            if batch.num_rows() == 0 {
2548                continue;
2549            }
2550
2551            let version_array = batch
2552                .column_by_name("_version")
2553                .unwrap()
2554                .as_primitive::<arrow_array::types::UInt64Type>();
2555
2556            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2557
2558            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2559
2560            for row_idx in 0..batch.num_rows() {
2561                let version = version_array.value(row_idx);
2562
2563                if max_version.is_none_or(|mv| version > mv) {
2564                    is_deleted = deleted_array.value(row_idx);
2565
2566                    let labels_list = labels_array.value(row_idx);
2567                    let string_array = labels_list.as_string::<i32>();
2568                    let vertex_labels: Vec<String> = (0..string_array.len())
2569                        .filter(|&i| !string_array.is_null(i))
2570                        .map(|i| string_array.value(i).to_string())
2571                        .collect();
2572
2573                    max_version = Some(version);
2574                    labels = Some(vertex_labels);
2575                }
2576            }
2577        }
2578
2579        // If the latest version is deleted, return None
2580        if is_deleted { Ok(None) } else { Ok(labels) }
2581    }
2582
2583    #[expect(clippy::too_many_arguments)]
2584    #[instrument(skip(self, props, touched_keys), level = "trace")]
2585    pub async fn insert_edge_partial_full(
2586        &self,
2587        src_vid: Vid,
2588        dst_vid: Vid,
2589        edge_type: u32,
2590        eid: Eid,
2591        props: Properties,
2592        edge_type_name: Option<String>,
2593        touched_keys: HashSet<String>,
2594        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2595    ) -> Result<()> {
2596        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2597        if !self.config.partial_lance_writes {
2598            return self
2599                .insert_edge(
2600                    src_vid,
2601                    dst_vid,
2602                    edge_type,
2603                    eid,
2604                    props,
2605                    edge_type_name,
2606                    tx_l0,
2607                )
2608                .await;
2609        }
2610
2611        let start = std::time::Instant::now();
2612        self.check_write_pressure().await?;
2613        self.check_transaction_memory(tx_l0)?;
2614        let mut props = props;
2615        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2616
2617        let l0 = self.resolve_l0(tx_l0);
2618        l0.write().insert_edge_partial_full(
2619            src_vid,
2620            dst_vid,
2621            edge_type,
2622            eid,
2623            props,
2624            edge_type_name,
2625            touched_keys,
2626        )?;
2627
2628        if tx_l0.is_none() {
2629            let version = l0.read().current_version;
2630            self.adjacency_manager
2631                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2632        }
2633
2634        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2635        metrics::counter!("uni_partial_writes_total").increment(1);
2636        self.update_metrics();
2637        if tx_l0.is_none() {
2638            self.check_flush().await?;
2639        }
2640        if start.elapsed().as_millis() > 100 {
2641            log::warn!(
2642                "Slow insert_edge_partial_full: {}ms",
2643                start.elapsed().as_millis()
2644            );
2645        }
2646        Ok(())
2647    }
2648
2649    #[expect(clippy::too_many_arguments)]
2650    pub async fn insert_edge(
2651        &self,
2652        src_vid: Vid,
2653        dst_vid: Vid,
2654        edge_type: u32,
2655        eid: Eid,
2656        mut properties: Properties,
2657        edge_type_name: Option<String>,
2658        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2659    ) -> Result<()> {
2660        let start = std::time::Instant::now();
2661        self.check_write_pressure().await?;
2662        self.check_transaction_memory(tx_l0)?;
2663        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2664        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
2665            .await?;
2666
2667        let l0 = self.resolve_l0(tx_l0);
2668        l0.write()
2669            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
2670
2671        // Dual-write to AdjacencyManager overlay (survives flush).
2672        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
2673        if tx_l0.is_none() {
2674            let version = l0.read().current_version;
2675            self.adjacency_manager
2676                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2677        }
2678
2679        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2680        self.update_metrics();
2681
2682        if tx_l0.is_none() {
2683            self.check_flush().await?;
2684        }
2685        if start.elapsed().as_millis() > 100 {
2686            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
2687        }
2688        Ok(())
2689    }
2690
2691    #[instrument(skip(self), level = "trace")]
2692    pub async fn delete_edge(
2693        &self,
2694        eid: Eid,
2695        src_vid: Vid,
2696        dst_vid: Vid,
2697        edge_type: u32,
2698        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2699    ) -> Result<()> {
2700        let start = std::time::Instant::now();
2701        self.check_write_pressure().await?;
2702        self.check_transaction_memory(tx_l0)?;
2703        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2704        let l0 = self.resolve_l0(tx_l0);
2705
2706        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
2707
2708        // Dual-write tombstone to AdjacencyManager overlay.
2709        if tx_l0.is_none() {
2710            let version = l0.read().current_version;
2711            self.adjacency_manager
2712                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
2713        }
2714        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2715        self.update_metrics();
2716
2717        if tx_l0.is_none() {
2718            self.check_flush().await?;
2719        }
2720        if start.elapsed().as_millis() > 100 {
2721            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
2722        }
2723        Ok(())
2724    }
2725
2726    /// Decide whether a flush should be triggered based on mutation count
2727    /// or elapsed time since the last flush.
2728    ///
2729    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
2730    /// reuse the decision while bypassing the lock-acquiring entry point
2731    /// (it already holds `flush_lock`).
2732    fn should_flush(&self) -> bool {
2733        let count = self.l0_manager.get_current().read().mutation_count;
2734        if count == 0 {
2735            return false;
2736        }
2737        if count >= self.config.auto_flush_threshold {
2738            return true;
2739        }
2740        if let Some(interval) = self.config.auto_flush_interval
2741            && self.last_flush_time.lock().elapsed() >= interval
2742            && count >= self.config.auto_flush_min_mutations
2743        {
2744            return true;
2745        }
2746        false
2747    }
2748
2749    /// Check if flush should be triggered based on mutation count or time elapsed.
2750    /// This method is called after each write operation and can also be called
2751    /// by a background task for time-based flushing.
2752    pub async fn check_flush(&self) -> Result<()> {
2753        if self.should_flush() {
2754            self.flush_to_l1(None).await?;
2755        }
2756        Ok(())
2757    }
2758
2759    /// Process embeddings for a vertex using labels passed directly.
2760    /// Use this when labels haven't been stored to L0 yet.
2761    async fn process_embeddings_for_labels(
2762        &self,
2763        labels: &[String],
2764        properties: &mut Properties,
2765    ) -> Result<()> {
2766        let label_name = labels.first().map(|s| s.as_str());
2767        self.process_embeddings_impl(label_name, properties).await
2768    }
2769
2770    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
2771    /// vertex has an embedding config that hasn't been satisfied by the
2772    /// caller-provided properties, enqueue the VID in
2773    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
2774    /// skips `process_embeddings_for_labels` and the embedding is computed
2775    /// in a single batched call at flush time via
2776    /// `drain_pending_embeddings`.
2777    ///
2778    /// Returns `false` (caller falls back to today's per-row eager embed)
2779    /// if any of:
2780    ///  - the flag is off,
2781    ///  - no label has an embedding config,
2782    ///  - the user already provided the target property (matches the
2783    ///    existing skip-if-present semantics at writer.rs:2727).
2784    ///
2785    /// Trade-off: when deferral is active, in-tx reads of the embedding
2786    /// column return only what was already in storage (or nothing for
2787    /// brand-new vertices). Existing tests that RETURN n.embedding in
2788    /// the same tx as a SET on the source column must run with the flag
2789    /// off; opt in only when no such reads happen between write and
2790    /// commit.
2791    fn try_defer_embedding(
2792        &self,
2793        labels: &[String],
2794        properties: &Properties,
2795        vid: Vid,
2796        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2797    ) -> bool {
2798        if !self.config.defer_embeddings {
2799            return false;
2800        }
2801        let Some(label) = labels.first() else {
2802            return false;
2803        };
2804
2805        let schema = self.schema_manager.schema();
2806        let mut has_unsatisfied_cfg = false;
2807        for idx in &schema.indexes {
2808            if let IndexDefinition::Vector(v_cfg) = idx
2809                && v_cfg.label == *label
2810                && v_cfg.embedding_config.is_some()
2811                && !properties.contains_key(&v_cfg.property)
2812            {
2813                has_unsatisfied_cfg = true;
2814                break;
2815            }
2816        }
2817        if !has_unsatisfied_cfg {
2818            return false;
2819        }
2820
2821        let l0 = self.resolve_l0(tx_l0);
2822        let mut guard = l0.write();
2823        guard.pending_embeddings.insert(vid, label.clone());
2824        true
2825    }
2826
2827    /// Drain `pending_embeddings` from the rotated old-L0 right before
2828    /// `flush_stream_l1` reads it. Groups by label, issues one batched
2829    /// `process_embeddings_for_batch` call per label, and writes the
2830    /// resulting embedding vectors into each VID's `vertex_properties`
2831    /// map. After this returns, the flush proceeds against an L0 that
2832    /// looks no different from one whose embeddings were generated
2833    /// per-row at insert.
2834    ///
2835    /// Idempotent: a VID whose embedding was already materialized
2836    /// (e.g., by on-demand read paths in a future Phase B revision) is
2837    /// detected via `properties.contains_key(target_prop)` inside
2838    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
2839    /// the drain is safe.
2840    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
2841        let by_label: HashMap<String, Vec<Vid>> = {
2842            let guard = old_l0_arc.read();
2843            if guard.pending_embeddings.is_empty() {
2844                return Ok(());
2845            }
2846            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
2847            for (vid, label) in &guard.pending_embeddings {
2848                m.entry(label.clone()).or_default().push(*vid);
2849            }
2850            m
2851        };
2852
2853        for (label, vids) in by_label {
2854            let mut properties_batch: Vec<Properties> = {
2855                let guard = old_l0_arc.read();
2856                vids.iter()
2857                    .map(|vid| {
2858                        guard
2859                            .vertex_properties
2860                            .get(vid)
2861                            .cloned()
2862                            .unwrap_or_default()
2863                    })
2864                    .collect()
2865            };
2866
2867            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
2868                .await?;
2869
2870            let mut guard = old_l0_arc.write();
2871            for (vid, props) in vids.iter().zip(properties_batch) {
2872                let target = guard.vertex_properties.entry(*vid).or_default();
2873                for (k, v) in props {
2874                    target.insert(k, v);
2875                }
2876                guard.pending_embeddings.remove(vid);
2877            }
2878        }
2879        Ok(())
2880    }
2881
2882    /// Process embeddings for a batch of vertices efficiently.
2883    ///
2884    /// Groups vertices by embedding config and makes batched API calls to the
2885    /// embedding service instead of calling once per vertex.
2886    ///
2887    /// # Performance
2888    /// For N vertices with embedding config:
2889    /// - Old approach: N API calls to embedding service
2890    /// - New approach: 1 API call per embedding config (usually 1 total)
2891    async fn process_embeddings_for_batch(
2892        &self,
2893        labels: &[String],
2894        properties_batch: &mut [Properties],
2895    ) -> Result<()> {
2896        let label_name = labels.first().map(|s| s.as_str());
2897        let schema = self.schema_manager.schema();
2898
2899        if let Some(label) = label_name {
2900            // Find vector indexes with embedding config for this label
2901            let mut configs = Vec::new();
2902            for idx in &schema.indexes {
2903                if let IndexDefinition::Vector(v_config) = idx
2904                    && v_config.label == label
2905                    && let Some(emb_config) = &v_config.embedding_config
2906                {
2907                    configs.push((v_config.property.clone(), emb_config.clone()));
2908                }
2909            }
2910
2911            if configs.is_empty() {
2912                return Ok(());
2913            }
2914
2915            for (target_prop, emb_config) in configs {
2916                // Collect input texts from all vertices that need embeddings
2917                let mut input_texts: Vec<String> = Vec::new();
2918                let mut needs_embedding: Vec<usize> = Vec::new();
2919
2920                for (idx, properties) in properties_batch.iter().enumerate() {
2921                    // Skip if target property already exists
2922                    if properties.contains_key(&target_prop) {
2923                        continue;
2924                    }
2925
2926                    // Check if source properties exist
2927                    let mut inputs = Vec::new();
2928                    for src_prop in &emb_config.source_properties {
2929                        if let Some(val) = properties.get(src_prop)
2930                            && let Some(s) = val.as_str()
2931                        {
2932                            inputs.push(s.to_string());
2933                        }
2934                    }
2935
2936                    if !inputs.is_empty() {
2937                        let input_text = inputs.join(" ");
2938                        let input_text = match &emb_config.document_prefix {
2939                            Some(prefix) => format!("{prefix}{input_text}"),
2940                            None => input_text,
2941                        };
2942                        input_texts.push(input_text);
2943                        needs_embedding.push(idx);
2944                    }
2945                }
2946
2947                if input_texts.is_empty() {
2948                    continue;
2949                }
2950
2951                let runtime = self.xervo_runtime.get().ok_or_else(|| {
2952                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
2953                })?;
2954                let embedder = runtime.embedding(&emb_config.alias).await?;
2955
2956                // Batch generate embeddings (single API call)
2957                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
2958                let embeddings = embedder.embed(input_refs).await?;
2959
2960                // Distribute results back to properties
2961                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
2962                    if let Some(vec) = embeddings.get(embedding_idx) {
2963                        let vals: Vec<Value> =
2964                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
2965                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
2966                    }
2967                }
2968            }
2969        }
2970
2971        Ok(())
2972    }
2973
2974    async fn process_embeddings_impl(
2975        &self,
2976        label_name: Option<&str>,
2977        properties: &mut Properties,
2978    ) -> Result<()> {
2979        let schema = self.schema_manager.schema();
2980
2981        if let Some(label) = label_name {
2982            // Find vector indexes with embedding config for this label
2983            let mut configs = Vec::new();
2984            for idx in &schema.indexes {
2985                if let IndexDefinition::Vector(v_config) = idx
2986                    && v_config.label == label
2987                    && let Some(emb_config) = &v_config.embedding_config
2988                {
2989                    configs.push((v_config.property.clone(), emb_config.clone()));
2990                }
2991            }
2992
2993            if configs.is_empty() {
2994                log::info!("No embedding config found for label {}", label);
2995            }
2996
2997            for (target_prop, emb_config) in configs {
2998                // If target property already exists, skip (assume user provided it)
2999                if properties.contains_key(&target_prop) {
3000                    continue;
3001                }
3002
3003                // Check if source properties exist
3004                let mut inputs = Vec::new();
3005                for src_prop in &emb_config.source_properties {
3006                    if let Some(val) = properties.get(src_prop)
3007                        && let Some(s) = val.as_str()
3008                    {
3009                        inputs.push(s.to_string());
3010                    }
3011                }
3012
3013                if inputs.is_empty() {
3014                    continue;
3015                }
3016
3017                let input_text = inputs.join(" ");
3018                let input_text = match &emb_config.document_prefix {
3019                    Some(prefix) => format!("{prefix}{input_text}"),
3020                    None => input_text,
3021                };
3022
3023                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3024                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3025                })?;
3026                let embedder = runtime.embedding(&emb_config.alias).await?;
3027
3028                // Generate
3029                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3030                if let Some(vec) = embeddings.first() {
3031                    // Store as array of floats
3032                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3033                    properties.insert(target_prop.clone(), Value::List(vals));
3034                }
3035            }
3036        }
3037        Ok(())
3038    }
3039
3040    /// Flushes the current in-memory L0 buffer to L1 storage.
3041    ///
3042    /// # Lock Ordering
3043    ///
3044    /// To prevent deadlocks, locks must be acquired in the following order:
3045    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3046    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3047    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3048    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3049    /// 5. `Index` / `Storage` locks (during actual flush)
3050    ///
3051    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3052    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3053    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3054    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3055        // Drain any in-flight async flushes first. `flush_to_l1` is a
3056        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3057        // setup, shutdown paths) rely on it as "all writes are now
3058        // durably in Lance". Without the drain, an async stream from
3059        // a recent commit might still be writing to Lance when
3060        // `flush_to_l1` returns, leaving a window where forks branch
3061        // off pre-write Lance state and lose data.
3062        if let Some(coord) = self.flush_coordinator.as_ref() {
3063            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3064        }
3065        let _flush_lock_guard = self.flush_lock.lock().await;
3066        self.flush_inline_under_lock(name).await
3067    }
3068
3069    /// Async-flush entry point: rotate under `flush_lock`, release the
3070    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3071    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3072    /// that resolves when finalize completes.
3073    ///
3074    /// Errors if `config.async_flush_enabled = false` (the coordinator
3075    /// is `None` in that case — see `flush_coordinator` field doc).
3076    pub async fn flush_to_l1_async(
3077        self: &Arc<Self>,
3078        name: Option<String>,
3079    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3080        let coord = self
3081            .flush_coordinator
3082            .as_ref()
3083            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3084            .clone();
3085        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3086        //    introduce a permit-while-holding-flush-lock convoy.
3087        let permit = coord.acquire_permit().await?;
3088        let seq = coord.next_rotate_seq();
3089        coord.note_pending();
3090        // 2. Rotate under flush_lock (µs work).
3091        let RotateOutput {
3092            old_l0_arc,
3093            wal_lsn,
3094            current_version,
3095            flush_in_progress_guard,
3096        } = {
3097            let _flush_lock_guard = self.flush_lock.lock().await;
3098            self.flush_l0_rotate().await?
3099        };
3100        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3101        //    cached_manifest snapshot at this moment.
3102        let parent_manifest = self.cached_manifest.lock().clone();
3103        let rotated = RotatedFlush {
3104            seq,
3105            old_l0_arc: old_l0_arc.clone(),
3106            wal_lsn,
3107            current_version,
3108            name: name.clone(),
3109            parent_manifest,
3110            permit,
3111            flush_in_progress_guard,
3112        };
3113        // 4. Spawn the stream phase via the coordinator. The closure
3114        //    captures Arc<Writer> transiently — drops when stream
3115        //    completes (bounded, ~50-500 ms).
3116        let writer = self.clone();
3117        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3118            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3119            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3120                new_manifest: outcome.manifest,
3121                snapshot_id: outcome.snapshot_id,
3122            })
3123        });
3124        Ok(ticket)
3125    }
3126
3127    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3128    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3129    /// its place), and hand off the WAL to the new L0.
3130    ///
3131    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3132    /// is responsible). The returned [`RotateOutput`] carries everything
3133    /// the subsequent stream + finalize phases need; in particular the
3134    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3135    /// alive for the full flush lifetime — including any future async
3136    /// path where stream runs on a spawned task.
3137    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3138        // Acquire the in-progress counter BEFORE any heavy work. The
3139        // guard lives on RotateOutput; dropping RotateOutput drops the
3140        // guard, so the counter goes back to zero exactly when the flush
3141        // is fully done.
3142        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3143
3144        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3145        // current L0 is still active and mutations are retained in
3146        // memory until restart/retry.
3147        let wal_for_truncate = {
3148            let current_l0 = self.l0_manager.get_current();
3149            let l0_guard = current_l0.read();
3150            l0_guard.wal.clone()
3151        };
3152        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3153            w.flush().await?
3154        } else {
3155            0
3156        };
3157
3158        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3159        // pending_flush until complete_flush is called by finalize.
3160        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3161        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3162
3163        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3164        // and current_version to the new L0.
3165        let current_version;
3166        {
3167            let mut old_l0_guard = old_l0_arc.write();
3168            current_version = old_l0_guard.current_version;
3169            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3170            let wal = old_l0_guard.wal.take();
3171            let new_l0_arc = self.l0_manager.get_current();
3172            let mut new_l0_guard = new_l0_arc.write();
3173            new_l0_guard.wal = wal;
3174            new_l0_guard.current_version = current_version;
3175        }
3176
3177        Ok(RotateOutput {
3178            old_l0_arc,
3179            wal_lsn,
3180            current_version,
3181            flush_in_progress_guard,
3182        })
3183    }
3184
3185    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3186    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3187    /// pending_flush by Phase B); writes append-only Lance datasets; does
3188    /// NOT call save_snapshot / set_latest_snapshot — those are
3189    /// finalize's job, so the manifest doesn't get published until the
3190    /// next phase.
3191    ///
3192    /// Today takes `&self`; in a follow-up commit this becomes a
3193    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3194    /// run on a spawned task while concurrent commits proceed.
3195    async fn flush_stream_l1(
3196        &self,
3197        old_l0_arc: Arc<RwLock<L0Buffer>>,
3198        wal_lsn: u64,
3199        current_version: u64,
3200        name: Option<String>,
3201    ) -> Result<FlushOutcome> {
3202        // Phase B: materialize any deferred embeddings before column
3203        // extraction. No-op when `defer_embeddings` is off (the set will
3204        // be empty). On-demand reads of the embedding column are a TODO
3205        // for a future revision (see UniConfig::defer_embeddings docs).
3206        self.drain_pending_embeddings(&old_l0_arc).await?;
3207
3208        let schema = self.schema_manager.schema();
3209        // 2. Acquire Read lock on Old L0 for flushing
3210        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3211        // (Vid, labels, properties, deleted, version)
3212        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3213        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3214        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3215        // (vid, full L0 properties map, version, set of keys to update).
3216        // Only the keys in the HashSet are emitted to the partial source;
3217        // the full props map is retained so the per-row column extractor
3218        // can read each touched key's value.
3219        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3220        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3221        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3222        // partial source with just `_vid`, `_deleted=true`, `_version`,
3223        // `_updated_at`. Skips the wide-row Append payload that adds
3224        // nothing on a soft-delete.
3225        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3226        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3227        // Collect vertex timestamps from L0 for flushing to storage
3228        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3229        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3230        // Track tombstones missing labels for storage query fallback
3231        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3232
3233        {
3234            let old_l0 = old_l0_arc.read();
3235
3236            // 1. Collect all edges and tombstones from L0
3237            for edge in old_l0.graph.edges() {
3238                let properties = old_l0
3239                    .edge_properties
3240                    .get(&edge.eid)
3241                    .cloned()
3242                    .unwrap_or_default();
3243                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3244
3245                // Get timestamps from L0 buffer (populated during insert)
3246                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3247                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3248
3249                entries_by_type
3250                    .entry(edge.edge_type)
3251                    .or_default()
3252                    .push(L1Entry {
3253                        src_vid: edge.src_vid,
3254                        dst_vid: edge.dst_vid,
3255                        eid: edge.eid,
3256                        op: Op::Insert,
3257                        version,
3258                        properties,
3259                        created_at,
3260                        updated_at,
3261                    });
3262            }
3263
3264            // From tombstones
3265            for tombstone in old_l0.tombstones.values() {
3266                let version = old_l0
3267                    .edge_versions
3268                    .get(&tombstone.eid)
3269                    .copied()
3270                    .unwrap_or(0);
3271                // Get timestamps - for deletes, updated_at reflects deletion time
3272                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3273                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3274
3275                entries_by_type
3276                    .entry(tombstone.edge_type)
3277                    .or_default()
3278                    .push(L1Entry {
3279                        src_vid: tombstone.src_vid,
3280                        dst_vid: tombstone.dst_vid,
3281                        eid: tombstone.eid,
3282                        op: Op::Delete,
3283                        version,
3284                        properties: HashMap::new(),
3285                        created_at,
3286                        updated_at,
3287                    });
3288            }
3289
3290            // 1b. Collect vertices by label (using vertex_labels from L0)
3291            //
3292            // Helper: fan-out a single vertex entry into per-label buckets.
3293            // Each per-label table row carries the full label set so multi-label
3294            // info is preserved after flush.
3295            let push_vertex_to_labels =
3296                |vid: Vid,
3297                 all_labels: &[String],
3298                 props: Properties,
3299                 deleted: bool,
3300                 version: u64,
3301                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3302                    for label in all_labels {
3303                        if let Some(label_id) = schema.label_id_by_name(label) {
3304                            out.entry(label_id).or_default().push((
3305                                vid,
3306                                all_labels.to_vec(),
3307                                props.clone(),
3308                                deleted,
3309                                version,
3310                            ));
3311                        }
3312                    }
3313                };
3314
3315            for (vid, props) in &old_l0.vertex_properties {
3316                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3317                // Collect timestamps for this vertex
3318                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3319                    vertex_created_at.insert(*vid, ts);
3320                }
3321                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3322                    vertex_updated_at.insert(*vid, ts);
3323                }
3324                if let Some(labels) = old_l0.vertex_labels.get(vid) {
3325                    // Partial-write routing: when this VID was last
3326                    // touched via `insert_vertex_partial` AND the
3327                    // partial_lance_writes flag is on, send only the
3328                    // touched columns to a MergeInsert batch. Otherwise
3329                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3330                    // — or flag off) use the existing full-row Append.
3331                    let is_partial = self.config.partial_lance_writes
3332                        && old_l0.vertex_partial_keys.contains_key(vid);
3333                    if is_partial {
3334                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3335                            for label in labels {
3336                                if let Some(label_id) = schema.label_id_by_name(label) {
3337                                    partial_by_label.entry(label_id).or_default().push((
3338                                        *vid,
3339                                        props.clone(),
3340                                        version,
3341                                        touched.clone(),
3342                                    ));
3343                                }
3344                            }
3345                        }
3346                    } else {
3347                        push_vertex_to_labels(
3348                            *vid,
3349                            labels,
3350                            props.clone(),
3351                            false,
3352                            version,
3353                            &mut vertices_by_label,
3354                        );
3355                    }
3356                }
3357            }
3358            for &vid in &old_l0.vertex_tombstones {
3359                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3360                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3361                    vertex_updated_at.insert(vid, ts);
3362                }
3363                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3364                    // Round-12 §B: tombstones flush via Lance MergeInsert
3365                    // (just `_vid`, `_deleted=true`, `_version`,
3366                    // `_updated_at`) — skipping the wide-row Append.
3367                    // Unconditional (no `partial_lance_writes` gating);
3368                    // tombstone Append carries no useful payload.
3369                    for label in labels {
3370                        if let Some(label_id) = schema.label_id_by_name(label) {
3371                            tombstones_by_label
3372                                .entry(label_id)
3373                                .or_default()
3374                                .push((vid, version));
3375                        }
3376                    }
3377                } else {
3378                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
3379                    orphaned_tombstones.push((vid, version));
3380                }
3381            }
3382        } // Drop read lock
3383
3384        // Resolve orphaned tombstones (missing labels) from storage
3385        if !orphaned_tombstones.is_empty() {
3386            tracing::warn!(
3387                count = orphaned_tombstones.len(),
3388                "Tombstones missing labels in L0, querying storage as fallback"
3389            );
3390            for (vid, version) in orphaned_tombstones {
3391                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3392                    && !labels.is_empty()
3393                {
3394                    for label in &labels {
3395                        if let Some(label_id) = schema.label_id_by_name(label) {
3396                            // Round-12 §B: route through partial tombstone too.
3397                            tombstones_by_label
3398                                .entry(label_id)
3399                                .or_default()
3400                                .push((vid, version));
3401                        }
3402                    }
3403                }
3404            }
3405        }
3406
3407        // 1. Load previous snapshot from cache, or fall back to storage.
3408        //
3409        // Use clone() not take(): for the async path, multiple
3410        // concurrent streams may run; if we take() here, a sibling
3411        // stream sees cached_manifest = None and seeds from
3412        // load_latest_snapshot (stale), losing the chain. clone()
3413        // preserves the parent. Finalize writes back the new manifest
3414        // unconditionally.
3415        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3416            cached
3417        } else {
3418            self.storage
3419                .snapshot_manager()
3420                .load_latest_snapshot()
3421                .await?
3422                .unwrap_or_else(|| {
3423                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3424                })
3425        };
3426
3427        // Update snapshot metadata
3428        // Save parent snapshot ID before generating new one (for lineage tracking)
3429        let parent_id = manifest.snapshot_id.clone();
3430        manifest.parent_snapshot = Some(parent_id);
3431        manifest.snapshot_id = Uuid::new_v4().to_string();
3432        manifest.name = name;
3433        manifest.created_at = Utc::now();
3434        manifest.version_high_water_mark = current_version;
3435        manifest.wal_high_water_mark = wal_lsn;
3436        let snapshot_id = manifest.snapshot_id.clone();
3437
3438        tracing::Span::current().record("snapshot_id", &snapshot_id);
3439
3440        // 2. Write main unified tables FIRST (before deltas).
3441        //    Ensures the dual-write invariant: by the time an EID appears in a
3442        //    delta table, it already exists in main_edges. This prevents the
3443        //    compaction debug_assert from firing when compaction interleaves
3444        //    with flush at async yield points.
3445        //
3446        // 2.1 Main edges table
3447        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3448            let _old_l0 = old_l0_arc.read();
3449            let mut main_edges: Vec<(
3450                uni_common::core::id::Eid,
3451                Vid,
3452                Vid,
3453                String,
3454                Properties,
3455                bool,
3456                u64,
3457            )> = Vec::new();
3458            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3459            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3460
3461            for (&edge_type_id, entries) in entries_by_type.iter() {
3462                for entry in entries {
3463                    let edge_type_name = self
3464                        .storage
3465                        .schema_manager()
3466                        .edge_type_name_by_id_unified(edge_type_id)
3467                        .unwrap_or_else(|| "unknown".to_string());
3468
3469                    let deleted = matches!(entry.op, Op::Delete);
3470                    main_edges.push((
3471                        entry.eid,
3472                        entry.src_vid,
3473                        entry.dst_vid,
3474                        edge_type_name,
3475                        entry.properties.clone(),
3476                        deleted,
3477                        entry.version,
3478                    ));
3479
3480                    if let Some(ts) = entry.created_at {
3481                        edge_created_at_map.insert(entry.eid, ts);
3482                    }
3483                    if let Some(ts) = entry.updated_at {
3484                        edge_updated_at_map.insert(entry.eid, ts);
3485                    }
3486                }
3487            }
3488
3489            (main_edges, edge_created_at_map, edge_updated_at_map)
3490        };
3491
3492        if !main_edges.is_empty() {
3493            let main_edge_batch = MainEdgeDataset::build_record_batch(
3494                &main_edges,
3495                Some(&edge_created_at_map),
3496                Some(&edge_updated_at_map),
3497            )?;
3498            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3499            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3500        }
3501
3502        // 2.2 Main vertices table
3503        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3504            let old_l0 = old_l0_arc.read();
3505            let mut vertices = Vec::new();
3506
3507            // Live vertices: full-row Append on the main table (the
3508            // props_json blob is required for global ID lookups). For
3509            // partial-row VIDs (vertex_partial_keys non-empty), the
3510            // main table still needs the full props for the
3511            // ext_id-uniqueness path; we keep the Append here. The
3512            // per-label Lance write IS partial via MergeInsert.
3513            for (vid, props) in &old_l0.vertex_properties {
3514                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3515                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3516                vertices.push((*vid, labels, props.clone(), false, version));
3517            }
3518
3519            // Tombstones: collected into `main_vertex_tombstones` for
3520            // the MergeInsert path below; skipping the wide-row Append.
3521            for &vid in &old_l0.vertex_tombstones {
3522                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3523                main_vertex_tombstones.push((vid, version));
3524            }
3525
3526            vertices
3527        };
3528
3529        if !main_vertices.is_empty() {
3530            let main_vertex_batch = MainVertexDataset::build_record_batch(
3531                &main_vertices,
3532                Some(&vertex_created_at),
3533                Some(&vertex_updated_at),
3534            )?;
3535            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3536        }
3537        // Round-12 §B: tombstones via MergeInsert on the main vertices
3538        // table. Independent of `vertex_properties` length.
3539        if !main_vertex_tombstones.is_empty() {
3540            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3541                &main_vertex_tombstones,
3542                Some(&vertex_updated_at),
3543            )?;
3544            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3545                .await?;
3546        }
3547        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3548            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3549        }
3550
3551        // 3. For each edge type, write FWD and BWD delta runs
3552        for (&edge_type_id, entries) in entries_by_type.iter() {
3553            // Get edge type name from unified lookup (handles both schema'd and schemaless)
3554            let edge_type_name = self
3555                .storage
3556                .schema_manager()
3557                .edge_type_name_by_id_unified(edge_type_id)
3558                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3559
3560            // FWD Run (sorted by src_vid)
3561            // Round-12 §A: split entries into full-row Append and
3562            // partial MergeInsert routes based on `edge_partial_keys`.
3563            // Edges in `edge_partial_keys` were last written via
3564            // `insert_edge_partial_full`; the per-edge-type delta
3565            // tables receive only the touched schema columns plus
3566            // (when any overflow key was touched) the regenerated
3567            // `overflow_json` blob. Untouched columns retain their
3568            // previous-version value via Lance MergeInsert.
3569            let partial_eids: std::collections::HashSet<Eid> = {
3570                let old_l0 = old_l0_arc.read();
3571                entries
3572                    .iter()
3573                    .filter(|e| {
3574                        self.config.partial_lance_writes
3575                            && old_l0.edge_partial_keys.contains_key(&e.eid)
3576                    })
3577                    .map(|e| e.eid)
3578                    .collect()
3579            };
3580            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3581                let old_l0 = old_l0_arc.read();
3582                partial_eids
3583                    .iter()
3584                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3585                    .collect()
3586            };
3587            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3588                .clone()
3589                .into_iter()
3590                .partition(|e| !partial_eids.contains(&e.eid));
3591
3592            let backend = self.storage.backend();
3593
3594            // FWD run (sorted by src_vid)
3595            let mut fwd_full = full_entries.clone();
3596            fwd_full.sort_by_key(|e| e.src_vid);
3597            let mut fwd_partial = partial_entries.clone();
3598            fwd_partial.sort_by_key(|e| e.src_vid);
3599            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3600            if !fwd_full.is_empty() {
3601                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3602                fwd_ds.write_run(backend, fwd_batch).await?;
3603            }
3604            if !fwd_partial.is_empty() {
3605                let touched_union: std::collections::HashSet<String> = fwd_partial
3606                    .iter()
3607                    .flat_map(|e| {
3608                        touched_union_by_eid
3609                            .get(&e.eid)
3610                            .cloned()
3611                            .unwrap_or_default()
3612                            .into_iter()
3613                    })
3614                    .collect();
3615                let fwd_partial_batch =
3616                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3617                fwd_ds
3618                    .merge_insert_partial_run(backend, fwd_partial_batch)
3619                    .await?;
3620            }
3621            fwd_ds.ensure_eid_index(backend).await?;
3622
3623            // BWD Run (sorted by dst_vid)
3624            let mut bwd_full = full_entries.clone();
3625            bwd_full.sort_by_key(|e| e.dst_vid);
3626            let mut bwd_partial = partial_entries.clone();
3627            bwd_partial.sort_by_key(|e| e.dst_vid);
3628            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
3629            if !bwd_full.is_empty() {
3630                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
3631                bwd_ds.write_run(backend, bwd_batch).await?;
3632            }
3633            if !bwd_partial.is_empty() {
3634                let touched_union: std::collections::HashSet<String> = bwd_partial
3635                    .iter()
3636                    .flat_map(|e| {
3637                        touched_union_by_eid
3638                            .get(&e.eid)
3639                            .cloned()
3640                            .unwrap_or_default()
3641                            .into_iter()
3642                    })
3643                    .collect();
3644                let bwd_partial_batch =
3645                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
3646                bwd_ds
3647                    .merge_insert_partial_run(backend, bwd_partial_batch)
3648                    .await?;
3649            }
3650            bwd_ds.ensure_eid_index(backend).await?;
3651
3652            // Update Manifest
3653            let current_snap =
3654                manifest
3655                    .edges
3656                    .entry(edge_type_name.to_string())
3657                    .or_insert(EdgeSnapshot {
3658                        version: 0,
3659                        count: 0,
3660                        lance_version: 0,
3661                    });
3662            current_snap.version += 1;
3663            current_snap.count += entries.len() as u64;
3664            // LanceDB tables don't expose Lance version directly
3665            current_snap.lance_version = 0;
3666
3667            // Note: No CSR invalidation needed. AdjacencyManager's overlay
3668            // already has these edges via dual-write in insert_edge/delete_edge.
3669        }
3670
3671        // 4. Per-label vertex table writes
3672        // Iterate all labels that have either full-row OR partial-write
3673        // data pending. A label may appear in only one of the two maps
3674        // (e.g., all updates on this label were partial-only).
3675        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
3676            .keys()
3677            .chain(partial_by_label.keys())
3678            .chain(tombstones_by_label.keys())
3679            .copied()
3680            .collect();
3681        for label_id in all_label_ids {
3682            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
3683            let label_name = schema
3684                .label_name_by_id(label_id)
3685                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
3686
3687            let ds = self.storage.vertex_dataset(label_name)?;
3688
3689            // Collect inverted index updates before consuming vertices
3690            // Maps: cfg.property -> (added, removed)
3691            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
3692            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
3693
3694            for idx in &schema.indexes {
3695                if let IndexDefinition::Inverted(cfg) = idx
3696                    && cfg.label == label_name
3697                {
3698                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
3699                    let mut removed: HashSet<Vid> = HashSet::new();
3700
3701                    for (vid, _labels, props, deleted, _version) in &vertices {
3702                        if *deleted {
3703                            removed.insert(*vid);
3704                        } else if let Some(prop_value) = props.get(&cfg.property) {
3705                            // Extract terms from the property value (List<String>)
3706                            if let Some(arr) = prop_value.as_array() {
3707                                let terms: Vec<String> = arr
3708                                    .iter()
3709                                    .filter_map(|v| v.as_str().map(ToString::to_string))
3710                                    .collect();
3711                                if !terms.is_empty() {
3712                                    added.insert(*vid, terms);
3713                                }
3714                            }
3715                        }
3716                    }
3717                    // Round-12 §B: tombstones no longer in `vertices`;
3718                    // pull them from `tombstones_by_label` for inverted
3719                    // index removal.
3720                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
3721                        for (vid, _) in tomb_rows {
3722                            removed.insert(*vid);
3723                        }
3724                    }
3725
3726                    if !added.is_empty() || !removed.is_empty() {
3727                        inverted_updates.insert(cfg.property.clone(), (added, removed));
3728                    }
3729                }
3730            }
3731
3732            let mut v_data = Vec::new();
3733            let mut d_data = Vec::new();
3734            let mut ver_data = Vec::new();
3735            for (vid, labels, props, deleted, version) in vertices {
3736                v_data.push((vid, labels, props));
3737                d_data.push(deleted);
3738                ver_data.push(version);
3739            }
3740
3741            let backend = self.storage.backend();
3742
3743            // Skip the full-row Append entirely if this label only has
3744            // partial-write rows pending.
3745            if !v_data.is_empty() {
3746                let batch = ds.build_record_batch_with_timestamps(
3747                    &v_data,
3748                    &d_data,
3749                    &ver_data,
3750                    &schema,
3751                    Some(&vertex_created_at),
3752                    Some(&vertex_updated_at),
3753                )?;
3754                ds.write_batch(backend, batch, &schema).await?;
3755            }
3756
3757            // Partial-column batch (Lance MergeInsert path). The flag
3758            // gates whether the routing classified any VIDs as partial;
3759            // outside the flag this collection is always empty so the
3760            // call below is a cheap no-op.
3761            if let Some(partial_rows) = partial_by_label.remove(&label_id)
3762                && !partial_rows.is_empty()
3763            {
3764                let touched_union: std::collections::HashSet<String> = partial_rows
3765                    .iter()
3766                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
3767                    .collect();
3768                let pairs: Vec<(Vid, Properties)> = partial_rows
3769                    .iter()
3770                    .map(|(vid, props, _, _)| (*vid, props.clone()))
3771                    .collect();
3772                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
3773                let partial_batch = ds.build_partial_record_batch(
3774                    &pairs,
3775                    &versions,
3776                    &touched_union,
3777                    &schema,
3778                    Some(&vertex_updated_at),
3779                )?;
3780                if partial_batch.num_rows() > 0 {
3781                    ds.merge_insert_batch(backend, partial_batch).await?;
3782                }
3783            }
3784
3785            // Tombstone batch (Round-12 §B): always MergeInsert with
3786            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
3787            // No partial_lance_writes gating — tombstones never carry
3788            // useful property payload to write. Captured tombstone vids
3789            // also drive `remove_from_vid_labels_index` below.
3790            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
3791            if !tombstone_rows.is_empty() {
3792                let tomb_batch =
3793                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
3794                if tomb_batch.num_rows() > 0 {
3795                    ds.merge_insert_batch(backend, tomb_batch).await?;
3796                }
3797            }
3798
3799            ds.ensure_default_indexes(backend).await?;
3800
3801            // Update VidLabelsIndex (if enabled). v_data carries live
3802            // vertices; tombstone removals come from the
3803            // `tombstone_rows` captured above the MergeInsert call.
3804            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
3805                if deleted {
3806                    self.storage.remove_from_vid_labels_index(*vid);
3807                } else {
3808                    self.storage.update_vid_labels_index(*vid, labels.clone());
3809                }
3810            }
3811            for (vid, _) in &tombstone_rows {
3812                self.storage.remove_from_vid_labels_index(*vid);
3813            }
3814
3815            // Update Manifest
3816            let current_snap =
3817                manifest
3818                    .vertices
3819                    .entry(label_name.to_string())
3820                    .or_insert(LabelSnapshot {
3821                        version: 0,
3822                        count: 0,
3823                        lance_version: 0,
3824                    });
3825            current_snap.version += 1;
3826            current_snap.count += v_data.len() as u64;
3827            // LanceDB tables don't expose Lance version directly
3828            current_snap.lance_version = 0;
3829
3830            // Invalidate table cache to ensure next read picks up new version
3831            self.storage.invalidate_table_cache(label_name);
3832
3833            // Apply inverted index updates incrementally
3834            #[cfg(feature = "lance-backend")]
3835            for idx in &schema.indexes {
3836                if let IndexDefinition::Inverted(cfg) = idx
3837                    && cfg.label == label_name
3838                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
3839                {
3840                    self.storage
3841                        .index_manager()
3842                        .update_inverted_index_incremental(cfg, added, removed)
3843                        .await?;
3844                }
3845            }
3846
3847            // Update UID index with new vertex mappings
3848            // Collect (UniId, Vid) mappings from non-deleted vertices
3849            #[cfg(feature = "lance-backend")]
3850            {
3851                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
3852                for (vid, _labels, props) in &v_data {
3853                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
3854                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
3855                        label_name, ext_id, props,
3856                    );
3857                    uid_mappings.push((uid, *vid));
3858                }
3859
3860                if !uid_mappings.is_empty()
3861                    && let Ok(uid_index) = self.storage.uid_index(label_name)
3862                {
3863                    uid_index.write_mapping(&uid_mappings).await?;
3864                }
3865            }
3866        }
3867        Ok(FlushOutcome {
3868            manifest,
3869            snapshot_id,
3870        })
3871    }
3872
3873    /// Composition entry that assumes the caller already holds `flush_lock`.
3874    /// Runs rotate + stream + finalize_locked in sequence. Used by
3875    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
3876    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
3877    /// holds the lock from the commit critical section).
3878    #[instrument(
3879        skip(self),
3880        fields(snapshot_id, mutations_count, size_bytes),
3881        level = "info"
3882    )]
3883    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
3884        let start = std::time::Instant::now();
3885
3886        let (initial_size, initial_count) = {
3887            let l0_arc = self.l0_manager.get_current();
3888            let l0 = l0_arc.read();
3889            (l0.estimated_size, l0.mutation_count)
3890        };
3891        tracing::Span::current().record("size_bytes", initial_size);
3892        tracing::Span::current().record("mutations_count", initial_count);
3893
3894        debug!("Starting L0 flush to L1");
3895
3896        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
3897        // FlushInProgressGuard lives on RotateOutput and stays alive for
3898        // the full flush — including the finalize_locked call below.
3899        let RotateOutput {
3900            old_l0_arc,
3901            wal_lsn,
3902            current_version,
3903            flush_in_progress_guard: _flush_guard,
3904        } = self.flush_l0_rotate().await?;
3905
3906        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
3907        // G (Lance writes). Builds the manifest but does NOT publish it.
3908        let FlushOutcome {
3909            manifest,
3910            snapshot_id,
3911        } = self
3912            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
3913            .await?;
3914
3915        // Phases H..S: publish manifest, complete_flush, WAL truncate,
3916        // property cache clear, last_flush_time, metrics, l1_runs++,
3917        // compaction trigger, index-rebuild scheduling, fork tick.
3918        self.flush_finalize_locked(
3919            old_l0_arc,
3920            wal_lsn,
3921            manifest,
3922            snapshot_id,
3923            initial_size,
3924            initial_count,
3925            start,
3926        )
3927        .await
3928    }
3929
3930    /// Phases H..S of the flush: publish the manifest and run all
3931    /// post-publish bookkeeping. Assumes the caller already holds
3932    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
3933    /// lock-acquiring variant used by the async finalize path.
3934    #[allow(clippy::too_many_arguments)]
3935    async fn flush_finalize_locked(
3936        &self,
3937        old_l0_arc: Arc<RwLock<L0Buffer>>,
3938        wal_lsn: u64,
3939        manifest: SnapshotManifest,
3940        snapshot_id: String,
3941        initial_size: usize,
3942        initial_count: usize,
3943        start: std::time::Instant,
3944    ) -> Result<String> {
3945        Self::flush_finalize_body(
3946            &self.shared_ctx(),
3947            old_l0_arc,
3948            wal_lsn,
3949            manifest,
3950            snapshot_id,
3951            initial_size,
3952            initial_count,
3953            start,
3954        )
3955        .await
3956    }
3957
3958    /// Phases H..S of the flush, lock-acquiring variant. Used by the
3959    /// async-flush finalizer task (running on a spawned tokio task),
3960    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
3961    /// `flush_lock` to serialize the publish boundary, then runs the
3962    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
3963    #[allow(clippy::too_many_arguments)]
3964    pub(crate) async fn flush_finalize_now(
3965        shared: SharedFlushCtx,
3966        old_l0_arc: Arc<RwLock<L0Buffer>>,
3967        wal_lsn: u64,
3968        manifest: SnapshotManifest,
3969        snapshot_id: String,
3970        initial_size: usize,
3971        initial_count: usize,
3972        start: std::time::Instant,
3973    ) -> Result<String> {
3974        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
3975        Self::flush_finalize_body(
3976            &shared,
3977            old_l0_arc,
3978            wal_lsn,
3979            manifest,
3980            snapshot_id,
3981            initial_size,
3982            initial_count,
3983            start,
3984        )
3985        .await
3986    }
3987
    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
    /// Static over `SharedFlushCtx`; the caller is responsible for
    /// holding `flush_lock`.
    ///
    /// Steps, in order:
    /// - re-link `manifest.parent_snapshot` to the currently cached manifest;
    /// - H: persist the manifest body, then move the "latest" pointer;
    /// - I: cache the published manifest in `cached_manifest`;
    /// - J: drop `old_l0_arc` from the pending-flush set;
    /// - K: truncate the WAL up to a safe LSN;
    /// - L/M: clear the property cache and reset `last_flush_time`;
    /// - log + metrics, then P: bump `compaction_status.l1_runs`;
    /// - Q/R/S: optional CSR compaction, index-rebuild scheduling and
    ///   fork-flush observability.
    ///
    /// # Errors
    /// Returns `Err` if saving the snapshot, updating the latest pointer,
    /// truncating the WAL, or acquiring the `compaction_status` mutex fails.
    /// Steps that already ran are not rolled back. For example, a WAL
    /// truncation failure happens after the manifest has been published
    /// and the old L0 completed.
    ///
    /// Returns `snapshot_id` (as passed in) on success.
    #[allow(clippy::too_many_arguments)]
    async fn flush_finalize_body(
        shared: &SharedFlushCtx,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        mut manifest: SnapshotManifest,
        snapshot_id: String,
        initial_size: usize,
        initial_count: usize,
        start: std::time::Instant,
    ) -> Result<String> {
        // Parent-snapshot fixup. The stream phase built `manifest` with
        // parent_snapshot set from cached_manifest at stream time. If
        // OTHER flushes (sync or async) have finalized since then,
        // cached_manifest has advanced. Re-link this manifest to the
        // current cached chain so we don't orphan their data when we
        // overwrite cached_manifest below.
        //
        // NOTE(review): only `parent_snapshot` is re-linked here. Any other
        // manifest content seeded from the older cached manifest during the
        // stream phase is published as-is and is not merged with what the
        // intervening flushes published. Confirm this is intended.
        let current_parent_id = shared
            .cached_manifest
            .lock()
            .as_ref()
            .map(|m| m.snapshot_id.clone());
        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
            manifest.parent_snapshot = current_parent_id;
            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
        }

        // H. Publish manifest (body first, then pointer — recovery is
        // idempotent if we crash between the two).
        shared
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        shared
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // I. Cache manifest for next flush to avoid re-reading from object store.
        *shared.cached_manifest.lock() = Some(manifest.clone());

        // J. Complete flush: remove old L0 from pending_flush. MUST happen
        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
        shared.l0_manager.complete_flush(&old_l0_arc);

        // K. Truncate WAL up to the safe LSN.
        // The WAL handle is taken from the *current* (post-rotation) L0.
        // The safe LSN is the minimum of this flush's `wal_lsn` and the
        // lowest LSN still referenced by another pending flush. That keeps
        // entries still needed by in-flight flushes.
        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
        if let Some(w) = wal_handle {
            let safe_lsn = shared
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // L. Invalidate property cache after flush to prevent stale reads.
        if let Some(ref pm) = shared.property_manager {
            pm.clear_cache().await;
        }

        // M. Reset last flush time for time-based auto-flush.
        *shared.last_flush_time.lock() = std::time::Instant::now();

        // N/O. Completion log and flush metrics. The size/count are the
        // values the caller sampled before rotation. `start` marks when the
        // whole flush began, so the duration includes the rotate and stream
        // phases.
        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // P. Increment flush generation counter for write throttling.
        // Scoped block so the `compaction_status` guard is released before
        // the compaction trigger below.
        {
            let mut status = uni_common::sync::acquire_mutex(
                &shared.storage.compaction_status,
                "compaction_status",
            )?;
            status.l1_runs += 1;
        }

        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
        // At most one tracked compaction task: if the stored handle has not
        // finished, this flush skips spawning a new one. The read-then-write
        // on `compaction_handle` is not atomic by itself; it assumes the
        // caller-held `flush_lock` keeps finalizers from racing here.
        // NOTE(review): `am.compact()` is invoked synchronously inside the
        // spawned task. If it is CPU-heavy it occupies a runtime worker
        // thread for its duration. Confirm whether `spawn_blocking` is
        // preferable.
        let am = shared.adjacency_manager.clone();
        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
            let previous_still_running = {
                let guard = shared.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };
            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *shared.compaction_handle.write() = Some(handle);
            }
        }

        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
        // Only when auto-rebuild is enabled AND a rebuild manager has been
        // installed in `index_rebuild_manager`.
        if shared.auto_rebuild_enabled
            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
        {
            Self::schedule_index_rebuilds_if_needed_static(
                &manifest,
                rebuild_mgr.clone(),
                shared.schema_manager.clone(),
                shared.index_rebuild_config.clone(),
            );
        }

        // S. Emit fork-fragment observability after a successful forked flush.
        // Called unconditionally; the helper returns early when `fork_id`
        // is `None` (primary writer).
        Self::tick_fork_fragment_observability_static(
            shared.fork_id,
            shared.fork_flush_count.clone(),
            shared.fork_fragment_warn_fired.clone(),
            shared.fork_fragment_warn_threshold,
        );

        Ok(snapshot_id)
    }
4115
4116    /// Increment fork-flush bookkeeping and fire the fragment warn
4117    /// once if the threshold is crossed.
4118    ///
4119    /// Each flush typically appends ~1 fragment per touched dataset on
4120    /// the fork's branches; without compaction (deferred to Phase 5)
4121    /// long-lived heavy-write forks degrade. The flush count is a
4122    /// proxy for actual fragment growth — reading
4123    /// `Dataset::manifest().fragments.len()` per dataset would add a
4124    /// per-flush object-store roundtrip on the hot commit path, which
4125    /// is too costly for a purely observational guard rail.
4126    ///
4127    /// No-op for primary writers (`fork_id == None`).
4128    #[allow(dead_code)] // called by tests; production path uses _static
4129    pub(crate) fn tick_fork_fragment_observability(&self) {
4130        Self::tick_fork_fragment_observability_static(
4131            self.fork_id,
4132            self.fork_flush_count.clone(),
4133            self.fork_fragment_warn_fired.clone(),
4134            self.config.fork_fragment_warn_threshold,
4135        );
4136    }
4137
4138    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4139    /// Used by the async-flush finalize path, where we hold a
4140    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4141    pub(crate) fn tick_fork_fragment_observability_static(
4142        fork_id: Option<ForkId>,
4143        fork_flush_count: Arc<AtomicU64>,
4144        fork_fragment_warn_fired: Arc<AtomicBool>,
4145        warn_threshold: usize,
4146    ) {
4147        let Some(fork_id) = fork_id else { return };
4148        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4149        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4150        let fork_label = fork_id.to_string();
4151        metrics::gauge!(
4152            "uni_fork_l1_flushes",
4153            "fork" => fork_label.clone(),
4154        )
4155        .set(new_count as f64);
4156        let threshold = warn_threshold as u64;
4157        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4158            && threshold > 0
4159            && new_count >= threshold
4160        {
4161            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4162            tracing::warn!(
4163                fork = %fork_label,
4164                flush_count = new_count,
4165                threshold,
4166                "fork has exceeded the L1 flush-count threshold; \
4167                 fork compaction is deferred to Phase 5 — consider \
4168                 drop+recreate or promotion to bound fragment growth"
4169            );
4170        }
4171    }
4172
4173    /// Check rebuild thresholds and schedule background index rebuilds for
4174    /// labels that exceed growth or age limits. Marks affected indexes as
4175    /// `Stale` and spawns an async task to schedule the rebuild.
4176    #[allow(dead_code)] // production path uses _static; kept as the
4177    // documented instance entry point.
4178    fn schedule_index_rebuilds_if_needed(
4179        &self,
4180        manifest: &SnapshotManifest,
4181        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4182    ) {
4183        Self::schedule_index_rebuilds_if_needed_static(
4184            manifest,
4185            rebuild_mgr,
4186            self.schema_manager.clone(),
4187            self.config.index_rebuild.clone(),
4188        );
4189    }
4190
4191    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4192    /// Used by the async-flush finalize path, where we hold the
4193    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4194    pub(crate) fn schedule_index_rebuilds_if_needed_static(
4195        manifest: &SnapshotManifest,
4196        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4197        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4198        index_rebuild_config: uni_common::config::IndexRebuildConfig,
4199    ) {
4200        let checker =
4201            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4202        let schema = schema_manager.schema();
4203        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4204
4205        if labels.is_empty() {
4206            return;
4207        }
4208
4209        // Mark affected indexes as Stale
4210        for label in &labels {
4211            for idx in &schema.indexes {
4212                if idx.label() == label {
4213                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4214                        m.status = uni_common::core::schema::IndexStatus::Stale;
4215                    });
4216                }
4217            }
4218        }
4219
4220        tokio::spawn(async move {
4221            if let Err(e) = rebuild_mgr.schedule(labels).await {
4222                tracing::warn!("Failed to schedule index rebuild: {e}");
4223            }
4224        });
4225    }
4226}
4227
/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
/// its single-task finalizer loop. Unit struct on purpose: it must NOT
/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
/// for finalize travels in via `SharedFlushCtx`.
///
/// Being a field-less unit struct, it is constructed simply as
/// `WriterFinalizer`.
pub(crate) struct WriterFinalizer;
4234
// Adapts the coordinator's success/failure hooks to `Writer`'s static
// finalize logic. Both hooks explicitly release the back-pressure permit
// carried in `RotatedFlush` once their work is done.
impl FinalizeFn for WriterFinalizer {
    /// Success path: finishes a rotated flush once the stream phase has
    /// produced `outcome`.
    ///
    /// Calls `Writer::flush_finalize_now` with:
    /// - the rotated L0;
    /// - its WAL LSN;
    /// - the new manifest and snapshot id from `outcome`;
    /// - the L0's size and mutation count.
    ///
    /// It then releases the back-pressure permit and returns the result of
    /// `flush_finalize_now` unchanged.
    fn finalize<'a>(
        &'a self,
        rotated: RotatedFlush,
        outcome: AsyncFlushOutcome,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
        Box::pin(async move {
            // Read initial_size / initial_count from the rotated L0 so
            // we don't have to plumb them through the coordinator
            // submission. The buffer is still alive in pending_flush
            // until `complete_flush` (step J — presumably inside
            // `Writer::flush_finalize_now`; confirm) pops it.
            // The read guard is scoped to this block so it is released
            // before the `.await` below.
            let (initial_size, initial_count) = {
                let l0 = rotated.old_l0_arc.read();
                (l0.estimated_size, l0.mutation_count)
            };
            // NOTE(review): the start instant is captured here, after the
            // stream phase has already run. If `flush_finalize_now` derives a
            // flush duration from it, that duration excludes streaming —
            // confirm this is intended.
            let result = Writer::flush_finalize_now(
                shared,
                rotated.old_l0_arc.clone(),
                rotated.wal_lsn,
                outcome.new_manifest,
                outcome.snapshot_id,
                initial_size,
                initial_count,
                std::time::Instant::now(),
            )
            .await;
            // Release the back-pressure permit explicitly now that finalize
            // has completed. The remaining fields of `rotated` (e.g. the
            // flush-in-progress guard) drop when this async block ends,
            // immediately after `result` is returned.
            drop(rotated.permit);
            result
        })
    }

    /// Failure path: the stream phase failed with `err`.
    ///
    /// Logs a warning with the flush sequence number, increments the
    /// `uni_flush_failures_total` counter, releases the back-pressure permit,
    /// and returns `err` unchanged. It does not touch the old L0 or the WAL.
    fn finalize_failure<'a>(
        &'a self,
        rotated: RotatedFlush,
        err: anyhow::Error,
        _shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
        Box::pin(async move {
            tracing::warn!(
                error = %err,
                seq = rotated.seq,
                "async flush stream failed; old L0 remains in pending_flush, \
                 WAL retains its data, recovery via WAL replay on restart"
            );
            metrics::counter!("uni_flush_failures_total").increment(1);
            // Permit + guard drop here so back-pressure releases even on
            // failure. (The permit is dropped explicitly on this line; the
            // rest of `rotated` drops at the end of this async block.)
            drop(rotated.permit);
            err
        })
    }
}
4289
#[cfg(test)]
mod tests {
    // Every test below builds its own store in a fresh `tempdir()`
    // (local object store + `SchemaManager` + `StorageManager`), so tests
    // share no on-disk state.
    use super::*;
    use tempfile::tempdir;

    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// Asserts that after `commit_transaction_l0`:
    /// - the WAL replay has grown;
    /// - the new entries contain both vertices and the edge, with vertices
    ///   logged before the edge;
    /// - the committed vertices and edge are present in the main L0.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Get WAL before commit
        // (its current length is the baseline; only entries appended by the
        // commit are inspected below).
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        let writer = Arc::new(writer);
        writer.commit_transaction_l0(tx_l0).await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }

    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
    ///
    /// NOTE(review): no WAL failure is actually injected here. The test drops
    /// an uncommitted transaction L0 (standing in for a rollback) and checks
    /// that the main L0 is unchanged: the baseline vertex is still present and
    /// the transaction's vertex never appears.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Insert baseline data (outside transaction)
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
                None,
            )
            .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Capture main L0 state before rollback
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        // Rollback transaction (simulating what would happen after WAL flush failure)
        drop(tx_l0);

        // Verify main L0 is unchanged
        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        // Baseline should still be present
        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        // Transaction data should NOT be in main L0
        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }

    /// Test that batch insert with shared labels does not clone labels per vertex.
    /// This verifies fix for issue #161 (redundant label cloning).
    ///
    /// NOTE(review): the assertions only check that each inserted vertex ends
    /// up with the `Person` label in the main L0; they cannot observe whether
    /// labels were cloned per vertex.
    #[tokio::test]
    async fn test_batch_insert_shared_labels() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        // Shared labels - should not be cloned per vertex
        let labels = &["Person".to_string()];

        // Insert batch of vertices with same labels
        let mut vids = Vec::new();
        for i in 0..100 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("id".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, labels, None)
                .await?;
            vids.push(vid);
        }

        // Verify all vertices have the correct labels
        let l0 = writer.l0_manager.get_current();
        for vid in vids {
            let l0_guard = l0.read();
            let vertex_labels = l0_guard.vertex_labels.get(&vid);
            assert!(vertex_labels.is_some(), "Vertex should have labels");
            assert_eq!(
                vertex_labels.unwrap(),
                &vec!["Person".to_string()],
                "Labels should match"
            );
        }

        Ok(())
    }

    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
    #[tokio::test]
    async fn test_estimated_size_tracks_mutations() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        let l0 = writer.l0_manager.get_current();

        // Initial state should be empty
        let initial_estimated = l0.read().estimated_size;
        let initial_actual = l0.read().size_bytes();
        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");

        // Insert vertices with properties
        let mut vids = Vec::new();
        for i in 0..10 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
            props.insert("index".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, &[], None)
                .await?;
            vids.push(vid);
        }

        // Verify estimated_size grew
        let after_vertices_estimated = l0.read().estimated_size;
        let after_vertices_actual = l0.read().size_bytes();
        assert!(
            after_vertices_estimated > 0,
            "estimated_size should grow after insertions"
        );

        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_vertices_estimated,
            after_vertices_actual,
            ratio
        );

        // Insert edges with a simple edge type
        // (chains the vertices: vids[0] -> vids[1] -> ... -> vids[9]).
        let edge_type = 1u32;
        for i in 0..9 {
            let eid = writer.next_eid(edge_type).await?;
            writer
                .insert_edge(
                    vids[i],
                    vids[i + 1],
                    edge_type,
                    eid,
                    std::collections::HashMap::new(),
                    Some("NEXT".to_string()),
                    None,
                )
                .await?;
        }

        // Verify estimated_size grew further
        let after_edges_estimated = l0.read().estimated_size;
        let after_edges_actual = l0.read().size_bytes();
        assert!(
            after_edges_estimated > after_vertices_estimated,
            "estimated_size should grow after edge insertions"
        );

        // Verify still within reasonable bounds
        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_edges_estimated,
            after_edges_actual,
            ratio
        );

        Ok(())
    }

    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
    #[tokio::test]
    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal.clone()),
            None,
        )
        .await?;

        // Flush with no mutations — should succeed cleanly
        let lsn = writer.flush_wal().await?;
        // LSN should be 0 or 1 (no real mutations flushed)
        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);

        Ok(())
    }

    /// Test that transaction data does not leak into main L0 without commit.
    #[tokio::test]
    async fn test_transaction_isolation_without_commit() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Create transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert vertex into transaction L0
        let vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                vid,
                [("name".to_string(), Value::String("Ghost".to_string()))]
                    .into_iter()
                    .collect(),
                &["Person".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Verify data is in transaction L0
        assert!(
            tx_l0.read().vertex_properties.contains_key(&vid),
            "Transaction L0 should contain the vertex"
        );

        // Verify data is NOT in main L0
        let main_l0 = writer.l0_manager.get_current();
        assert!(
            !main_l0.read().vertex_properties.contains_key(&vid),
            "Main L0 should NOT contain uncommitted transaction data"
        );

        // Drop transaction without committing — data should be lost
        drop(tx_l0);

        // Main L0 still should not have it
        assert!(
            !main_l0.read().vertex_properties.contains_key(&vid),
            "Main L0 should remain clean after dropped transaction"
        );

        Ok(())
    }

    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
    /// the flush count crosses the configured threshold and stays
    /// silent on subsequent flushes for the lifetime of the writer.
    /// Primary writers (`fork_id == None`) never fire it.
    ///
    /// Tested directly against `tick_fork_fragment_observability` so
    /// the contract is locked in independently of the broader
    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
    /// on Day 10's on-the-fly schema overlay growth).
    #[tokio::test]
    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::fork::ForkId;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let storage = Arc::new(
            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
        );

        let config = UniConfig {
            fork_fragment_warn_threshold: 3,
            ..Default::default()
        };
        // `mut` so the test can tag the writer as a fork by assigning the
        // plain `fork_id` field below.
        let mut writer =
            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;

        // Primary path: never fires.
        for _ in 0..10 {
            writer.tick_fork_fragment_observability();
        }
        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);

        // Fork path: tag and tick. Below threshold → no fire.
        writer.fork_id = Some(ForkId::new());
        writer.tick_fork_fragment_observability();
        writer.tick_fork_fragment_observability();
        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);

        // Crossing threshold → fires once.
        writer.tick_fork_fragment_observability();
        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);

        // Subsequent ticks bump the gauge but do not re-fire.
        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
        for _ in 0..5 {
            writer.tick_fork_fragment_observability();
        }
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
        assert_eq!(
            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
            fired_after
        );

        Ok(())
    }

    /// The hot-path mutators must not write to any `Writer` struct field.
    /// Phase 2 of the refactor
    /// gave them `&self` receivers, which the compiler enforces against
    /// direct `self.x = y` assignment — but interior-mutable writes
    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
    /// every potentially-writable field, calls each hot-path mutator, and
    /// asserts no field changed.
    ///
    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
    /// `tick_fork_fragment_observability`) DO mutate fields by design and
    /// are intentionally out of scope here.
    #[tokio::test]
    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        schema_manager.add_label("Person")?;
        schema_manager.save().await?;
        let storage = Arc::new(
            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
        );

        let writer =
            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
                .await?;

        /// Captures every `Writer` field that *could* be written by a
        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
        /// construction field). Arc'd substructures (`l0_manager`,
        /// `storage`, etc.) are intentionally not checked — they are
        /// re-pointed only at construction.
        #[derive(Debug, PartialEq)]
        struct Snapshot {
            last_flush_time: std::time::Instant,
            cached_manifest_some: bool,
            fork_flush_count: u64,
            fork_fragment_warn_fired: bool,
            xervo_runtime_some: bool,
            index_rebuild_manager_some: bool,
            fork_id: Option<ForkId>,
        }

        fn snap(w: &Writer) -> Snapshot {
            Snapshot {
                last_flush_time: *w.last_flush_time.lock(),
                cached_manifest_some: w.cached_manifest.lock().is_some(),
                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
                xervo_runtime_some: w.xervo_runtime.get().is_some(),
                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
                fork_id: w.fork_id,
            }
        }

        // 1. insert_vertex_with_labels
        let before = snap(&writer);
        let vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
            .await?;
        assert_eq!(
            snap(&writer),
            before,
            "insert_vertex_with_labels mutated a Writer field"
        );

        // 2. insert_vertices_batch
        let before = snap(&writer);
        let vids = writer.allocate_vids(2).await?;
        writer
            .insert_vertices_batch(
                vids,
                vec![Properties::new(), Properties::new()],
                vec!["Person".into()],
                None,
            )
            .await?;
        assert_eq!(
            snap(&writer),
            before,
            "insert_vertices_batch mutated a Writer field"
        );

        // 3. delete_vertex
        // (deletes the vertex inserted in step 1).
        let before = snap(&writer);
        writer.delete_vertex(vid, None, None).await?;
        assert_eq!(
            snap(&writer),
            before,
            "delete_vertex mutated a Writer field"
        );

        // (insert_edge / delete_edge are skipped here: their fixture cost is
        // disproportionate to the audit's marginal value, and the same
        // structural argument plus the compiler-enforced `&self` covers them.)

        Ok(())
    }
}