Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the writer's mutation and flush paths.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit.
    ///
    /// NOTE(review): presumably the number of buffered mutations that
    /// triggers a flush — confirm against the code that reads it.
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column MergeInsert path used by
    /// SET-only flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched
    ///   property keys in `L0Buffer::vertex_partial_keys`, and the flush
    ///   sends those VIDs through Lance `MergeInsertBuilder` with a
    ///   subset-of-schema source. Unchanged columns (including wide ones
    ///   such as embeddings) are neither read nor rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the
    ///   read-modify-write `insert_vertex_with_labels` path, which keeps
    ///   bit-for-bit equivalence with prior releases.
    ///
    /// Defaults to `false` for the first release; the plan is to flip it to
    /// `true` once telemetry on the issue #72 ingest workload confirms the
    /// win.
    ///
    /// See the soundness probe at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Returns the stock configuration: a 10 000-mutation limit with the
    /// partial-column write path switched off.
    fn default() -> Self {
        const DEFAULT_MAX_MUTATIONS: usize = 10_000;
        const DEFAULT_PARTIAL_LANCE_WRITES: bool = false;

        WriterConfig {
            max_mutations: DEFAULT_MAX_MUTATIONS,
            partial_lance_writes: DEFAULT_PARTIAL_LANCE_WRITES,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The L0 buffer that is about to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured by the rotate step.
    wal_lsn: u64,
    /// `current_version` captured by the rotate step.
    current_version: u64,
    /// In-progress guard; carried here so that it stays alive for the
    /// whole flush rather than just the rotate step.
    flush_in_progress_guard: FlushInProgressGuard,
}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93    let mut out = Properties::new();
94    for k in keys {
95        if let Some(v) = props.get(k) {
96            out.insert(k.clone(), v.clone());
97        }
98    }
99    out
100}
101
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// The snapshot manifest that was built but has not been published yet.
    manifest: SnapshotManifest,
    /// Id of the snapshot described by `manifest`.
    snapshot_id: String,
}
110
/// Write-side entry point: owns the L0 buffers, ID allocation, flush
/// bookkeeping and the optimistic-concurrency (SSI) state.
pub struct Writer {
    /// Manager of the current L0 buffer and the pending-flush chain.
    pub l0_manager: Arc<L0Manager>,
    /// Storage manager; also the source of the adjacency manager and the
    /// object store used for the default ID allocator.
    pub storage: Arc<StorageManager>,
    /// Schema manager; consulted for constraint definitions.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocator for vertex ids (VIDs) and edge ids (EIDs).
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration this writer was constructed with.
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. Owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `async_flush_enabled = false`: the constructor only
    /// builds a coordinator when that flag is set. The
    /// coordinator's finalizer task captures `SharedFlushCtx` which
    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
    /// the holder count never drops. Constructing it only when the
    /// feature is actually on avoids that side-effect for all
    /// existing sync-flush paths. When async-flush graduates from
    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
    /// the drain explicitly.
    // NOTE(review): `Writer::flush_coordinator()` reads this field, so the
    // `dead_code` allowance below may no longer be needed — confirm and drop.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    ///
    /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
    /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
    commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
209
/// Number of recent commits retained for OCC conflict detection. Large enough
/// that under-run — and the resulting conservative abort — is rare in practice;
/// each entry is a small set of touched ids.
///
/// Passed as the capacity of the `CommitRegistry` built in
/// [`Writer::new_with_config`].
const OCC_REGISTRY_CAPACITY: usize = 4096;
214
215impl Writer {
216    pub async fn new(
217        storage: Arc<StorageManager>,
218        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
219        start_version: u64,
220    ) -> Result<Self> {
221        Self::new_with_config(
222            storage,
223            schema_manager,
224            start_version,
225            UniConfig::default(),
226            None,
227            None,
228        )
229        .await
230    }
231
232    pub async fn new_with_config(
233        storage: Arc<StorageManager>,
234        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
235        start_version: u64,
236        config: UniConfig,
237        wal: Option<Arc<WriteAheadLog>>,
238        allocator: Option<Arc<IdAllocator>>,
239    ) -> Result<Self> {
240        let allocator = if let Some(a) = allocator {
241            a
242        } else {
243            let store = storage.store();
244            let path = object_store::path::Path::from("id_allocator.json");
245            Arc::new(IdAllocator::new(store, path, 1000).await?)
246        };
247
248        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
249
250        let property_manager = Some(Arc::new(PropertyManager::new(
251            storage.clone(),
252            schema_manager.clone(),
253            1000,
254        )));
255
256        let adjacency_manager = storage.adjacency_manager();
257
258        // Hoist the Arc'd fields so we can both stash them on Writer and
259        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
260        // captures. Single-source-of-truth for each piece of mutable
261        // shared state.
262        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
263        let cached_manifest = Arc::new(PlMutex::new(None));
264        let fork_flush_count = Arc::new(AtomicU64::new(0));
265        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
266        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
267        let compaction_handle = Arc::new(RwLock::new(None));
268        let index_rebuild_manager: Arc<
269            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
270        > = Arc::new(OnceLock::new());
271
272        let flush_coordinator = if config.async_flush_enabled {
273            let shared = SharedFlushCtx {
274                storage: storage.clone(),
275                l0_manager: l0_manager.clone(),
276                adjacency_manager: adjacency_manager.clone(),
277                property_manager: property_manager.clone(),
278                schema_manager: schema_manager.clone(),
279                cached_manifest: cached_manifest.clone(),
280                last_flush_time: last_flush_time.clone(),
281                fork_id: None,
282                fork_flush_count: fork_flush_count.clone(),
283                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
284                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
285                flush_lock: flush_lock.clone(),
286                index_rebuild_manager: index_rebuild_manager.clone(),
287                compaction_handle: compaction_handle.clone(),
288                compaction_config: config.compaction.clone(),
289                index_rebuild_config: config.index_rebuild.clone(),
290                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
291            };
292            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
293            Some(Arc::new(FlushCoordinator::new(
294                config.max_pending_flushes,
295                shared,
296                finalize_fn,
297            )))
298        } else {
299            None
300        };
301
302        let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
303        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
304            OCC_REGISTRY_CAPACITY,
305        )));
306        let for_update_locks = Arc::new(dashmap::DashMap::new());
307
308        Ok(Self {
309            l0_manager,
310            storage,
311            schema_manager,
312            allocator,
313            config,
314            xervo_runtime: OnceLock::new(),
315            property_manager,
316            adjacency_manager,
317            last_flush_time,
318            compaction_handle,
319            index_rebuild_manager,
320            cached_manifest,
321            fork_id: None,
322            fork_flush_count,
323            fork_fragment_warn_fired,
324            flush_lock,
325            flush_coordinator,
326            commit_sequence,
327            committed_writes,
328            for_update_locks,
329        })
330    }
331
332    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
333    /// creating it on first use. The caller `.lock_owned().await`s the returned
334    /// mutex and holds the guard for the transaction's lifetime.
335    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
336        self.for_update_locks
337            .entry(key.to_vec())
338            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
339            .clone()
340    }
341
342    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
343    /// holds anymore, so the map does not grow without bound across the keyspace.
344    ///
345    /// Called when a transaction ends, **after** its guards have been dropped.
346    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
347    /// the same lock `row_lock_handle` takes to clone an entry — so the check
348    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
349    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
350    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
351    /// after we remove). Either way no two transactions ever lock different
352    /// `Mutex` instances for the same key.
353    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
354        for key in keys {
355            self.for_update_locks
356                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
357        }
358    }
359
360    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
361    /// tests that the map does not leak entries across transactions (G5).
362    pub fn for_update_lock_count(&self) -> usize {
363        self.for_update_locks.len()
364    }
365
366    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
367    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
368    /// baseline advances to lock-acquisition time (read-latest under the lock).
369    pub fn current_commit_sequence(&self) -> u64 {
370        self.commit_sequence
371            .load(crate::runtime::sync::Ordering::Relaxed)
372    }
373
374    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
375    /// Used by the async-flush stream/finalize paths to pass into spawned
376    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
377    /// with `flush_coordinator -> FinalizeFn -> Writer`).
378    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
379        SharedFlushCtx {
380            storage: self.storage.clone(),
381            l0_manager: self.l0_manager.clone(),
382            adjacency_manager: self.adjacency_manager.clone(),
383            property_manager: self.property_manager.clone(),
384            schema_manager: self.schema_manager.clone(),
385            cached_manifest: self.cached_manifest.clone(),
386            last_flush_time: self.last_flush_time.clone(),
387            fork_id: self.fork_id,
388            fork_flush_count: self.fork_flush_count.clone(),
389            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
390            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
391            flush_lock: self.flush_lock.clone(),
392            index_rebuild_manager: self.index_rebuild_manager.clone(),
393            compaction_handle: self.compaction_handle.clone(),
394            compaction_config: self.config.compaction.clone(),
395            index_rebuild_config: self.config.index_rebuild.clone(),
396            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
397        }
398    }
399
400    /// Borrow the flush coordinator if async flush is enabled.
401    /// Returns `None` when `config.async_flush_enabled = false`.
402    /// External callers (`drop_fork`) use this to drain pending streams.
403    pub fn flush_coordinator(
404        &self,
405    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
406        self.flush_coordinator.as_ref()
407    }
408
409    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
410    ///
411    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
412    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
413    pub fn set_index_rebuild_manager(
414        &self,
415        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
416    ) -> Result<()> {
417        self.index_rebuild_manager
418            .set(manager)
419            .map_err(|_| anyhow!("index_rebuild_manager already set"))
420    }
421
422    /// Replay WAL mutations into the current L0 buffer.
423    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
424        let l0 = self.l0_manager.get_current();
425        let wal = l0.read().wal.clone();
426
427        if let Some(wal) = wal {
428            wal.initialize().await?;
429            let mutations = wal.replay_since(wal_high_water_mark).await?;
430            let count = mutations.len();
431
432            if count > 0 {
433                log::info!(
434                    "Replaying {} mutations from WAL (LSN > {})",
435                    count,
436                    wal_high_water_mark
437                );
438                let mut l0_guard = l0.write();
439                l0_guard.replay_mutations(mutations)?;
440                // Rebuild the UNIQUE constraint index over the recovered rows
441                // (Bug #9 Mechanism B). `replay_mutations` restores
442                // vertices/properties/labels but never repopulates
443                // `constraint_index` (its only other caller is the live insert
444                // path). Without this, a unique key that lives only in the WAL
445                // (committed but not yet flushed to Lance) is invisible to
446                // `check_unique_constraint_multi` after recovery and a
447                // duplicate of it could be created.
448                self.rebuild_constraint_index(&mut l0_guard);
449            }
450
451            Ok(count)
452        } else {
453            Ok(0)
454        }
455    }
456
457    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
458    ///
459    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
460    /// constraint whose target label the vertex carries and whose member
461    /// properties are all present, inserts the same constraint key the live
462    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
463    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
464    /// write lock; the schema is already loaded on the `Writer`.
465    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
466        let schema = self.schema_manager.schema();
467        // Collect entries first to avoid borrowing `vertex_properties`
468        // immutably while mutating `constraint_index` through the same guard.
469        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
470        for (&vid, props) in &l0_guard.vertex_properties {
471            if l0_guard.vertex_tombstones.contains(&vid) {
472                continue;
473            }
474            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
475                continue;
476            };
477            for label in labels {
478                for constraint in &schema.constraints {
479                    if !constraint.enabled {
480                        continue;
481                    }
482                    let ConstraintTarget::Label(l) = &constraint.target else {
483                        continue;
484                    };
485                    if l != label {
486                        continue;
487                    }
488                    let ConstraintType::Unique {
489                        properties: unique_props,
490                    } = &constraint.constraint_type
491                    else {
492                        continue;
493                    };
494                    let mut key_values = Vec::new();
495                    let mut all_present = true;
496                    for prop in unique_props {
497                        if let Some(val) = props.get(prop) {
498                            key_values.push((prop.clone(), val.clone()));
499                        } else {
500                            all_present = false;
501                            break;
502                        }
503                    }
504                    if all_present {
505                        keys.push((serialize_constraint_key(label, &key_values), vid));
506                    }
507                }
508            }
509        }
510        for (key, vid) in keys {
511            l0_guard.insert_constraint_key(key, vid);
512        }
513    }
514
515    /// Allocates the next VID (pure auto-increment).
516    pub async fn next_vid(&self) -> Result<Vid> {
517        self.allocator.allocate_vid().await
518    }
519
520    /// Allocates multiple VIDs at once for bulk operations.
521    /// This is more efficient than calling next_vid() in a loop.
522    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
523        self.allocator.allocate_vids(count).await
524    }
525
526    /// Allocates the next EID (pure auto-increment).
527    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
528        self.allocator.allocate_eid().await
529    }
530
531    /// Allocates multiple EIDs at once for bulk operations.
532    /// This is more efficient than calling next_eid() in a loop.
533    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
534        self.allocator.allocate_eids(count).await
535    }
536
537    /// Install the embedding runtime exactly once. Receiver is `&self` so it
538    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
539    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
540        self.xervo_runtime
541            .set(runtime)
542            .map_err(|_| anyhow!("xervo_runtime already set"))
543    }
544
545    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
546        self.xervo_runtime.get().cloned()
547    }
548
549    /// Create a new empty L0 buffer for transaction-scoped mutations.
550    ///
551    /// Only reads the current version — no exclusive lock required on Writer.
552    /// The returned buffer has no WAL reference; mutations are logged at
553    /// commit time via [`Self::commit_transaction_l0`].
554    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
555        let current_version = self.l0_manager.get_current().read().current_version;
556        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
557        let buf = L0Buffer::new(current_version, None);
558        // SSI: stamp the OCC read sequence at begin so commit can detect any
559        // transaction that committed since. Gated on the runtime `ssi_enabled`
560        // toggle — when off, `occ_read_set` stays `None` and every downstream
561        // read-set recording / commit validation self-gates to a no-op.
562        let buf = if self.config.ssi_enabled {
563            let mut buf = buf;
564            buf.occ_read_seq = self
565                .commit_sequence
566                .load(crate::runtime::sync::Ordering::Relaxed);
567            // The read path records observed ids here for SSI antidependency
568            // detection; commit consults it.
569            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
570                crate::runtime::l0::OccReadSet::default(),
571            )));
572            buf
573        } else {
574            buf
575        };
576        Arc::new(RwLock::new(buf))
577    }
578
579    /// Resolve the target L0 buffer for a mutation.
580    ///
581    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
582    /// When `None`, it targets the global L0 from the manager.
583    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
584        tx_l0
585            .cloned()
586            .unwrap_or_else(|| self.l0_manager.get_current())
587    }
588
589    fn update_metrics(&self) {
590        let l0 = self.l0_manager.get_current();
591        let size = l0.read().estimated_size;
592        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
593    }
594
595    /// Overlay-aware issue-#77 edge-endpoint validation.
596    ///
597    /// The current buffer alone does not hold all committed-but-unflushed
598    /// tombstones — a flush rotation moves them onto `pending_flush` until
599    /// the Lance write completes. A vertex is effectively deleted iff,
600    /// walking newest-first (tx → current → pending newest→oldest), the
601    /// first buffer that knows the vid says "tombstoned" (an insert clears
602    /// the tombstone within a buffer, so props/tombstone are mutually
603    /// exclusive per buffer).
604    ///
605    /// Must run under `flush_lock` so the overlay cannot change before the
606    /// merge, and BEFORE the durable WAL flush (see the call site).
607    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
608        let chain = self.l0_manager.get_pending_flush();
609        let current = self.l0_manager.get_current();
610        let effectively_deleted = |vid: &Vid| -> bool {
611            if tx_l0.vertex_properties.contains_key(vid) {
612                return false;
613            }
614            if tx_l0.vertex_tombstones.contains(vid) {
615                return true;
616            }
617            {
618                let cur = current.read();
619                if cur.vertex_properties.contains_key(vid) {
620                    return false;
621                }
622                if cur.vertex_tombstones.contains(vid) {
623                    return true;
624                }
625            }
626            for frozen in chain.iter().rev() {
627                let g = frozen.read();
628                if g.vertex_properties.contains_key(vid) {
629                    return false;
630                }
631                if g.vertex_tombstones.contains(vid) {
632                    return true;
633                }
634            }
635            false
636        };
637        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
638            if tx_l0.tombstones.contains_key(eid) {
639                continue; // a deletion, not an insertion — never resurrects a vertex
640            }
641            if effectively_deleted(src_vid) {
642                anyhow::bail!(
643                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
644                    eid,
645                    src_vid
646                );
647            }
648            if effectively_deleted(dst_vid) {
649                anyhow::bail!(
650                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
651                    eid,
652                    dst_vid
653                );
654            }
655        }
656        Ok(())
657    }
658
659    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
660    /// value for each CRDT property the transaction writes, so the commit
661    /// merge MERGES against the committed CRDT state instead of shadowing it
662    /// (the carve-out lets concurrent CRDT writers commit on the assumption
663    /// that the merge sees the committed value — true only while that value
664    /// lives in current, not mid-flush on `pending_flush`). No-op when
665    /// nothing is pending or the property already exists in current. Vertex
666    /// properties only, mirroring the carve-out itself.
667    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
668        let chain = self.l0_manager.get_pending_flush();
669        if chain.is_empty() {
670            return;
671        }
672        for (vid, props) in &tx_l0.vertex_properties {
673            Self::seed_crdt_props(&chain, *vid, props, main_l0);
674        }
675    }
676
677    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
678    /// Also used by the non-transactional vertex write path, which CRDT-merges
679    /// into the current buffer directly and has the same shadowing hazard
680    /// during a flush window.
681    fn seed_crdt_props(
682        chain: &[Arc<RwLock<L0Buffer>>],
683        vid: Vid,
684        props: &Properties,
685        target: &mut L0Buffer,
686    ) {
687        for (key, value) in props {
688            if crate::runtime::l0::try_as_crdt(value).is_none() {
689                continue;
690            }
691            if target
692                .vertex_properties
693                .get(&vid)
694                .is_some_and(|p| p.contains_key(key))
695            {
696                continue;
697            }
698            // Newest generation first: the first hit is the live state.
699            for frozen in chain.iter().rev() {
700                let g = frozen.read();
701                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
702                    if crate::runtime::l0::try_as_crdt(v).is_some() {
703                        target
704                            .vertex_properties
705                            .entry(vid)
706                            .or_default()
707                            .insert(key.clone(), v.clone());
708                    }
709                    break;
710                }
711            }
712        }
713    }
714
    /// Commit a transaction's private (externally-owned) L0 buffer into main L0.
    ///
    /// Runs under `flush_lock` and proceeds in this order:
    ///
    /// 1. When `config.ssi_enabled`: validate the write-set (and read-set)
    ///    against `committed_writes`, then re-check unique-constraint keys,
    ///    MERGE guard keys, `ext_id` ownership and the CRDT carve-out against
    ///    the `[current, pending…]` overlay.
    /// 2. Validate edge endpoints against the overlay (always; not SSI-gated).
    /// 3. Append the transaction's mutations to the WAL attached to the
    ///    current L0 buffer (if any) and flush it. The flush is the durable
    ///    commit point; every validation above happens before it.
    /// 4. If a snapshot pins the current buffer, freeze it aside; then merge
    ///    the transaction into the current buffer and replay its edges and
    ///    edge tombstones into the `AdjacencyManager`.
    /// 5. Register the write-set with `committed_writes` (SSI only, non-empty
    ///    write-sets only) and update metrics.
    /// 6. If `should_flush()` fires, attempt a best-effort post-commit flush:
    ///    inline under the lock, or rotate + submit to the flush coordinator
    ///    when `config.async_flush_enabled` is set and a permit is available.
    ///
    /// # Returns
    ///
    /// `(wal_lsn, flush_pending)`. `wal_lsn` is the value returned by
    /// [`Self::flush_wal`] (`0` when no WAL is configured). `flush_pending`
    /// is `true` only on the async path, after the current buffer was rotated
    /// and its stream phase was submitted to the flush coordinator; it is
    /// `false` when no flush was triggered, when the flush ran inline, when
    /// no permit was available, or when the rotate failed.
    ///
    /// NOTE(review): earlier documentation described `flush_pending == true`
    /// as "no flush ran — the caller is expected to spawn a background
    /// `flush_to_l1`". In the code below the stream phase is already
    /// submitted before the flag is returned — confirm what callers do with it.
    ///
    /// # Errors
    ///
    /// * `UniError::SerializationConflict` — the SSI check reported a
    ///   conflict, or a CRDT carve-out overwrite was detected.
    /// * `UniError::ConstraintConflict` — a unique key, MERGE guard key or
    ///   `ext_id` of this transaction is already present in the overlay.
    /// * Any error from edge-endpoint validation, WAL append, WAL flush, or
    ///   the merge into main L0.
    ///
    /// Failures of the post-commit flush are logged and not returned.
    pub async fn commit_transaction_l0(
        self: &Arc<Self>,
        tx_l0_arc: Arc<RwLock<L0Buffer>>,
    ) -> Result<(u64, bool)> {
        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
        // Two concurrent commits serialize here; in Phase 3 the outer
        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
        // acquisition is uncontended. Phase 4 drops the outer lock and
        // this becomes the load-bearing serialization point.
        let _flush_lock_guard = self.flush_lock.lock().await;

        // Crash-recovery seam: simulate process death immediately after winning
        // the commit serialization point but before any durable work. No-op
        // unless built with `--features failpoints`. (See ssi_resilience tests.)
        fail::fail_point!("commit::after-flush-lock");

        // SSI: optimistic conflict detection. This MUST run before any WAL
        // write — `flush_wal()` below is the durable commit point and the WAL
        // has no abort marker, so aborting after it would resurrect this
        // transaction on crash recovery. The write-set is reused for
        // registration after a successful merge.
        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
        // and `occ_write_set` is `None`, so the post-merge registration below
        // is skipped — reproducing last-writer-wins exactly.
        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
            let tx_l0 = tx_l0_arc.read();
            let read_seq = tx_l0.occ_read_seq;
            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
            if !write_set.is_empty() {
                // Telemetry: one validation per non-empty (writing) commit. The
                // ratio of conflicts to validations is the headline abort rate.
                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
                // Read-set is consulted only for writing transactions, so a
                // read-only commit (empty write-set) runs at snapshot isolation.
                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
                if let Some(conflict) =
                    self.committed_writes
                        .lock()
                        .check(read_seq, &write_set, read_guard.as_deref())
                {
                    use crate::runtime::occ::Conflict;
                    match &conflict {
                        Conflict::WriteWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "write_write",
                        )
                        .increment(1),
                        Conflict::ReadWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "read_write",
                        )
                        .increment(1),
                        Conflict::HistoryTruncated { .. } => {
                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
                        }
                    }
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }

            // Validate against the committed-but-unflushed overlay under
            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
            // soundness. The current buffer alone does not hold all committed
            // state — a flush rotation moves it onto `pending_flush` until the
            // Lance write completes (the Bug #9A window, here at the
            // commit-time layer) — so every check walks [current, pending…].
            {
                let pending = self.l0_manager.get_pending_flush();
                let main_l0 = self.l0_manager.get_current();
                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
                    std::iter::once(main_l0).chain(pending).collect();

                // SSI / serializable MERGE: abort if a concurrent transaction has
                // already committed a row with one of this transaction's unique
                // keys. Commits serialize here, so this closes the race window
                // left by the per-insert check. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.constraint_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_constraint_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "unique key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Implicit MERGE phantom guard: a `MERGE` that *created* a node
                // registered its (label, key-props) here even with no declared
                // UNIQUE constraint. If a concurrent transaction already committed
                // the same MERGE key, abort retriably so the two converge to one
                // node on retry (the loser's MATCH then finds the committed row).
                // Only MERGE-creates register keys, so a plain CREATE of the same
                // properties never lands here. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.merge_guard_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_merge_guard_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "MERGE key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Same race window for global ext_id uniqueness: the per-insert
                // check ran against an older main L0; re-probe the committed
                // index here, where commits serialize.
                for (ext_id, vid) in &tx_l0.extid_index {
                    let taken = overlay.iter().any(|b| {
                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
                    });
                    if taken {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: format!(
                                    "ext_id '{ext_id}' already committed by a concurrent \
                                     transaction"
                                ),
                            },
                        ));
                    }
                }

                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
                // write-set assuming its merge commutes. If the overlay holds a
                // *different* CRDT variant for the same property, the merge would
                // silently overwrite it — abort instead of losing the update.
                // (Checked against every overlay buffer: conservative if an old
                // generation held a different variant that a newer commit already
                // replaced, but an abort+retry is always sound.)
                for buf in &overlay {
                    if let Some(conflict) =
                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
                    {
                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::SerializationConflict {
                                message: conflict.to_string(),
                            },
                        ));
                    }
                }
            }
            Some(write_set)
        } else {
            None
        };

        // Issue #77: an edge whose endpoint is effectively deleted makes the
        // merge below bail. That bail MUST happen before the durable WAL flush —
        // after it the transaction is committed-but-unmerged (a ghost commit),
        // and WAL replay re-hits the same bail, making the database unopenable.
        // SSI validation was deliberately placed before the flush for exactly
        // this reason; the endpoint check belongs here too. Runs unconditionally
        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
        // tombstone state cannot change between here and the merge. A
        // tombstone may live in a flush-rotated pending buffer rather than
        // the current buffer, so the check walks the overlay newest-first.
        {
            let tx_l0 = tx_l0_arc.read();
            self.validate_edge_endpoints_overlay(&tx_l0)?;
        }

        // Crash-recovery seam: SSI validation has passed; the transaction is
        // about to become durable. A crash here must leave NO trace (validation
        // happens before the WAL is touched). No-op unless `failpoints`.
        fail::fail_point!("commit::after-validate");

        // 1. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            if let Some(wal) = main_l0.wal.as_ref() {
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Label-only mutations (SET n:Label / REMOVE n:Label). After
                // vertex inserts (so the vertex exists on replay), before edges,
                // and skipping vertices deleted in this same commit.
                for vid in &tx_l0.vertex_label_overwrites {
                    if tx_l0.vertex_tombstones.contains(vid) {
                        continue;
                    }
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
                        vid: *vid,
                        labels,
                    })?;
                }

                // Crash-recovery seam: vertices appended, edges not yet. Tests
                // assert that a crash here (before `flush_wal`) recovers NOTHING
                // — the durable commit point is the flush below, not append.
                fail::fail_point!("commit::mid-wal");

                // Edge insertions and deletions from edge_endpoints
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges that only exist in the global L0 (not in
                // this transaction's edge_endpoints).  Without this, deletes of
                // pre-existing edges would be silently lost.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        let wal_lsn = self.flush_wal().await?;

        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
        // A crash here must RECOVER the transaction on replay (it is committed),
        // even though it was never made visible in-process. No-op unless `failpoints`.
        fail::fail_point!("commit::after-wal-flush");

        // Component C1: if an outstanding snapshot pins the current generation,
        // clone it aside (lazy copy-on-write) before merging, so the pinning
        // transaction's reads stay isolated from this commit. No-op — and zero
        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
        // so this cannot race a flush rotate or another commit's merge; the merge
        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
        // always false when SSI is off and this is a zero-cost no-op.
        if self.l0_manager.is_current_pinned() {
            self.l0_manager.freeze_current_for_snapshot();
            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
        }

        // 3. Merge into main L0 and make visible
        {
            // Write-lock the tx buffer: `merge_take` moves its property maps
            // into main L0 instead of cloning them. The commit consumes the
            // transaction, so the drained maps are never observed afterwards;
            // everything read below (endpoints, versions, tombstones) is left
            // intact.
            let mut tx_l0 = tx_l0_arc.write();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            // A CRDT property's committed state may live only in a
            // flush-rotated pending buffer (the post-rotation current is
            // empty until the Lance write completes — the Bug #9A window).
            // `merge_crdt_properties` merges against the CURRENT buffer's
            // value — without seeding, the tx's CRDT state would SHADOW the
            // pending buffer's at read time (newest buffer wins per property)
            // and concurrent increments would be lost. Seed the newest
            // overlay value for each CRDT property the tx writes that current
            // lacks, so the merge below merges instead of replaces.
            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
            main_l0.merge_take(&mut tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            // Replay tombstones for edges that only exist in the global L0
            // (not in this transaction's edge_endpoints).
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        // Crash-recovery seam: durable AND merged, but the in-memory commit
        // registry has not recorded this write-set yet. A crash here is
        // indistinguishable from one at `after-wal-flush` on reopen (the
        // registry is in-memory and rebuilt empty); the tx still recovers.
        fail::fail_point!("commit::after-merge");

        // SSI: register this commit's write-set under a fresh commit sequence so
        // later transactions detect conflicts against it. Still under
        // `flush_lock`, before the async-flush branch can drop the guard.
        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
        if let Some(write_set) = occ_write_set
            && !write_set.is_empty()
        {
            // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
            // so production and the loom/shuttle models exercise identical logic.
            self.committed_writes
                .lock()
                .commit(&self.commit_sequence, write_set);
        }

        self.update_metrics();

        // 4. Best-effort post-commit auto-flush.
        //
        // Two paths:
        // - async_flush_enabled = false (default): inline under our
        //   existing flush_lock guard via flush_inline_under_lock.
        // - async_flush_enabled = true: rotate inline, drop flush_lock,
        //   then submit the stream phase to the coordinator. Gated on
        //   `pending_flush_count() < max_pending_flushes` so we don't
        //   stack up rotations beyond the configured pipeline depth.
        //   `try_acquire_permit` is non-blocking: if we lose the race
        //   for the last permit, we just skip this trigger (the next
        //   commit retries).
        //
        // Note: the inline path is also taken when async flush is enabled
        // but no coordinator is configured or the pending count has reached
        // `max_pending_flushes` (the `else` arm of the combined condition).
        let mut flush_pending = false;
        if self.should_flush() {
            if self.config.async_flush_enabled
                && let Some(coord) = self.flush_coordinator.as_ref()
                && coord.pending_flush_count() < self.config.max_pending_flushes
            {
                match coord.try_acquire_permit() {
                    Some(permit) => {
                        match self.flush_l0_rotate().await {
                            Ok(rotate_out) => {
                                // Allocate the rotate seq and bump pending ONLY
                                // after the rotate succeeds (Bug #3). A failed
                                // rotate must consume neither: the finalizer
                                // advances strictly in consecutive seq order and
                                // only decrements pending on finalize, so a
                                // leaked seq/pending from a failed rotate would
                                // wedge the finalizer forever and climb pending
                                // toward `max_pending_flushes`. The seq is still
                                // allocated under `flush_lock` (immediately after
                                // the rotate, before the guard drops below), so
                                // concurrent rotates keep seq order == rotation
                                // order, and the seq is not used until submit.
                                let seq = coord.next_rotate_seq();
                                coord.note_pending();
                                // Release flush_lock BEFORE the spawn so concurrent
                                // commits can proceed while the stream runs.
                                drop(_flush_lock_guard);
                                let parent_manifest = self.cached_manifest.lock().clone();
                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
                                    seq,
                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
                                    wal_lsn: rotate_out.wal_lsn,
                                    current_version: rotate_out.current_version,
                                    name: None,
                                    parent_manifest,
                                    permit,
                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
                                };
                                let writer = self.clone();
                                let _ticket = coord.submit_for_stream(
                                    rotated,
                                    move |old_l0, wal, ver, n| async move {
                                        let outcome =
                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
                                            new_manifest: outcome.manifest,
                                            snapshot_id: outcome.snapshot_id,
                                        })
                                    },
                                );
                                flush_pending = true;
                                // Early return — flush_lock already dropped.
                                return Ok((wal_lsn, flush_pending));
                            }
                            Err(e) => {
                                tracing::warn!("Async rotate failed (non-critical): {}", e);
                                // No seq was allocated and pending was not
                                // bumped (both moved into the Ok arm for Bug
                                // #3), so the finalizer is not wedged. The
                                // permit drops here, freeing the slot.
                            }
                        }
                    }
                    None => {
                        // Race: someone else grabbed the last permit. Skip;
                        // next commit will retry should_flush().
                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
                    }
                }
            } else if let Err(e) = self.flush_inline_under_lock(None).await {
                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
            }
        }

        Ok((wal_lsn, flush_pending))
    }
1195
1196    /// Flush the WAL buffer to durable storage.
1197    ///
1198    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1199    pub async fn flush_wal(&self) -> Result<u64> {
1200        let l0 = self.l0_manager.get_current();
1201        let wal = l0.read().wal.clone();
1202
1203        match wal {
1204            Some(wal) => Ok(wal.flush().await?),
1205            None => Ok(0),
1206        }
1207    }
1208
1209    /// Record property removals in the active L0 mutation stats.
1210    ///
1211    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1212    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1213        if count == 0 {
1214            return;
1215        }
1216        let l0 = self.resolve_l0(tx_l0);
1217        l0.write().mutation_stats.properties_removed += count;
1218    }
1219
1220    /// Validates vertex constraints for the given properties.
1221    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1222    async fn validate_vertex_constraints_for_label(
1223        &self,
1224        vid: Vid,
1225        properties: &Properties,
1226        label: &str,
1227        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1228    ) -> Result<()> {
1229        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1230            .await
1231    }
1232
1233    /// Partial-update sibling: validates only constraints touching keys
1234    /// present in `properties` (the touched set). NOT NULL is checked
1235    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1236    /// skipped when any referenced key is absent (the caller is
1237    /// expected to have routed to the full-row path in that case via
1238    /// `touched_needs_full_read`).
1239    async fn validate_vertex_constraints_for_label_partial(
1240        &self,
1241        vid: Vid,
1242        properties: &Properties,
1243        label: &str,
1244        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1245    ) -> Result<()> {
1246        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1247            .await
1248    }
1249
1250    async fn validate_vertex_constraints_for_label_impl(
1251        &self,
1252        vid: Vid,
1253        properties: &Properties,
1254        label: &str,
1255        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1256        partial: bool,
1257    ) -> Result<()> {
1258        let schema = self.schema_manager.schema();
1259
1260        {
1261            // 1. Check NOT NULL constraints (from Property definitions).
1262            //    Under partial-update mode, skip properties NOT in
1263            //    `properties` — they retain their previous (already-
1264            //    validated) value.
1265            if let Some(props_meta) = schema.properties.get(label) {
1266                for (prop_name, meta) in props_meta {
1267                    if !meta.nullable {
1268                        let present = properties.get(prop_name);
1269                        if partial && present.is_none() {
1270                            continue;
1271                        }
1272                        if present.is_none_or(|v| v.is_null()) {
1273                            log::warn!(
1274                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1275                                prop_name,
1276                                label
1277                            );
1278                            return Err(anyhow!(
1279                                "Constraint violation: Property '{}' cannot be null",
1280                                prop_name
1281                            ));
1282                        }
1283                    }
1284                }
1285            }
1286
1287            // 2. Check Explicit Constraints (Unique, Check, etc.)
1288            for constraint in &schema.constraints {
1289                if !constraint.enabled {
1290                    continue;
1291                }
1292                match &constraint.target {
1293                    ConstraintTarget::Label(l) if l == label => {}
1294                    _ => continue,
1295                }
1296
1297                match &constraint.constraint_type {
1298                    ConstraintType::Unique {
1299                        properties: unique_props,
1300                    } => {
1301                        // Support single and multi-property unique constraints
1302                        if !unique_props.is_empty() {
1303                            let mut key_values = Vec::new();
1304                            let mut missing = false;
1305                            for prop in unique_props {
1306                                if let Some(val) = properties.get(prop) {
1307                                    key_values.push((prop.clone(), val.clone()));
1308                                } else {
1309                                    missing = true; // Can't enforce if property missing (partial update?)
1310                                    // For INSERT, missing means null?
1311                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1312                                    // For now, only check if ALL keys are present
1313                                }
1314                            }
1315
1316                            if !missing {
1317                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1318                                    .await?;
1319                            }
1320                        }
1321                    }
1322                    ConstraintType::Exists { property } => {
1323                        if properties.get(property).is_none_or(|v| v.is_null()) {
1324                            log::warn!(
1325                                "Constraint violation: Property '{}' must exist for label '{}'",
1326                                property,
1327                                label
1328                            );
1329                            return Err(anyhow!(
1330                                "Constraint violation: Property '{}' must exist",
1331                                property
1332                            ));
1333                        }
1334                    }
1335                    ConstraintType::Check { expression } => {
1336                        if !self.evaluate_check_constraint(expression, properties)? {
1337                            return Err(anyhow!(
1338                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1339                                constraint.name,
1340                                expression
1341                            ));
1342                        }
1343                    }
1344                    _ => {
1345                        return Err(anyhow!("Unsupported constraint type"));
1346                    }
1347                }
1348            }
1349        }
1350        Ok(())
1351    }
1352
1353    /// Validates vertex constraints for a vertex with the given labels.
1354    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1355    /// Unknown labels (not in schema) are skipped.
1356    async fn validate_vertex_constraints(
1357        &self,
1358        vid: Vid,
1359        properties: &Properties,
1360        labels: &[String],
1361        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1362    ) -> Result<()> {
1363        let schema = self.schema_manager.schema();
1364
1365        // Validate constraints only for known labels
1366        for label in labels {
1367            // Skip unknown labels (schemaless support)
1368            if schema.get_label_case_insensitive(label).is_none() {
1369                continue;
1370            }
1371            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1372                .await?;
1373        }
1374
1375        // Check global ext_id uniqueness if ext_id is provided
1376        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1377            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1378        }
1379
1380        Ok(())
1381    }
1382
1383    /// Partial sibling of `validate_vertex_constraints` — validates only
1384    /// constraints touching keys present in `properties`. Used by
1385    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1386    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1387    async fn validate_vertex_constraints_partial(
1388        &self,
1389        vid: Vid,
1390        touched: &Properties,
1391        labels: &[String],
1392        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1393    ) -> Result<()> {
1394        let schema = self.schema_manager.schema();
1395        for label in labels {
1396            if schema.get_label_case_insensitive(label).is_none() {
1397                continue;
1398            }
1399            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1400                .await?;
1401        }
1402        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1403            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1404        }
1405        Ok(())
1406    }
1407
1408    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1409    ///
1410    /// Used to build a constraint key index from L0 buffers for batch validation.
1411    fn collect_constraint_keys_from_properties<'a>(
1412        properties_iter: impl Iterator<Item = &'a Properties>,
1413        label: &str,
1414        constraints: &[uni_common::core::schema::Constraint],
1415        existing_keys: &mut HashMap<String, HashSet<String>>,
1416        existing_extids: &mut HashSet<String>,
1417    ) {
1418        for props in properties_iter {
1419            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1420                existing_extids.insert(ext_id.to_string());
1421            }
1422
1423            for constraint in constraints {
1424                if !constraint.enabled {
1425                    continue;
1426                }
1427                if let ConstraintTarget::Label(l) = &constraint.target {
1428                    if l != label {
1429                        continue;
1430                    }
1431                } else {
1432                    continue;
1433                }
1434
1435                if let ConstraintType::Unique {
1436                    properties: unique_props,
1437                } = &constraint.constraint_type
1438                {
1439                    let mut key_parts = Vec::new();
1440                    let mut all_present = true;
1441                    for prop in unique_props {
1442                        if let Some(val) = props.get(prop) {
1443                            key_parts.push(format!("{}:{}", prop, val));
1444                        } else {
1445                            all_present = false;
1446                            break;
1447                        }
1448                    }
1449                    if all_present {
1450                        let key = key_parts.join("|");
1451                        existing_keys
1452                            .entry(constraint.name.clone())
1453                            .or_default()
1454                            .insert(key);
1455                    }
1456                }
1457            }
1458        }
1459    }
1460
1461    /// Validates constraints for a batch of vertices efficiently.
1462    ///
1463    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1464    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1465    ///
1466    /// # Arguments
1467    /// * `vids` - VIDs of vertices being inserted
1468    /// * `properties_batch` - Properties for each vertex
1469    /// * `label` - Label for all vertices (assumes single label for now)
1470    ///
1471    /// # Performance
1472    /// For N vertices with unique constraints:
1473    /// - Old approach: O(N²) - scan L0 buffer N times
1474    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1475    async fn validate_vertex_batch_constraints(
1476        &self,
1477        vids: &[Vid],
1478        properties_batch: &[Properties],
1479        label: &str,
1480        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1481    ) -> Result<()> {
1482        if vids.len() != properties_batch.len() {
1483            return Err(anyhow!("VID/properties length mismatch"));
1484        }
1485
1486        let schema = self.schema_manager.schema();
1487
1488        // 1. Validate NOT NULL constraints for each vertex
1489        if let Some(props_meta) = schema.properties.get(label) {
1490            for (idx, properties) in properties_batch.iter().enumerate() {
1491                for (prop_name, meta) in props_meta {
1492                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1493                        return Err(anyhow!(
1494                            "Constraint violation at index {}: Property '{}' cannot be null",
1495                            idx,
1496                            prop_name
1497                        ));
1498                    }
1499                }
1500            }
1501        }
1502
1503        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1504        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1505        let mut existing_extids: HashSet<String> = HashSet::new();
1506
1507        // Scan current L0 buffer
1508        {
1509            let l0 = self.l0_manager.get_current();
1510            let l0_guard = l0.read();
1511            Self::collect_constraint_keys_from_properties(
1512                l0_guard.vertex_properties.values(),
1513                label,
1514                &schema.constraints,
1515                &mut existing_keys,
1516                &mut existing_extids,
1517            );
1518        }
1519
1520        // Scan transaction L0 if present
1521        if let Some(tx_l0) = tx_l0 {
1522            let tx_l0_guard = tx_l0.read();
1523            Self::collect_constraint_keys_from_properties(
1524                tx_l0_guard.vertex_properties.values(),
1525                label,
1526                &schema.constraints,
1527                &mut existing_keys,
1528                &mut existing_extids,
1529            );
1530        }
1531
1532        // 3. Check batch vertices against index AND check for duplicates within batch
1533        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1534        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1535
1536        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1537            // Check ext_id uniqueness
1538            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1539                if existing_extids.contains(ext_id) {
1540                    return Err(anyhow!(
1541                        "Constraint violation at index {}: ext_id '{}' already exists",
1542                        idx,
1543                        ext_id
1544                    ));
1545                }
1546                if let Some(first_idx) = batch_extids.get(ext_id) {
1547                    return Err(anyhow!(
1548                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1549                        ext_id,
1550                        first_idx,
1551                        idx
1552                    ));
1553                }
1554                batch_extids.insert(ext_id.to_string(), idx);
1555            }
1556
1557            // Check unique constraints
1558            for constraint in &schema.constraints {
1559                if !constraint.enabled {
1560                    continue;
1561                }
1562                if let ConstraintTarget::Label(l) = &constraint.target {
1563                    if l != label {
1564                        continue;
1565                    }
1566                } else {
1567                    continue;
1568                }
1569
1570                match &constraint.constraint_type {
1571                    ConstraintType::Unique {
1572                        properties: unique_props,
1573                    } => {
1574                        let mut key_parts = Vec::new();
1575                        let mut all_present = true;
1576                        for prop in unique_props {
1577                            if let Some(val) = properties.get(prop) {
1578                                key_parts.push(format!("{}:{}", prop, val));
1579                            } else {
1580                                all_present = false;
1581                                break;
1582                            }
1583                        }
1584
1585                        if all_present {
1586                            let key = key_parts.join("|");
1587
1588                            // Check against existing L0 keys
1589                            if let Some(keys) = existing_keys.get(&constraint.name)
1590                                && keys.contains(&key)
1591                            {
1592                                return Err(anyhow!(
1593                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1594                                    idx,
1595                                    label,
1596                                    constraint.name
1597                                ));
1598                            }
1599
1600                            // Check for duplicates within batch
1601                            let batch_constraint_keys =
1602                                batch_keys.entry(constraint.name.clone()).or_default();
1603                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1604                                return Err(anyhow!(
1605                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1606                                    key,
1607                                    first_idx,
1608                                    idx
1609                                ));
1610                            }
1611                            batch_constraint_keys.insert(key, idx);
1612                        }
1613                    }
1614                    ConstraintType::Exists { property }
1615                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1616                    {
1617                        return Err(anyhow!(
1618                            "Constraint violation at index {}: Property '{}' must exist",
1619                            idx,
1620                            property
1621                        ));
1622                    }
1623                    ConstraintType::Check { expression }
1624                        if !self.evaluate_check_constraint(expression, properties)? =>
1625                    {
1626                        return Err(anyhow!(
1627                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1628                            idx,
1629                            constraint.name
1630                        ));
1631                    }
1632                    _ => {}
1633                }
1634            }
1635        }
1636
1637        // 4. Check storage for unique constraints (can batch this into a single query)
1638        for constraint in &schema.constraints {
1639            if !constraint.enabled {
1640                continue;
1641            }
1642            if let ConstraintTarget::Label(l) = &constraint.target {
1643                if l != label {
1644                    continue;
1645                }
1646            } else {
1647                continue;
1648            }
1649
1650            if let ConstraintType::Unique {
1651                properties: unique_props,
1652            } = &constraint.constraint_type
1653            {
1654                // Build compound OR filter for all batch vertices
1655                let mut or_filters = Vec::new();
1656                for properties in properties_batch.iter() {
1657                    let mut and_parts = Vec::new();
1658                    let mut all_present = true;
1659                    for prop in unique_props {
1660                        if let Some(val) = properties.get(prop) {
1661                            let val_str = match val {
1662                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1663                                Value::Int(n) => n.to_string(),
1664                                Value::Float(f) => f.to_string(),
1665                                Value::Bool(b) => b.to_string(),
1666                                _ => {
1667                                    all_present = false;
1668                                    break;
1669                                }
1670                            };
1671                            and_parts.push(format!("{} = {}", prop, val_str));
1672                        } else {
1673                            all_present = false;
1674                            break;
1675                        }
1676                    }
1677                    if all_present {
1678                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1679                    }
1680                }
1681
1682                #[cfg(feature = "lance-backend")]
1683                if !or_filters.is_empty() {
1684                    let vid_list: Vec<String> =
1685                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1686                    let filter = format!(
1687                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1688                        or_filters.join(" OR "),
1689                        vid_list.join(", ")
1690                    );
1691
1692                    if let Ok(ds) = self.storage.vertex_dataset(label)
1693                        && let Ok(lance_ds) = ds.open_raw().await
1694                    {
1695                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1696                        if count > 0 {
1697                            return Err(anyhow!(
1698                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1699                                label,
1700                                constraint.name
1701                            ));
1702                        }
1703                    }
1704                }
1705            }
1706        }
1707
1708        Ok(())
1709    }
1710
1711    /// Checks that ext_id is globally unique across all vertices.
1712    ///
1713    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1714    /// to ensure no other vertex uses this ext_id.
1715    ///
1716    /// # Errors
1717    ///
1718    /// Returns error if another vertex with the same ext_id exists.
1719    async fn check_extid_globally_unique(
1720        &self,
1721        ext_id: &str,
1722        current_vid: Vid,
1723        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1724    ) -> Result<()> {
1725        // Check L0 buffers: current, transaction, and pending flush
1726        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1727            let mut buffers = vec![self.l0_manager.get_current()];
1728            if let Some(tx_l0) = tx_l0 {
1729                buffers.push(tx_l0.clone());
1730            }
1731            buffers.extend(self.l0_manager.get_pending_flush());
1732            buffers
1733        };
1734
1735        for l0 in &l0_buffers_to_check {
1736            // O(1) per buffer via the maintained `extid_index` (the previous
1737            // full `vertex_properties` scan made constrained ingest O(n²)).
1738            if let Some(&vid) = l0.read().extid_index.get(ext_id)
1739                && vid != current_vid
1740            {
1741                return Err(anyhow!(
1742                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1743                    ext_id,
1744                    vid
1745                ));
1746            }
1747        }
1748
1749        // Check main vertices table (if it exists)
1750        // Pass None for global uniqueness check (not snapshot-isolated)
1751        let backend = self.storage.backend();
1752        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1753            && found_vid != current_vid
1754        {
1755            return Err(anyhow!(
1756                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1757                ext_id,
1758                found_vid
1759            ));
1760        }
1761
1762        Ok(())
1763    }
1764
1765    /// Helper to get vertex labels from L0 buffer.
1766    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1767        let l0 = self.l0_manager.get_current();
1768        let l0_guard = l0.read();
1769        // Check if vertex is tombstoned (deleted) - if so, return None
1770        if l0_guard.vertex_tombstones.contains(&vid) {
1771            return None;
1772        }
1773        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1774    }
1775
1776    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1777    /// This is the proper way to read vertex labels after a flush, as it checks both
1778    /// in-memory buffers and persisted storage.
1779    pub async fn get_vertex_labels(
1780        &self,
1781        vid: Vid,
1782        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1783    ) -> Option<Vec<String>> {
1784        // 1. Check current L0
1785        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1786            return Some(labels);
1787        }
1788
1789        // 2. Check transaction L0 if present
1790        if let Some(tx_l0) = tx_l0 {
1791            let guard = tx_l0.read();
1792            if guard.vertex_tombstones.contains(&vid) {
1793                return None;
1794            }
1795            if let Some(labels) = guard.get_vertex_labels(vid) {
1796                return Some(labels.to_vec());
1797            }
1798        }
1799
1800        // 3. Check pending flush L0s
1801        for pending_l0 in self.l0_manager.get_pending_flush() {
1802            let guard = pending_l0.read();
1803            if guard.vertex_tombstones.contains(&vid) {
1804                return None;
1805            }
1806            if let Some(labels) = guard.get_vertex_labels(vid) {
1807                return Some(labels.to_vec());
1808            }
1809        }
1810
1811        // 4. Check storage
1812        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1813    }
1814
1815    /// Helper to get edge type from L0 buffer.
1816    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1817        let l0 = self.l0_manager.get_current();
1818        let l0_guard = l0.read();
1819        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1820    }
1821
1822    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1823    /// Falls back to the transaction L0 if available.
1824    pub fn get_edge_type_id_from_l0(
1825        &self,
1826        eid: Eid,
1827        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1828    ) -> Option<u32> {
1829        // Check transaction L0 first
1830        if let Some(tx_l0) = tx_l0 {
1831            let guard = tx_l0.read();
1832            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1833                return Some(etype);
1834            }
1835        }
1836        // Fall back to main L0
1837        let l0 = self.l0_manager.get_current();
1838        let l0_guard = l0.read();
1839        l0_guard
1840            .get_edge_endpoint_full(eid)
1841            .map(|(_, _, etype)| etype)
1842    }
1843
1844    /// Set the type name for an edge (used for schemaless edge types).
1845    /// This is called during CREATE for edge types not found in the schema.
1846    pub fn set_edge_type(
1847        &self,
1848        eid: Eid,
1849        type_name: String,
1850        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1851    ) {
1852        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1853    }
1854
1855    /// Evaluate a simple CHECK constraint expression.
1856    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1857    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1858        let parts: Vec<&str> = expression.split_whitespace().collect();
1859        if parts.len() != 3 {
1860            // For now, only support "prop op val"
1861            // Fallback to true if too complex to avoid breaking, but warn
1862            log::warn!(
1863                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1864                expression
1865            );
1866            return Ok(true);
1867        }
1868
1869        let prop_part = parts[0].trim_start_matches('(');
1870        // Handle "variable.property" format - take the part after the dot
1871        let prop_name = if let Some(idx) = prop_part.find('.') {
1872            &prop_part[idx + 1..]
1873        } else {
1874            prop_part
1875        };
1876
1877        let op = parts[1];
1878        let val_str = parts[2].trim_end_matches(')');
1879
1880        let prop_val = match properties.get(prop_name) {
1881            Some(v) => v,
1882            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1883        };
1884
1885        // Parse value string (handle quotes for strings)
1886        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1887            || (val_str.starts_with('"') && val_str.ends_with('"'))
1888        {
1889            Value::String(val_str[1..val_str.len() - 1].to_string())
1890        } else if let Ok(n) = val_str.parse::<i64>() {
1891            Value::Int(n)
1892        } else if let Ok(n) = val_str.parse::<f64>() {
1893            Value::Float(n)
1894        } else if let Ok(b) = val_str.parse::<bool>() {
1895            Value::Bool(b)
1896        } else {
1897            // Check for internal format wrappers if they somehow leaked through
1898            if val_str.starts_with("Number(") && val_str.ends_with(')') {
1899                let n_str = &val_str[7..val_str.len() - 1];
1900                if let Ok(n) = n_str.parse::<i64>() {
1901                    Value::Int(n)
1902                } else if let Ok(n) = n_str.parse::<f64>() {
1903                    Value::Float(n)
1904                } else {
1905                    Value::String(val_str.to_string())
1906                }
1907            } else {
1908                Value::String(val_str.to_string())
1909            }
1910        };
1911
1912        match op {
1913            "=" | "==" => Ok(prop_val == &target_val),
1914            "!=" | "<>" => Ok(prop_val != &target_val),
1915            ">" => self
1916                .compare_values(prop_val, &target_val)
1917                .map(|o| o.is_gt()),
1918            "<" => self
1919                .compare_values(prop_val, &target_val)
1920                .map(|o| o.is_lt()),
1921            ">=" => self
1922                .compare_values(prop_val, &target_val)
1923                .map(|o| o.is_ge()),
1924            "<=" => self
1925                .compare_values(prop_val, &target_val)
1926                .map(|o| o.is_le()),
1927            _ => {
1928                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1929                Ok(true)
1930            }
1931        }
1932    }
1933
1934    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1935        use std::cmp::Ordering;
1936
1937        fn cmp_f64(x: f64, y: f64) -> Ordering {
1938            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1939        }
1940
1941        match (a, b) {
1942            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1943            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1944            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1945            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1946            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1947            _ => Err(anyhow!(
1948                "Cannot compare incompatible types: {:?} vs {:?}",
1949                a,
1950                b
1951            )),
1952        }
1953    }
1954
1955    async fn check_unique_constraint_multi(
1956        &self,
1957        label: &str,
1958        key_values: &[(String, Value)],
1959        current_vid: Vid,
1960        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1961    ) -> Result<()> {
1962        // Serialize constraint key once for O(1) lookups
1963        let key = serialize_constraint_key(label, key_values);
1964
1965        // 1. Check L0 (in-memory) using O(1) constraint index
1966        {
1967            let l0 = self.l0_manager.get_current();
1968            let l0_guard = l0.read();
1969            if l0_guard.has_constraint_key(&key, current_vid) {
1970                return Err(anyhow!(
1971                    "Constraint violation: Duplicate composite key for label '{}'",
1972                    label
1973                ));
1974            }
1975        }
1976
1977        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1978        // buffer onto `pending_flush` and installs a fresh empty current
1979        // buffer; until the rotated rows reach Lance the key is invisible to
1980        // both the current-buffer check above and the storage check below, so
1981        // a duplicate could slip through that flush window. Mirror the read
1982        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1983        // already consult `pending_flush`.
1984        for pending_l0 in self.l0_manager.get_pending_flush() {
1985            if pending_l0.read().has_constraint_key(&key, current_vid) {
1986                return Err(anyhow!(
1987                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1988                    label
1989                ));
1990            }
1991        }
1992
1993        // Check Transaction L0
1994        if let Some(tx_l0) = tx_l0 {
1995            let tx_l0_guard = tx_l0.read();
1996            if tx_l0_guard.has_constraint_key(&key, current_vid) {
1997                return Err(anyhow!(
1998                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1999                    label
2000                ));
2001            }
2002        }
2003
2004        // 2. Check Storage (L1/L2)
2005        let filters: Vec<String> = key_values
2006            .iter()
2007            .map(|(prop, val)| {
2008                let val_str = match val {
2009                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2010                    Value::Int(n) => n.to_string(),
2011                    Value::Float(f) => f.to_string(),
2012                    Value::Bool(b) => b.to_string(),
2013                    _ => "NULL".to_string(),
2014                };
2015                format!("{} = {}", prop, val_str)
2016            })
2017            .collect();
2018
2019        let mut filter = filters.join(" AND ");
2020        filter.push_str(&format!(
2021            " AND _deleted = false AND _vid != {}",
2022            current_vid.as_u64()
2023        ));
2024
2025        #[cfg(feature = "lance-backend")]
2026        if let Ok(ds) = self.storage.vertex_dataset(label)
2027            && let Ok(lance_ds) = ds.open_raw().await
2028        {
2029            let count = lance_ds.count_rows(Some(filter.clone())).await?;
2030            if count > 0 {
2031                return Err(anyhow!(
2032                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2033                    label,
2034                    filter
2035                ));
2036            }
2037        }
2038
2039        Ok(())
2040    }
2041
2042    async fn check_write_pressure(&self) -> Result<()> {
2043        let status = self
2044            .storage
2045            .compaction_status()
2046            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2047        let l1_runs = status.l1_runs;
2048        let throttle = &self.config.throttle;
2049
2050        if l1_runs >= throttle.hard_limit {
2051            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2052            // Simple polling for now
2053            while self
2054                .storage
2055                .compaction_status()
2056                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2057                .l1_runs
2058                >= throttle.hard_limit
2059            {
2060                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2061            }
2062        } else if l1_runs >= throttle.soft_limit {
2063            let excess = l1_runs - throttle.soft_limit;
2064            // Cap multiplier to avoid overflow
2065            let excess = std::cmp::min(excess, 31);
2066            let multiplier = 2_u32.pow(excess as u32);
2067            let delay = throttle.base_delay * multiplier;
2068            tokio::time::sleep(delay).await;
2069        }
2070        Ok(())
2071    }
2072
2073    /// Check transaction memory limit to prevent OOM.
2074    /// No-op when no transaction is active.
2075    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2076        if let Some(tx_l0) = tx_l0 {
2077            let size = tx_l0.read().estimated_size;
2078            if size > self.config.max_transaction_memory {
2079                return Err(anyhow!(
2080                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2081                     Roll back or commit the current transaction.",
2082                    size,
2083                    self.config.max_transaction_memory
2084                ));
2085            }
2086        }
2087        Ok(())
2088    }
2089
2090    async fn get_query_context(
2091        &self,
2092        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2093    ) -> Option<QueryContext> {
2094        Some(QueryContext::new_with_pending(
2095            self.l0_manager.get_current(),
2096            tx_l0.cloned(),
2097            self.l0_manager.get_pending_flush(),
2098        ))
2099    }
2100
2101    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2102    /// write paths.
2103    ///
2104    /// Rejects a declared CRDT property written as a parsed CRDT value
2105    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2106    /// A mismatch would make the commit-time merge silently overwrite instead of
2107    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2108    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2109    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2110    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2111    /// are never carved out and stay conflictable.
2112    fn enforce_crdt_variants(
2113        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2114        properties: &Properties,
2115    ) -> Result<()> {
2116        for (key, value) in properties {
2117            let Some(meta) = props_meta.get(key) else {
2118                continue;
2119            };
2120            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2121                continue;
2122            };
2123            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2124                && crdt.type_name() != expected.type_name()
2125            {
2126                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2127                    message: format!(
2128                        "CRDT property '{key}' must be written as a {} value",
2129                        expected.type_name()
2130                    ),
2131                }));
2132            }
2133        }
2134        Ok(())
2135    }
2136
2137    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2138    ///
2139    /// When `label` is provided, uses it directly to look up property metadata.
2140    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2141    ///
2142    /// # Errors
2143    ///
2144    /// Returns an error if CRDT property merging fails.
2145    async fn prepare_vertex_upsert(
2146        &self,
2147        vid: Vid,
2148        properties: &mut Properties,
2149        label: Option<&str>,
2150        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2151    ) -> Result<()> {
2152        let Some(pm) = &self.property_manager else {
2153            return Ok(());
2154        };
2155
2156        let schema = self.schema_manager.schema();
2157
2158        // Resolve label: use provided label or discover from L0/storage
2159        let discovered_labels;
2160        let label_name = if let Some(l) = label {
2161            Some(l)
2162        } else {
2163            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2164            discovered_labels
2165                .as_ref()
2166                .and_then(|l| l.first().map(|s| s.as_str()))
2167        };
2168
2169        let Some(label_str) = label_name else {
2170            return Ok(());
2171        };
2172        let Some(props_meta) = schema.properties.get(label_str) else {
2173            return Ok(());
2174        };
2175
2176        // Identify CRDT properties in the insert data
2177        let crdt_keys: Vec<String> = properties
2178            .keys()
2179            .filter(|key| {
2180                props_meta.get(*key).is_some_and(|meta| {
2181                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2182                })
2183            })
2184            .cloned()
2185            .collect();
2186
2187        if crdt_keys.is_empty() {
2188            return Ok(());
2189        }
2190
2191        // Enforce that each declared CRDT property written as a parsed CRDT value
2192        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2193        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2194        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2195        // would hide that as a silent lost update — reject it at the source.
2196        //
2197        // Only the `Map` form is checked: it is exactly the form the carve-out
2198        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2199        // string (the Cypher form) or a non-CRDT value is never carved out — it
2200        // stays conflictable — so it poses no carve-out soundness risk and is left
2201        // to the existing merge/parse path. This is the declared-property half of
2202        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2203        Self::enforce_crdt_variants(props_meta, properties)?;
2204
2205        let ctx = self.get_query_context(tx_l0).await;
2206        for key in crdt_keys {
2207            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2208            if !existing.is_null()
2209                && let Some(val) = properties.get_mut(&key)
2210            {
2211                *val = pm.merge_crdt_values(&existing, val)?;
2212            }
2213        }
2214
2215        Ok(())
2216    }
2217
2218    async fn prepare_edge_upsert(
2219        &self,
2220        eid: Eid,
2221        properties: &mut Properties,
2222        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2223    ) -> Result<()> {
2224        if let Some(pm) = &self.property_manager {
2225            let schema = self.schema_manager.schema();
2226            // Get edge type from L0 buffer instead of from EID
2227            let type_name = self.get_edge_type_from_l0(eid);
2228
2229            if let Some(ref t_name) = type_name
2230                && let Some(props_meta) = schema.properties.get(t_name)
2231            {
2232                let mut crdt_keys = Vec::new();
2233                for (key, _) in properties.iter() {
2234                    if let Some(meta) = props_meta.get(key)
2235                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2236                    {
2237                        crdt_keys.push(key.clone());
2238                    }
2239                }
2240
2241                if !crdt_keys.is_empty() {
2242                    let ctx = self.get_query_context(tx_l0).await;
2243                    for key in crdt_keys {
2244                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2245
2246                        if !existing.is_null()
2247                            && let Some(val) = properties.get_mut(&key)
2248                        {
2249                            *val = pm.merge_crdt_values(&existing, val)?;
2250                        }
2251                    }
2252                }
2253            }
2254        }
2255        Ok(())
2256    }
2257
2258    #[instrument(skip(self, properties), level = "trace")]
2259    pub async fn insert_vertex(
2260        &self,
2261        vid: Vid,
2262        properties: Properties,
2263        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2264    ) -> Result<()> {
2265        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2266            .await?;
2267        Ok(())
2268    }
2269
2270    /// Component C1 (G4): before a non-transactional mutation merges into main
2271    /// L0, if an outstanding snapshot pins the current generation, freeze it
2272    /// aside so snapshots taken *before* this write stay isolated from it.
2273    ///
2274    /// `flush_lock` (acquired and released here) serializes the freeze against
2275    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2276    /// commit path gets. No-op for transactional writes (their freeze happens at
2277    /// commit) and — the common case — when nothing is pinned, where it costs one
2278    /// atomic load. Freezes at most once per pinned generation: the freeze
2279    /// installs a fresh unpinned `current`, so later writes in the same bulk
2280    /// import see no pin and merge in place, and the snapshot keeps reading the
2281    /// frozen pre-import buffer.
2282    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2283        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2284        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2285        // always false (one atomic load) when SSI is off.
2286        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2287            let _flush_lock_guard = self.flush_lock.lock().await;
2288            // Re-check under the lock: a concurrent commit may have frozen first.
2289            if self.l0_manager.is_current_pinned() {
2290                self.l0_manager.freeze_current_for_snapshot();
2291                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2292            }
2293        }
2294    }
2295
2296    #[instrument(skip(self, properties, labels), level = "trace")]
2297    pub async fn insert_vertex_with_labels(
2298        &self,
2299        vid: Vid,
2300        mut properties: Properties,
2301        labels: &[String],
2302        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2303    ) -> Result<Properties> {
2304        let start = std::time::Instant::now();
2305        self.check_write_pressure().await?;
2306        self.check_transaction_memory(tx_l0)?;
2307
2308        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2309        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2310        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2311        // taken before this write stay isolated from it.
2312        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2313
2314        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2315            self.process_embeddings_for_labels(labels, &mut properties)
2316                .await?;
2317        }
2318        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2319            .await?;
2320        self.prepare_vertex_upsert(
2321            vid,
2322            &mut properties,
2323            labels.first().map(|s| s.as_str()),
2324            tx_l0,
2325        )
2326        .await?;
2327
2328        // Clone properties and labels before moving into L0 to return them and populate constraint index
2329        let properties_copy = properties.clone();
2330        let labels_copy = labels.to_vec();
2331
2332        {
2333            // For a non-tx write, re-resolve the live `current` buffer and hold
2334            // `flush_lock` across the (synchronous) write so a concurrent flush
2335            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2336            // a fresh `current` between resolve and write, dropping our write
2337            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2338            // buffer, never the rotating `current`, so no `flush_lock` is needed
2339            // (and taking it would risk re-entrancy with the commit path).
2340            let _flush_lock_guard = if tx_l0.is_none() {
2341                Some(self.flush_lock.lock().await)
2342            } else {
2343                None
2344            };
2345            let l0 = self.resolve_l0(tx_l0);
2346            let mut l0_guard = l0.write();
2347            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2348            // possibly empty) current buffer must merge against the chained
2349            // committed state, not shadow it. No-op when the chain is empty
2350            // or this is a tx-private write.
2351            if tx_l0.is_none() {
2352                let pending = self.l0_manager.get_pending_flush();
2353                if !pending.is_empty() {
2354                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2355                }
2356            }
2357            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2358
2359            // Populate constraint index for O(1) duplicate detection
2360            let schema = self.schema_manager.schema();
2361            for label in &labels_copy {
2362                if schema.get_label_case_insensitive(label).is_none() {
2363                    if self.config.strict_schema {
2364                        return Err(anyhow::anyhow!(
2365                            "Label '{}' is not defined in the schema \
2366                             (strict_schema is enabled).",
2367                            label
2368                        ));
2369                    }
2370                    continue; // Schemaless: skip unknown labels.
2371                }
2372
2373                // For each unique constraint on this label, insert into constraint index
2374                for constraint in &schema.constraints {
2375                    if !constraint.enabled {
2376                        continue;
2377                    }
2378                    if let ConstraintTarget::Label(l) = &constraint.target {
2379                        if l != label {
2380                            continue;
2381                        }
2382                    } else {
2383                        continue;
2384                    }
2385
2386                    if let ConstraintType::Unique {
2387                        properties: unique_props,
2388                    } = &constraint.constraint_type
2389                    {
2390                        let mut key_values = Vec::new();
2391                        let mut all_present = true;
2392                        for prop in unique_props {
2393                            if let Some(val) = properties_copy.get(prop) {
2394                                key_values.push((prop.clone(), val.clone()));
2395                            } else {
2396                                all_present = false;
2397                                break;
2398                            }
2399                        }
2400
2401                        if all_present {
2402                            let key = serialize_constraint_key(label, &key_values);
2403                            l0_guard.insert_constraint_key(key, vid);
2404                        }
2405                    }
2406                }
2407            }
2408        }
2409
2410        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2411        self.update_metrics();
2412
2413        if tx_l0.is_none() {
2414            self.check_flush().await?;
2415        }
2416        if start.elapsed().as_millis() > 100 {
2417            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2418        }
2419        Ok(properties_copy)
2420    }
2421
2422    /// True iff routing this partial write through MergeInsert would
2423    /// miss a constraint check. Specifically: a multi-key UNIQUE
2424    /// constraint where the touched-set doesn't cover all member keys
2425    /// requires the unchanged keys from the existing row to compute
2426    /// the composite. Conservative: also returns true if any touched
2427    /// key is `ext_id` (uniqueness checked globally — handled in the
2428    /// full-row path).
2429    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2430        if touched.contains_key("ext_id") {
2431            return true;
2432        }
2433        let schema = self.schema_manager.schema();
2434        for label in labels {
2435            if schema.get_label_case_insensitive(label).is_none() {
2436                continue;
2437            }
2438            for constraint in &schema.constraints {
2439                if !constraint.enabled {
2440                    continue;
2441                }
2442                if let ConstraintTarget::Label(l) = &constraint.target {
2443                    if !l.eq_ignore_ascii_case(label) {
2444                        continue;
2445                    }
2446                } else {
2447                    continue;
2448                }
2449                if let ConstraintType::Unique {
2450                    properties: unique_props,
2451                } = &constraint.constraint_type
2452                {
2453                    if unique_props.len() < 2 {
2454                        continue; // single-key UNIQUE — partial path sees the key
2455                    }
2456                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2457                        return true;
2458                    }
2459                }
2460            }
2461        }
2462        false
2463    }
2464
2465    /// Insert a vertex's FULL property row plus a touched-keys hint so
2466    /// the flush emits ONLY those columns via Lance MergeInsert.
2467    ///
2468    /// Caller must have read the full row (via PropertyManager) and
2469    /// applied SET-touched values on top before calling — same input
2470    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2471    /// is the set of property keys this SET statement actually
2472    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2473    /// flush filters the MergeInsert source schema down to those keys.
2474    /// When `UniConfig::partial_lance_writes == false`, falls through
2475    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2476    /// equivalence with prior releases.
2477    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2478    pub async fn insert_vertex_partial_full(
2479        &self,
2480        vid: Vid,
2481        mut props: Properties,
2482        touched_keys: HashSet<String>,
2483        labels: &[String],
2484        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2485    ) -> Result<()> {
2486        if !self.config.partial_lance_writes
2487            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2488        {
2489            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2490                .await?;
2491            return Ok(());
2492        }
2493
2494        self.check_write_pressure().await?;
2495        self.check_transaction_memory(tx_l0)?;
2496        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2497            self.process_embeddings_for_labels(labels, &mut props)
2498                .await?;
2499        }
2500        // Full-row validation runs because we have the complete map;
2501        // no need for the partial-only validator.
2502        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2503            .await?;
2504        {
2505            let l0 = self.resolve_l0(tx_l0);
2506            let mut l0_guard = l0.write();
2507            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2508        }
2509        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2510        metrics::counter!("uni_partial_writes_total").increment(1);
2511        self.update_metrics();
2512        if tx_l0.is_none() {
2513            self.check_flush().await?;
2514        }
2515        Ok(())
2516    }
2517
2518    /// Insert a vertex's *partial* property set without first reading the
2519    /// full row.
2520    ///
2521    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2522    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2523    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2524    /// schema source — preserving untouched columns (e.g., embeddings)
2525    /// byte-equal in Lance with no read at the caller and no write of
2526    /// those columns.
2527    ///
2528    /// When the flag is `false`, this falls back to the existing
2529    /// `insert_vertex_with_labels` path after merging `touched` with
2530    /// the current properties from L0/storage. The caller can therefore
2531    /// use this entry point unconditionally; the optimization activates
2532    /// only when the flag is on.
2533    #[instrument(skip(self, touched, labels), level = "trace")]
2534    pub async fn insert_vertex_partial(
2535        &self,
2536        vid: Vid,
2537        touched: Properties,
2538        labels: &[String],
2539        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2540    ) -> Result<()> {
2541        let needs_full_read =
2542            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2543        if needs_full_read {
2544            // Flag-off fallback (or constraint-driven fallback): merge
2545            // `touched` with the current full property snapshot from
2546            // L0/storage and route through the existing path. Preserves
2547            // bit-for-bit equivalence with the pre-Round-11 release.
2548            let existing = if let Some(pm) = &self.property_manager {
2549                pm.get_all_vertex_props_with_ctx(vid, None)
2550                    .await
2551                    .unwrap_or_default()
2552                    .unwrap_or_default()
2553            } else {
2554                Properties::new()
2555            };
2556            let mut merged = existing;
2557            for (k, v) in touched {
2558                merged.insert(k, v);
2559            }
2560            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2561                .await?;
2562            return Ok(());
2563        }
2564
2565        // Flag-on fast path: stage the partial update directly. Pressure
2566        // checks, embedding generation, constraint validation all still
2567        // run — but the validator is the partial-aware variant that
2568        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2569        // properties not present in `touched`. Multi-key UNIQUE that
2570        // overlaps the touched set forces a fallback above via
2571        // `touched_needs_full_read`.
2572        let mut touched = touched;
2573        self.check_write_pressure().await?;
2574        self.check_transaction_memory(tx_l0)?;
2575        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2576            self.process_embeddings_for_labels(labels, &mut touched)
2577                .await?;
2578        }
2579        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2580            .await?;
2581
2582        {
2583            let l0 = self.resolve_l0(tx_l0);
2584            let mut l0_guard = l0.write();
2585            l0_guard.insert_vertex_partial(vid, touched, labels);
2586        }
2587
2588        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2589        metrics::counter!("uni_partial_writes_total").increment(1);
2590        self.update_metrics();
2591        if tx_l0.is_none() {
2592            self.check_flush().await?;
2593        }
2594        Ok(())
2595    }
2596
2597    /// Insert multiple vertices with batched operations.
2598    ///
2599    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2600    /// for bulk inserts with unique constraints.
2601    ///
2602    /// # Performance Improvements
2603    /// - Batch VID allocation: 1 call instead of N calls
2604    /// - Batch constraint validation: O(N) instead of O(N²)
2605    /// - Batch embedding generation: 1 API call per config instead of N calls
2606    /// - Transaction wrapping: Automatic flush deferral, atomicity
2607    ///
2608    /// # Arguments
2609    /// * `vids` - Pre-allocated VIDs for the vertices
2610    /// * `properties_batch` - Properties for each vertex
2611    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2612    ///
2613    /// # Errors
2614    /// Returns error if:
2615    /// - VID/properties length mismatch
2616    /// - Constraint violation detected
2617    /// - Embedding generation fails
2618    /// - Transaction commit fails
2619    ///
2620    /// # Atomicity
2621    /// If this method fails, all changes are rolled back (if transaction was started here).
2622    pub async fn insert_vertices_batch(
2623        &self,
2624        vids: Vec<Vid>,
2625        mut properties_batch: Vec<Properties>,
2626        labels: Vec<String>,
2627        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2628    ) -> Result<Vec<Properties>> {
2629        let start = std::time::Instant::now();
2630
2631        // Validate inputs
2632        if vids.len() != properties_batch.len() {
2633            return Err(anyhow!(
2634                "VID/properties size mismatch: {} vids, {} properties",
2635                vids.len(),
2636                properties_batch.len()
2637            ));
2638        }
2639
2640        if vids.is_empty() {
2641            return Ok(Vec::new());
2642        }
2643
2644        // Batch operations — writes go directly to the resolved L0.
2645        // Atomicity is guaranteed by the caller holding the writer lock.
2646        let result = async {
2647            self.check_write_pressure().await?;
2648            self.check_transaction_memory(tx_l0)?;
2649
2650            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2651            // freeze the pinned generation aside before merging so snapshot
2652            // readers stay isolated. No-op when unpinned or transactional.
2653            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2654
2655            // Batch embedding generation (1 API call per config)
2656            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2657                .await?;
2658
2659            // Batch constraint validation (O(N) instead of O(N²))
2660            let label = labels
2661                .first()
2662                .ok_or_else(|| anyhow!("No labels provided"))?;
2663            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2664                .await?;
2665
2666            // Batch prepare (CRDT merging if needed)
2667            // Check schema once: skip entirely if no CRDT properties for this label.
2668            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2669            // values to merge, so the per-vertex lookup is unnecessary in that case.
2670            let has_crdt_fields = {
2671                let schema = self.schema_manager.schema();
2672                schema
2673                    .properties
2674                    .get(label.as_str())
2675                    .is_some_and(|props_meta| {
2676                        props_meta.values().any(|meta| {
2677                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2678                        })
2679                    })
2680            };
2681
2682            if has_crdt_fields {
2683                // Layer-1 variant enforcement (G3): the batch path must reject a
2684                // declared-CRDT variant mismatch exactly as the single-vertex
2685                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2686                // written via batch import slips past write-time validation and
2687                // the OCC carve-out then masks the overwrite as a lost update.
2688                {
2689                    let schema = self.schema_manager.schema();
2690                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2691                        for props in &properties_batch {
2692                            Self::enforce_crdt_variants(props_meta, props)?;
2693                        }
2694                    }
2695                }
2696
2697                // Batch fetch existing CRDT values: collect VIDs that need merging,
2698                // then query once via PropertyManager instead of per-vertex lookups.
2699                let schema = self.schema_manager.schema();
2700                let crdt_keys: Vec<String> = schema
2701                    .properties
2702                    .get(label.as_str())
2703                    .map(|props_meta| {
2704                        props_meta
2705                            .iter()
2706                            .filter(|(_, meta)| {
2707                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2708                            })
2709                            .map(|(key, _)| key.clone())
2710                            .collect()
2711                    })
2712                    .unwrap_or_default();
2713
2714                if let Some(pm) = &self.property_manager {
2715                    let ctx = self.get_query_context(tx_l0).await;
2716                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2717                        for key in &crdt_keys {
2718                            if props.contains_key(key) {
2719                                let existing =
2720                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2721                                if !existing.is_null()
2722                                    && let Some(val) = props.get_mut(key)
2723                                {
2724                                    *val = pm.merge_crdt_values(&existing, val)?;
2725                                }
2726                            }
2727                        }
2728                    }
2729                }
2730            }
2731
2732            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2733            let target_l0 = self.resolve_l0(tx_l0);
2734
2735            let properties_result = properties_batch.clone();
2736            {
2737                let mut l0_guard = target_l0.write();
2738                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2739                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2740                }
2741            }
2742
2743            // Update metrics (batch increment)
2744            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2745            self.update_metrics();
2746
2747            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2748        }
2749        .await;
2750
2751        let props = result?;
2752
2753        if start.elapsed().as_millis() > 100 {
2754            log::warn!(
2755                "Slow insert_vertices_batch ({} vertices): {}ms",
2756                vids.len(),
2757                start.elapsed().as_millis()
2758            );
2759        }
2760
2761        Ok(props)
2762    }
2763
2764    /// Delete a vertex by VID.
2765    ///
2766    /// When `labels` is provided, uses them directly to populate L0 for
2767    /// correct tombstone flushing. Otherwise discovers labels from L0
2768    /// buffers and storage (which can be slow for many vertices).
2769    ///
2770    /// # Errors
2771    ///
2772    /// Returns an error if write pressure stalls, label lookup fails, or
2773    /// the L0 delete operation fails.
2774    #[instrument(skip(self, labels), level = "trace")]
2775    pub async fn delete_vertex(
2776        &self,
2777        vid: Vid,
2778        labels: Option<Vec<String>>,
2779        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2780    ) -> Result<()> {
2781        let start = std::time::Instant::now();
2782        self.check_write_pressure().await?;
2783        self.check_transaction_memory(tx_l0)?;
2784        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2785
2786        // Before deleting, ensure we have the vertex's labels stored in L0 so
2787        // the tombstone can be flushed to the correct label datasets. Discover
2788        // them up front (this may await storage) WITHOUT pinning the buffer we
2789        // will eventually mutate — for non-tx writes the live `current` buffer
2790        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2791        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2792        // reads that tolerate a racing rotate.
2793        let has_labels = {
2794            let l0_guard = self.resolve_l0(tx_l0);
2795            let guard = l0_guard.read();
2796            guard.vertex_labels.contains_key(&vid)
2797        };
2798
2799        let backfill_labels = if has_labels {
2800            None
2801        } else if let Some(provided) = labels {
2802            // Caller provided labels — skip the lookup entirely
2803            Some(provided)
2804        } else {
2805            // Discover labels from pending flush L0s, then storage
2806            let mut found = None;
2807            for pending_l0 in self.l0_manager.get_pending_flush() {
2808                let pending_guard = pending_l0.read();
2809                if let Some(l) = pending_guard.get_vertex_labels(vid) {
2810                    found = Some(l.to_vec());
2811                    break;
2812                }
2813            }
2814            if found.is_none() {
2815                found = self.find_vertex_labels_in_storage(vid).await?;
2816            }
2817            found
2818        };
2819
2820        // Test-only seam (no-op without the `failpoints` feature): pause a
2821        // non-transactional delete AFTER the awaited label discovery but BEFORE
2822        // it re-resolves the live buffer and writes the tombstone. A concurrent
2823        // flush can rotate+complete a buffer in this window; the fix re-resolves
2824        // `get_current()` and mutates it under `flush_lock`, so the tombstone
2825        // always lands in the live buffer (Bug #4 — silent lost delete across
2826        // L0 rotation).
2827        fail::fail_point!("nontx::after-capture");
2828
2829        // Apply the label backfill and the tombstone together. For a non-tx
2830        // write, hold `flush_lock` across the (synchronous) re-resolve + write
2831        // so a concurrent flush rotate (which takes `flush_lock` via
2832        // `begin_flush`) cannot install a fresh `current` between our resolve
2833        // and our write. For a tx write `resolve_l0` returns the tx-private
2834        // buffer (never the rotating `current`), so no `flush_lock` is needed
2835        // — and taking it there would risk re-entrancy with the commit path.
2836        if tx_l0.is_none() {
2837            let _flush_lock_guard = self.flush_lock.lock().await;
2838            let l0 = self.l0_manager.get_current();
2839            let mut guard = l0.write();
2840            if let Some(found_labels) = backfill_labels {
2841                guard.vertex_labels.insert(vid, found_labels);
2842            }
2843            guard.delete_vertex(vid)?;
2844        } else {
2845            let l0 = self.resolve_l0(tx_l0);
2846            let mut guard = l0.write();
2847            if let Some(found_labels) = backfill_labels {
2848                guard.vertex_labels.insert(vid, found_labels);
2849            }
2850            guard.delete_vertex(vid)?;
2851        }
2852        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2853        self.update_metrics();
2854
2855        if tx_l0.is_none() {
2856            self.check_flush().await?;
2857        }
2858        if start.elapsed().as_millis() > 100 {
2859            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2860        }
2861        Ok(())
2862    }
2863
2864    /// Find vertex labels from storage by querying the main vertices table.
2865    /// Returns the labels from the latest non-deleted version of the vertex.
2866    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2867        use crate::backend::types::ScanRequest;
2868        use arrow_array::Array;
2869        use arrow_array::cast::AsArray;
2870
2871        let backend = self.storage.backend();
2872        let table_name = MainVertexDataset::table_name();
2873
2874        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2875        if !backend.table_exists(table_name).await? {
2876            return Ok(None);
2877        }
2878
2879        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2880        let filter = format!("_vid = {}", vid.as_u64());
2881        let batches = backend
2882            .scan(
2883                ScanRequest::all(table_name)
2884                    .with_filter(filter)
2885                    .with_columns(vec![
2886                        "_vid".to_string(),
2887                        "labels".to_string(),
2888                        "_version".to_string(),
2889                        "_deleted".to_string(),
2890                    ]),
2891            )
2892            .await
2893            .unwrap_or_default();
2894
2895        // Find the row with the highest version number
2896        let mut max_version: Option<u64> = None;
2897        let mut labels: Option<Vec<String>> = None;
2898        let mut is_deleted = false;
2899
2900        for batch in batches {
2901            if batch.num_rows() == 0 {
2902                continue;
2903            }
2904
2905            let version_array = batch
2906                .column_by_name("_version")
2907                .unwrap()
2908                .as_primitive::<arrow_array::types::UInt64Type>();
2909
2910            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2911
2912            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2913
2914            for row_idx in 0..batch.num_rows() {
2915                let version = version_array.value(row_idx);
2916
2917                if max_version.is_none_or(|mv| version > mv) {
2918                    is_deleted = deleted_array.value(row_idx);
2919
2920                    let labels_list = labels_array.value(row_idx);
2921                    let string_array = labels_list.as_string::<i32>();
2922                    let vertex_labels: Vec<String> = (0..string_array.len())
2923                        .filter(|&i| !string_array.is_null(i))
2924                        .map(|i| string_array.value(i).to_string())
2925                        .collect();
2926
2927                    max_version = Some(version);
2928                    labels = Some(vertex_labels);
2929                }
2930            }
2931        }
2932
2933        // If the latest version is deleted, return None
2934        if is_deleted { Ok(None) } else { Ok(labels) }
2935    }
2936
2937    #[expect(clippy::too_many_arguments)]
2938    #[instrument(skip(self, props, touched_keys), level = "trace")]
2939    pub async fn insert_edge_partial_full(
2940        &self,
2941        src_vid: Vid,
2942        dst_vid: Vid,
2943        edge_type: u32,
2944        eid: Eid,
2945        props: Properties,
2946        edge_type_name: Option<String>,
2947        touched_keys: HashSet<String>,
2948        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2949    ) -> Result<()> {
2950        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2951        if !self.config.partial_lance_writes {
2952            return self
2953                .insert_edge(
2954                    src_vid,
2955                    dst_vid,
2956                    edge_type,
2957                    eid,
2958                    props,
2959                    edge_type_name,
2960                    tx_l0,
2961                )
2962                .await;
2963        }
2964
2965        let start = std::time::Instant::now();
2966        self.check_write_pressure().await?;
2967        self.check_transaction_memory(tx_l0)?;
2968        let mut props = props;
2969        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2970
2971        let l0 = self.resolve_l0(tx_l0);
2972        l0.write().insert_edge_partial_full(
2973            src_vid,
2974            dst_vid,
2975            edge_type,
2976            eid,
2977            props,
2978            edge_type_name,
2979            touched_keys,
2980        )?;
2981
2982        if tx_l0.is_none() {
2983            let version = l0.read().current_version;
2984            self.adjacency_manager
2985                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2986        }
2987
2988        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2989        metrics::counter!("uni_partial_writes_total").increment(1);
2990        self.update_metrics();
2991        if tx_l0.is_none() {
2992            self.check_flush().await?;
2993        }
2994        if start.elapsed().as_millis() > 100 {
2995            log::warn!(
2996                "Slow insert_edge_partial_full: {}ms",
2997                start.elapsed().as_millis()
2998            );
2999        }
3000        Ok(())
3001    }
3002
3003    #[expect(clippy::too_many_arguments)]
3004    pub async fn insert_edge(
3005        &self,
3006        src_vid: Vid,
3007        dst_vid: Vid,
3008        edge_type: u32,
3009        eid: Eid,
3010        mut properties: Properties,
3011        edge_type_name: Option<String>,
3012        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3013    ) -> Result<()> {
3014        let start = std::time::Instant::now();
3015        self.check_write_pressure().await?;
3016        self.check_transaction_memory(tx_l0)?;
3017        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3018        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3019            .await?;
3020
3021        let l0 = self.resolve_l0(tx_l0);
3022        l0.write()
3023            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3024
3025        // Dual-write to AdjacencyManager overlay (survives flush).
3026        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3027        if tx_l0.is_none() {
3028            let version = l0.read().current_version;
3029            self.adjacency_manager
3030                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3031        }
3032
3033        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3034        self.update_metrics();
3035
3036        if tx_l0.is_none() {
3037            self.check_flush().await?;
3038        }
3039        if start.elapsed().as_millis() > 100 {
3040            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3041        }
3042        Ok(())
3043    }
3044
3045    #[instrument(skip(self), level = "trace")]
3046    pub async fn delete_edge(
3047        &self,
3048        eid: Eid,
3049        src_vid: Vid,
3050        dst_vid: Vid,
3051        edge_type: u32,
3052        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3053    ) -> Result<()> {
3054        let start = std::time::Instant::now();
3055        self.check_write_pressure().await?;
3056        self.check_transaction_memory(tx_l0)?;
3057        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3058        let l0 = self.resolve_l0(tx_l0);
3059
3060        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3061
3062        // Dual-write tombstone to AdjacencyManager overlay.
3063        if tx_l0.is_none() {
3064            let version = l0.read().current_version;
3065            self.adjacency_manager
3066                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3067        }
3068        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3069        self.update_metrics();
3070
3071        if tx_l0.is_none() {
3072            self.check_flush().await?;
3073        }
3074        if start.elapsed().as_millis() > 100 {
3075            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3076        }
3077        Ok(())
3078    }
3079
3080    /// Decide whether a flush should be triggered based on mutation count
3081    /// or elapsed time since the last flush.
3082    ///
3083    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3084    /// reuse the decision while bypassing the lock-acquiring entry point
3085    /// (it already holds `flush_lock`).
3086    fn should_flush(&self) -> bool {
3087        let count = self.l0_manager.get_current().read().mutation_count;
3088        if count == 0 {
3089            return false;
3090        }
3091        if count >= self.config.auto_flush_threshold {
3092            return true;
3093        }
3094        if let Some(interval) = self.config.auto_flush_interval
3095            && self.last_flush_time.lock().elapsed() >= interval
3096            && count >= self.config.auto_flush_min_mutations
3097        {
3098            return true;
3099        }
3100        false
3101    }
3102
3103    /// Check if flush should be triggered based on mutation count or time elapsed.
3104    /// This method is called after each write operation and can also be called
3105    /// by a background task for time-based flushing.
3106    pub async fn check_flush(&self) -> Result<()> {
3107        if self.should_flush() {
3108            self.flush_to_l1(None).await?;
3109        }
3110        Ok(())
3111    }
3112
3113    /// Process embeddings for a vertex using labels passed directly.
3114    /// Use this when labels haven't been stored to L0 yet.
3115    async fn process_embeddings_for_labels(
3116        &self,
3117        labels: &[String],
3118        properties: &mut Properties,
3119    ) -> Result<()> {
3120        let label_name = labels.first().map(|s| s.as_str());
3121        self.process_embeddings_impl(label_name, properties).await
3122    }
3123
3124    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3125    /// vertex has an embedding config that hasn't been satisfied by the
3126    /// caller-provided properties, enqueue the VID in
3127    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3128    /// skips `process_embeddings_for_labels` and the embedding is computed
3129    /// in a single batched call at flush time via
3130    /// `drain_pending_embeddings`.
3131    ///
3132    /// Returns `false` (caller falls back to today's per-row eager embed)
3133    /// if any of:
3134    ///  - the flag is off,
3135    ///  - no label has an embedding config,
3136    ///  - the user already provided the target property (matches the
3137    ///    existing skip-if-present semantics at writer.rs:2727).
3138    ///
3139    /// Trade-off: when deferral is active, in-tx reads of the embedding
3140    /// column return only what was already in storage (or nothing for
3141    /// brand-new vertices). Existing tests that RETURN n.embedding in
3142    /// the same tx as a SET on the source column must run with the flag
3143    /// off; opt in only when no such reads happen between write and
3144    /// commit.
3145    fn try_defer_embedding(
3146        &self,
3147        labels: &[String],
3148        properties: &Properties,
3149        vid: Vid,
3150        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3151    ) -> bool {
3152        if !self.config.defer_embeddings {
3153            return false;
3154        }
3155        let Some(label) = labels.first() else {
3156            return false;
3157        };
3158
3159        let schema = self.schema_manager.schema();
3160        let mut has_unsatisfied_cfg = false;
3161        for idx in &schema.indexes {
3162            if let IndexDefinition::Vector(v_cfg) = idx
3163                && v_cfg.label == *label
3164                && v_cfg.embedding_config.is_some()
3165                && !properties.contains_key(&v_cfg.property)
3166            {
3167                has_unsatisfied_cfg = true;
3168                break;
3169            }
3170        }
3171        if !has_unsatisfied_cfg {
3172            return false;
3173        }
3174
3175        let l0 = self.resolve_l0(tx_l0);
3176        let mut guard = l0.write();
3177        guard.pending_embeddings.insert(vid, label.clone());
3178        true
3179    }
3180
3181    /// Drain `pending_embeddings` from the rotated old-L0 right before
3182    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3183    /// `process_embeddings_for_batch` call per label, and writes the
3184    /// resulting embedding vectors into each VID's `vertex_properties`
3185    /// map. After this returns, the flush proceeds against an L0 that
3186    /// looks no different from one whose embeddings were generated
3187    /// per-row at insert.
3188    ///
3189    /// Idempotent: a VID whose embedding was already materialized
3190    /// (e.g., by on-demand read paths in a future Phase B revision) is
3191    /// detected via `properties.contains_key(target_prop)` inside
3192    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3193    /// the drain is safe.
3194    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3195        let by_label: HashMap<String, Vec<Vid>> = {
3196            let guard = old_l0_arc.read();
3197            if guard.pending_embeddings.is_empty() {
3198                return Ok(());
3199            }
3200            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3201            for (vid, label) in &guard.pending_embeddings {
3202                m.entry(label.clone()).or_default().push(*vid);
3203            }
3204            m
3205        };
3206
3207        for (label, vids) in by_label {
3208            let mut properties_batch: Vec<Properties> = {
3209                let guard = old_l0_arc.read();
3210                vids.iter()
3211                    .map(|vid| {
3212                        guard
3213                            .vertex_properties
3214                            .get(vid)
3215                            .cloned()
3216                            .unwrap_or_default()
3217                    })
3218                    .collect()
3219            };
3220
3221            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3222                .await?;
3223
3224            let mut guard = old_l0_arc.write();
3225            for (vid, props) in vids.iter().zip(properties_batch) {
3226                let target = guard.vertex_properties.entry(*vid).or_default();
3227                for (k, v) in props {
3228                    target.insert(k, v);
3229                }
3230                guard.pending_embeddings.remove(vid);
3231            }
3232        }
3233        Ok(())
3234    }
3235
3236    /// Process embeddings for a batch of vertices efficiently.
3237    ///
3238    /// Groups vertices by embedding config and makes batched API calls to the
3239    /// embedding service instead of calling once per vertex.
3240    ///
3241    /// # Performance
3242    /// For N vertices with embedding config:
3243    /// - Old approach: N API calls to embedding service
3244    /// - New approach: 1 API call per embedding config (usually 1 total)
3245    async fn process_embeddings_for_batch(
3246        &self,
3247        labels: &[String],
3248        properties_batch: &mut [Properties],
3249    ) -> Result<()> {
3250        let label_name = labels.first().map(|s| s.as_str());
3251        let schema = self.schema_manager.schema();
3252
3253        if let Some(label) = label_name {
3254            // Find vector indexes with embedding config for this label
3255            let mut configs = Vec::new();
3256            for idx in &schema.indexes {
3257                if let IndexDefinition::Vector(v_config) = idx
3258                    && v_config.label == label
3259                    && let Some(emb_config) = &v_config.embedding_config
3260                {
3261                    configs.push((v_config.property.clone(), emb_config.clone()));
3262                }
3263            }
3264
3265            if configs.is_empty() {
3266                return Ok(());
3267            }
3268
3269            for (target_prop, emb_config) in configs {
3270                // Collect input texts from all vertices that need embeddings
3271                let mut input_texts: Vec<String> = Vec::new();
3272                let mut needs_embedding: Vec<usize> = Vec::new();
3273
3274                for (idx, properties) in properties_batch.iter().enumerate() {
3275                    // Skip if target property already exists
3276                    if properties.contains_key(&target_prop) {
3277                        continue;
3278                    }
3279
3280                    // Check if source properties exist
3281                    let mut inputs = Vec::new();
3282                    for src_prop in &emb_config.source_properties {
3283                        if let Some(val) = properties.get(src_prop)
3284                            && let Some(s) = val.as_str()
3285                        {
3286                            inputs.push(s.to_string());
3287                        }
3288                    }
3289
3290                    if !inputs.is_empty() {
3291                        let input_text = inputs.join(" ");
3292                        let input_text = match &emb_config.document_prefix {
3293                            Some(prefix) => format!("{prefix}{input_text}"),
3294                            None => input_text,
3295                        };
3296                        input_texts.push(input_text);
3297                        needs_embedding.push(idx);
3298                    }
3299                }
3300
3301                if input_texts.is_empty() {
3302                    continue;
3303                }
3304
3305                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3306                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3307                })?;
3308                let embedder = runtime.embedding(&emb_config.alias).await?;
3309
3310                // Batch generate embeddings (single API call)
3311                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3312                let embeddings = embedder.embed(input_refs).await?;
3313
3314                // Distribute results back to properties
3315                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3316                    if let Some(vec) = embeddings.get(embedding_idx) {
3317                        let vals: Vec<Value> =
3318                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
3319                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3320                    }
3321                }
3322            }
3323        }
3324
3325        Ok(())
3326    }
3327
3328    async fn process_embeddings_impl(
3329        &self,
3330        label_name: Option<&str>,
3331        properties: &mut Properties,
3332    ) -> Result<()> {
3333        let schema = self.schema_manager.schema();
3334
3335        if let Some(label) = label_name {
3336            // Find vector indexes with embedding config for this label
3337            let mut configs = Vec::new();
3338            for idx in &schema.indexes {
3339                if let IndexDefinition::Vector(v_config) = idx
3340                    && v_config.label == label
3341                    && let Some(emb_config) = &v_config.embedding_config
3342                {
3343                    configs.push((v_config.property.clone(), emb_config.clone()));
3344                }
3345            }
3346
3347            if configs.is_empty() {
3348                log::info!("No embedding config found for label {}", label);
3349            }
3350
3351            for (target_prop, emb_config) in configs {
3352                // If target property already exists, skip (assume user provided it)
3353                if properties.contains_key(&target_prop) {
3354                    continue;
3355                }
3356
3357                // Check if source properties exist
3358                let mut inputs = Vec::new();
3359                for src_prop in &emb_config.source_properties {
3360                    if let Some(val) = properties.get(src_prop)
3361                        && let Some(s) = val.as_str()
3362                    {
3363                        inputs.push(s.to_string());
3364                    }
3365                }
3366
3367                if inputs.is_empty() {
3368                    continue;
3369                }
3370
3371                let input_text = inputs.join(" ");
3372                let input_text = match &emb_config.document_prefix {
3373                    Some(prefix) => format!("{prefix}{input_text}"),
3374                    None => input_text,
3375                };
3376
3377                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3378                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3379                })?;
3380                let embedder = runtime.embedding(&emb_config.alias).await?;
3381
3382                // Generate
3383                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3384                if let Some(vec) = embeddings.first() {
3385                    // Store as array of floats
3386                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3387                    properties.insert(target_prop.clone(), Value::List(vals));
3388                }
3389            }
3390        }
3391        Ok(())
3392    }
3393
3394    /// Flushes the current in-memory L0 buffer to L1 storage.
3395    ///
3396    /// # Lock Ordering
3397    ///
3398    /// To prevent deadlocks, locks must be acquired in the following order:
3399    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3400    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3401    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3402    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3403    /// 5. `Index` / `Storage` locks (during actual flush)
3404    ///
3405    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3406    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3407    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3408    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3409        // Drain any in-flight async flushes first. `flush_to_l1` is a
3410        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3411        // setup, shutdown paths) rely on it as "all writes are now
3412        // durably in Lance". Without the drain, an async stream from
3413        // a recent commit might still be writing to Lance when
3414        // `flush_to_l1` returns, leaving a window where forks branch
3415        // off pre-write Lance state and lose data.
3416        if let Some(coord) = self.flush_coordinator.as_ref() {
3417            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3418        }
3419        let _flush_lock_guard = self.flush_lock.lock().await;
3420        self.flush_inline_under_lock(name).await
3421    }
3422
3423    /// Async-flush entry point: rotate under `flush_lock`, release the
3424    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3425    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3426    /// that resolves when finalize completes.
3427    ///
3428    /// Errors if `config.async_flush_enabled = false` (the coordinator
3429    /// is `None` in that case — see `flush_coordinator` field doc).
3430    pub async fn flush_to_l1_async(
3431        self: &Arc<Self>,
3432        name: Option<String>,
3433    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3434        let coord = self
3435            .flush_coordinator
3436            .as_ref()
3437            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3438            .clone();
3439        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3440        //    introduce a permit-while-holding-flush-lock convoy.
3441        let permit = coord.acquire_permit().await?;
3442        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3443        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3444        //    rotate (the `?` below) must consume neither: the finalizer
3445        //    advances in strictly consecutive seq order and only decrements
3446        //    pending on finalize, so a leaked seq/pending would wedge it
3447        //    forever. The seq is allocated under `flush_lock`, immediately
3448        //    after the rotate and before the guard drops, so concurrent rotates
3449        //    keep seq order == rotation order, and the seq is unused until
3450        //    submit. On the `?` error path the permit drops, freeing the slot.
3451        let (
3452            RotateOutput {
3453                old_l0_arc,
3454                wal_lsn,
3455                current_version,
3456                flush_in_progress_guard,
3457            },
3458            seq,
3459        ) = {
3460            let _flush_lock_guard = self.flush_lock.lock().await;
3461            let rotate_out = self.flush_l0_rotate().await?;
3462            let seq = coord.next_rotate_seq();
3463            coord.note_pending();
3464            (rotate_out, seq)
3465        };
3466        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3467        //    cached_manifest snapshot at this moment.
3468        let parent_manifest = self.cached_manifest.lock().clone();
3469        let rotated = RotatedFlush {
3470            seq,
3471            old_l0_arc: old_l0_arc.clone(),
3472            wal_lsn,
3473            current_version,
3474            name: name.clone(),
3475            parent_manifest,
3476            permit,
3477            flush_in_progress_guard,
3478        };
3479        // 4. Spawn the stream phase via the coordinator. The closure
3480        //    captures Arc<Writer> transiently — drops when stream
3481        //    completes (bounded, ~50-500 ms).
3482        let writer = self.clone();
3483        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3484            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3485            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3486                new_manifest: outcome.manifest,
3487                snapshot_id: outcome.snapshot_id,
3488            })
3489        });
3490        Ok(ticket)
3491    }
3492
3493    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3494    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3495    /// its place), and hand off the WAL to the new L0.
3496    ///
3497    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3498    /// is responsible). The returned [`RotateOutput`] carries everything
3499    /// the subsequent stream + finalize phases need; in particular the
3500    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3501    /// alive for the full flush lifetime — including any future async
3502    /// path where stream runs on a spawned task.
3503    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3504        // Acquire the in-progress counter BEFORE any heavy work. The
3505        // guard lives on RotateOutput; dropping RotateOutput drops the
3506        // guard, so the counter goes back to zero exactly when the flush
3507        // is fully done.
3508        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3509
3510        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3511        // current L0 is still active and mutations are retained in
3512        // memory until restart/retry.
3513        let wal_for_truncate = {
3514            let current_l0 = self.l0_manager.get_current();
3515            let l0_guard = current_l0.read();
3516            l0_guard.wal.clone()
3517        };
3518        // Test-only seam (no-op without the `failpoints` feature): inject a
3519        // WAL-flush failure here to drive the "failed async rotate wedges the
3520        // finalizer" regression (Bug #3). When configured to "return" it makes
3521        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3522        fail::fail_point!("flush::rotate-fail", |_| {
3523            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3524        });
3525        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3526            w.flush().await?
3527        } else {
3528            0
3529        };
3530
3531        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3532        // pending_flush until complete_flush is called by finalize.
3533        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3534        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3535
3536        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3537        // and current_version to the new L0.
3538        let current_version;
3539        {
3540            let mut old_l0_guard = old_l0_arc.write();
3541            current_version = old_l0_guard.current_version;
3542            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3543            let wal = old_l0_guard.wal.take();
3544            let new_l0_arc = self.l0_manager.get_current();
3545            let mut new_l0_guard = new_l0_arc.write();
3546            new_l0_guard.wal = wal;
3547            new_l0_guard.current_version = current_version;
3548        }
3549
3550        Ok(RotateOutput {
3551            old_l0_arc,
3552            wal_lsn,
3553            current_version,
3554            flush_in_progress_guard,
3555        })
3556    }
3557
3558    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3559    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3560    /// pending_flush by Phase B); writes append-only Lance datasets; does
3561    /// NOT call save_snapshot / set_latest_snapshot — those are
3562    /// finalize's job, so the manifest doesn't get published until the
3563    /// next phase.
3564    ///
3565    /// Today takes `&self`; in a follow-up commit this becomes a
3566    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3567    /// run on a spawned task while concurrent commits proceed.
3568    async fn flush_stream_l1(
3569        &self,
3570        old_l0_arc: Arc<RwLock<L0Buffer>>,
3571        wal_lsn: u64,
3572        current_version: u64,
3573        name: Option<String>,
3574    ) -> Result<FlushOutcome> {
3575        // Test-only seam (no-op without the `failpoints` feature): the rotate
3576        // (begin_flush) already moved the to-be-flushed buffer onto
3577        // pending_flush and installed a fresh empty current buffer, but the
3578        // rotated rows are NOT yet durable in Lance. Pausing here holds that
3579        // window open to drive the unique-constraint-hole regression
3580        // (Bug #9 Mechanism A).
3581        fail::fail_point!("flush::after-rotate-before-lance");
3582
3583        // Phase B: materialize any deferred embeddings before column
3584        // extraction. No-op when `defer_embeddings` is off (the set will
3585        // be empty). On-demand reads of the embedding column are a TODO
3586        // for a future revision (see UniConfig::defer_embeddings docs).
3587        self.drain_pending_embeddings(&old_l0_arc).await?;
3588
3589        let schema = self.schema_manager.schema();
3590        // 2. Acquire Read lock on Old L0 for flushing
3591        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3592        // (Vid, labels, properties, deleted, version)
3593        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3594        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3595        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3596        // (vid, full L0 properties map, version, set of keys to update).
3597        // Only the keys in the HashSet are emitted to the partial source;
3598        // the full props map is retained so the per-row column extractor
3599        // can read each touched key's value.
3600        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3601        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3602        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3603        // partial source with just `_vid`, `_deleted=true`, `_version`,
3604        // `_updated_at`. Skips the wide-row Append payload that adds
3605        // nothing on a soft-delete.
3606        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3607        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3608        // Collect vertex timestamps from L0 for flushing to storage
3609        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3610        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3611        // Track tombstones missing labels for storage query fallback
3612        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3613
3614        {
3615            let old_l0 = old_l0_arc.read();
3616
3617            // 1. Collect all edges and tombstones from L0
3618            for edge in old_l0.graph.edges() {
3619                let properties = old_l0
3620                    .edge_properties
3621                    .get(&edge.eid)
3622                    .cloned()
3623                    .unwrap_or_default();
3624                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3625
3626                // Get timestamps from L0 buffer (populated during insert)
3627                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3628                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3629
3630                entries_by_type
3631                    .entry(edge.edge_type)
3632                    .or_default()
3633                    .push(L1Entry {
3634                        src_vid: edge.src_vid,
3635                        dst_vid: edge.dst_vid,
3636                        eid: edge.eid,
3637                        op: Op::Insert,
3638                        version,
3639                        properties,
3640                        created_at,
3641                        updated_at,
3642                    });
3643            }
3644
3645            // From tombstones
3646            for tombstone in old_l0.tombstones.values() {
3647                let version = old_l0
3648                    .edge_versions
3649                    .get(&tombstone.eid)
3650                    .copied()
3651                    .unwrap_or(0);
3652                // Get timestamps - for deletes, updated_at reflects deletion time
3653                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3654                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3655
3656                entries_by_type
3657                    .entry(tombstone.edge_type)
3658                    .or_default()
3659                    .push(L1Entry {
3660                        src_vid: tombstone.src_vid,
3661                        dst_vid: tombstone.dst_vid,
3662                        eid: tombstone.eid,
3663                        op: Op::Delete,
3664                        version,
3665                        properties: HashMap::new(),
3666                        created_at,
3667                        updated_at,
3668                    });
3669            }
3670
3671            // 1b. Collect vertices by label (using vertex_labels from L0)
3672            //
3673            // Helper: fan-out a single vertex entry into per-label buckets.
3674            // Each per-label table row carries the full label set so multi-label
3675            // info is preserved after flush.
3676            let push_vertex_to_labels =
3677                |vid: Vid,
3678                 all_labels: &[String],
3679                 props: Properties,
3680                 deleted: bool,
3681                 version: u64,
3682                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3683                    for label in all_labels {
3684                        if let Some(label_id) = schema.label_id_by_name(label) {
3685                            out.entry(label_id).or_default().push((
3686                                vid,
3687                                all_labels.to_vec(),
3688                                props.clone(),
3689                                deleted,
3690                                version,
3691                            ));
3692                        }
3693                    }
3694                };
3695
3696            for (vid, props) in &old_l0.vertex_properties {
3697                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3698                // Collect timestamps for this vertex
3699                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3700                    vertex_created_at.insert(*vid, ts);
3701                }
3702                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3703                    vertex_updated_at.insert(*vid, ts);
3704                }
3705                if let Some(labels) = old_l0.vertex_labels.get(vid) {
3706                    // Partial-write routing: when this VID was last
3707                    // touched via `insert_vertex_partial` AND the
3708                    // partial_lance_writes flag is on, send only the
3709                    // touched columns to a MergeInsert batch. Otherwise
3710                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3711                    // — or flag off) use the existing full-row Append.
3712                    let is_partial = self.config.partial_lance_writes
3713                        && old_l0.vertex_partial_keys.contains_key(vid);
3714                    if is_partial {
3715                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3716                            for label in labels {
3717                                if let Some(label_id) = schema.label_id_by_name(label) {
3718                                    partial_by_label.entry(label_id).or_default().push((
3719                                        *vid,
3720                                        props.clone(),
3721                                        version,
3722                                        touched.clone(),
3723                                    ));
3724                                }
3725                            }
3726                        }
3727                    } else {
3728                        push_vertex_to_labels(
3729                            *vid,
3730                            labels,
3731                            props.clone(),
3732                            false,
3733                            version,
3734                            &mut vertices_by_label,
3735                        );
3736                    }
3737                }
3738            }
3739            for &vid in &old_l0.vertex_tombstones {
3740                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3741                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3742                    vertex_updated_at.insert(vid, ts);
3743                }
3744                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3745                    // Round-12 §B: tombstones flush via Lance MergeInsert
3746                    // (just `_vid`, `_deleted=true`, `_version`,
3747                    // `_updated_at`) — skipping the wide-row Append.
3748                    // Unconditional (no `partial_lance_writes` gating);
3749                    // tombstone Append carries no useful payload.
3750                    for label in labels {
3751                        if let Some(label_id) = schema.label_id_by_name(label) {
3752                            tombstones_by_label
3753                                .entry(label_id)
3754                                .or_default()
3755                                .push((vid, version));
3756                        }
3757                    }
3758                } else {
3759                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
3760                    orphaned_tombstones.push((vid, version));
3761                }
3762            }
3763        } // Drop read lock
3764
3765        // Resolve orphaned tombstones (missing labels) from storage
3766        if !orphaned_tombstones.is_empty() {
3767            tracing::warn!(
3768                count = orphaned_tombstones.len(),
3769                "Tombstones missing labels in L0, querying storage as fallback"
3770            );
3771            for (vid, version) in orphaned_tombstones {
3772                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3773                    && !labels.is_empty()
3774                {
3775                    for label in &labels {
3776                        if let Some(label_id) = schema.label_id_by_name(label) {
3777                            // Round-12 §B: route through partial tombstone too.
3778                            tombstones_by_label
3779                                .entry(label_id)
3780                                .or_default()
3781                                .push((vid, version));
3782                        }
3783                    }
3784                }
3785            }
3786        }
3787
3788        // 1. Load previous snapshot from cache, or fall back to storage.
3789        //
3790        // Use clone() not take(): for the async path, multiple
3791        // concurrent streams may run; if we take() here, a sibling
3792        // stream sees cached_manifest = None and seeds from
3793        // load_latest_snapshot (stale), losing the chain. clone()
3794        // preserves the parent. Finalize writes back the new manifest
3795        // unconditionally.
3796        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3797            cached
3798        } else {
3799            self.storage
3800                .snapshot_manager()
3801                .load_latest_snapshot()
3802                .await?
3803                .unwrap_or_else(|| {
3804                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3805                })
3806        };
3807
3808        // Update snapshot metadata
3809        // Save parent snapshot ID before generating new one (for lineage tracking)
3810        let parent_id = manifest.snapshot_id.clone();
3811        manifest.parent_snapshot = Some(parent_id);
3812        manifest.snapshot_id = Uuid::new_v4().to_string();
3813        manifest.name = name;
3814        manifest.created_at = Utc::now();
3815        manifest.version_high_water_mark = current_version;
3816        manifest.wal_high_water_mark = wal_lsn;
3817        let snapshot_id = manifest.snapshot_id.clone();
3818
3819        tracing::Span::current().record("snapshot_id", &snapshot_id);
3820
3821        // 2. Write main unified tables FIRST (before deltas).
3822        //    Ensures the dual-write invariant: by the time an EID appears in a
3823        //    delta table, it already exists in main_edges. This prevents the
3824        //    compaction debug_assert from firing when compaction interleaves
3825        //    with flush at async yield points.
3826        //
3827        // 2.1 Main edges table
3828        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3829            let _old_l0 = old_l0_arc.read();
3830            let mut main_edges: Vec<(
3831                uni_common::core::id::Eid,
3832                Vid,
3833                Vid,
3834                String,
3835                Properties,
3836                bool,
3837                u64,
3838            )> = Vec::new();
3839            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3840            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3841
3842            for (&edge_type_id, entries) in entries_by_type.iter() {
3843                for entry in entries {
3844                    let edge_type_name = self
3845                        .storage
3846                        .schema_manager()
3847                        .edge_type_name_by_id_unified(edge_type_id)
3848                        .unwrap_or_else(|| "unknown".to_string());
3849
3850                    let deleted = matches!(entry.op, Op::Delete);
3851                    main_edges.push((
3852                        entry.eid,
3853                        entry.src_vid,
3854                        entry.dst_vid,
3855                        edge_type_name,
3856                        entry.properties.clone(),
3857                        deleted,
3858                        entry.version,
3859                    ));
3860
3861                    if let Some(ts) = entry.created_at {
3862                        edge_created_at_map.insert(entry.eid, ts);
3863                    }
3864                    if let Some(ts) = entry.updated_at {
3865                        edge_updated_at_map.insert(entry.eid, ts);
3866                    }
3867                }
3868            }
3869
3870            (main_edges, edge_created_at_map, edge_updated_at_map)
3871        };
3872
3873        if !main_edges.is_empty() {
3874            let main_edge_batch = MainEdgeDataset::build_record_batch(
3875                &main_edges,
3876                Some(&edge_created_at_map),
3877                Some(&edge_updated_at_map),
3878            )?;
3879            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3880            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3881        }
3882
3883        // 2.2 Main vertices table
3884        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3885            let old_l0 = old_l0_arc.read();
3886            let mut vertices = Vec::new();
3887
3888            // Live vertices: full-row Append on the main table (the
3889            // props_json blob is required for global ID lookups). For
3890            // partial-row VIDs (vertex_partial_keys non-empty), the
3891            // main table still needs the full props for the
3892            // ext_id-uniqueness path; we keep the Append here. The
3893            // per-label Lance write IS partial via MergeInsert.
3894            for (vid, props) in &old_l0.vertex_properties {
3895                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3896                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3897                vertices.push((*vid, labels, props.clone(), false, version));
3898            }
3899
3900            // Tombstones: collected into `main_vertex_tombstones` for
3901            // the MergeInsert path below; skipping the wide-row Append.
3902            for &vid in &old_l0.vertex_tombstones {
3903                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3904                main_vertex_tombstones.push((vid, version));
3905            }
3906
3907            vertices
3908        };
3909
3910        if !main_vertices.is_empty() {
3911            let main_vertex_batch = MainVertexDataset::build_record_batch(
3912                &main_vertices,
3913                Some(&vertex_created_at),
3914                Some(&vertex_updated_at),
3915            )?;
3916            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3917        }
3918        // Round-12 §B: tombstones via MergeInsert on the main vertices
3919        // table. Independent of `vertex_properties` length.
3920        if !main_vertex_tombstones.is_empty() {
3921            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3922                &main_vertex_tombstones,
3923                Some(&vertex_updated_at),
3924            )?;
3925            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3926                .await?;
3927        }
3928        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3929            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3930        }
3931
3932        // 3. For each edge type, write FWD and BWD delta runs
3933        for (&edge_type_id, entries) in entries_by_type.iter() {
3934            // Get edge type name from unified lookup (handles both schema'd and schemaless)
3935            let edge_type_name = self
3936                .storage
3937                .schema_manager()
3938                .edge_type_name_by_id_unified(edge_type_id)
3939                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3940
3941            // FWD Run (sorted by src_vid)
3942            // Round-12 §A: split entries into full-row Append and
3943            // partial MergeInsert routes based on `edge_partial_keys`.
3944            // Edges in `edge_partial_keys` were last written via
3945            // `insert_edge_partial_full`; the per-edge-type delta
3946            // tables receive only the touched schema columns plus
3947            // (when any overflow key was touched) the regenerated
3948            // `overflow_json` blob. Untouched columns retain their
3949            // previous-version value via Lance MergeInsert.
3950            let partial_eids: std::collections::HashSet<Eid> = {
3951                let old_l0 = old_l0_arc.read();
3952                entries
3953                    .iter()
3954                    .filter(|e| {
3955                        self.config.partial_lance_writes
3956                            && old_l0.edge_partial_keys.contains_key(&e.eid)
3957                    })
3958                    .map(|e| e.eid)
3959                    .collect()
3960            };
3961            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3962                let old_l0 = old_l0_arc.read();
3963                partial_eids
3964                    .iter()
3965                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3966                    .collect()
3967            };
3968            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3969                .clone()
3970                .into_iter()
3971                .partition(|e| !partial_eids.contains(&e.eid));
3972
3973            let backend = self.storage.backend();
3974
3975            // FWD run (sorted by src_vid)
3976            let mut fwd_full = full_entries.clone();
3977            fwd_full.sort_by_key(|e| e.src_vid);
3978            let mut fwd_partial = partial_entries.clone();
3979            fwd_partial.sort_by_key(|e| e.src_vid);
3980            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3981            if !fwd_full.is_empty() {
3982                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3983                fwd_ds.write_run(backend, fwd_batch).await?;
3984            }
3985            if !fwd_partial.is_empty() {
3986                let touched_union: std::collections::HashSet<String> = fwd_partial
3987                    .iter()
3988                    .flat_map(|e| {
3989                        touched_union_by_eid
3990                            .get(&e.eid)
3991                            .cloned()
3992                            .unwrap_or_default()
3993                            .into_iter()
3994                    })
3995                    .collect();
3996                let fwd_partial_batch =
3997                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3998                fwd_ds
3999                    .merge_insert_partial_run(backend, fwd_partial_batch)
4000                    .await?;
4001            }
4002            fwd_ds.ensure_eid_index(backend).await?;
4003
4004            // BWD Run (sorted by dst_vid)
4005            let mut bwd_full = full_entries.clone();
4006            bwd_full.sort_by_key(|e| e.dst_vid);
4007            let mut bwd_partial = partial_entries.clone();
4008            bwd_partial.sort_by_key(|e| e.dst_vid);
4009            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4010            if !bwd_full.is_empty() {
4011                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4012                bwd_ds.write_run(backend, bwd_batch).await?;
4013            }
4014            if !bwd_partial.is_empty() {
4015                let touched_union: std::collections::HashSet<String> = bwd_partial
4016                    .iter()
4017                    .flat_map(|e| {
4018                        touched_union_by_eid
4019                            .get(&e.eid)
4020                            .cloned()
4021                            .unwrap_or_default()
4022                            .into_iter()
4023                    })
4024                    .collect();
4025                let bwd_partial_batch =
4026                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4027                bwd_ds
4028                    .merge_insert_partial_run(backend, bwd_partial_batch)
4029                    .await?;
4030            }
4031            bwd_ds.ensure_eid_index(backend).await?;
4032
4033            // Update Manifest
4034            let current_snap =
4035                manifest
4036                    .edges
4037                    .entry(edge_type_name.to_string())
4038                    .or_insert(EdgeSnapshot {
4039                        version: 0,
4040                        count: 0,
4041                        lance_version: 0,
4042                    });
4043            current_snap.version += 1;
4044            current_snap.count += entries.len() as u64;
4045            // LanceDB tables don't expose Lance version directly
4046            current_snap.lance_version = 0;
4047
4048            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4049            // already has these edges via dual-write in insert_edge/delete_edge.
4050        }
4051
4052        // 4. Per-label vertex table writes
4053        // Iterate all labels that have either full-row OR partial-write
4054        // data pending. A label may appear in only one of the two maps
4055        // (e.g., all updates on this label were partial-only).
4056        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4057            .keys()
4058            .chain(partial_by_label.keys())
4059            .chain(tombstones_by_label.keys())
4060            .copied()
4061            .collect();
4062        for label_id in all_label_ids {
4063            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4064            let label_name = schema
4065                .label_name_by_id(label_id)
4066                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4067
4068            let ds = self.storage.vertex_dataset(label_name)?;
4069
4070            // Collect inverted index updates before consuming vertices
4071            // Maps: cfg.property -> (added, removed)
4072            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4073            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4074
4075            for idx in &schema.indexes {
4076                if let IndexDefinition::Inverted(cfg) = idx
4077                    && cfg.label == label_name
4078                {
4079                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4080                    let mut removed: HashSet<Vid> = HashSet::new();
4081
4082                    for (vid, _labels, props, deleted, _version) in &vertices {
4083                        if *deleted {
4084                            removed.insert(*vid);
4085                        } else if let Some(prop_value) = props.get(&cfg.property) {
4086                            // Extract terms from the property value (List<String>)
4087                            if let Some(arr) = prop_value.as_array() {
4088                                let terms: Vec<String> = arr
4089                                    .iter()
4090                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4091                                    .collect();
4092                                if !terms.is_empty() {
4093                                    added.insert(*vid, terms);
4094                                }
4095                            }
4096                        }
4097                    }
4098                    // Round-12 §B: tombstones no longer in `vertices`;
4099                    // pull them from `tombstones_by_label` for inverted
4100                    // index removal.
4101                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4102                        for (vid, _) in tomb_rows {
4103                            removed.insert(*vid);
4104                        }
4105                    }
4106
4107                    if !added.is_empty() || !removed.is_empty() {
4108                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4109                    }
4110                }
4111            }
4112
4113            let mut v_data = Vec::new();
4114            let mut d_data = Vec::new();
4115            let mut ver_data = Vec::new();
4116            for (vid, labels, props, deleted, version) in vertices {
4117                v_data.push((vid, labels, props));
4118                d_data.push(deleted);
4119                ver_data.push(version);
4120            }
4121
4122            let backend = self.storage.backend();
4123
4124            // Skip the full-row Append entirely if this label only has
4125            // partial-write rows pending.
4126            if !v_data.is_empty() {
4127                let batch = ds.build_record_batch_with_timestamps(
4128                    &v_data,
4129                    &d_data,
4130                    &ver_data,
4131                    &schema,
4132                    Some(&vertex_created_at),
4133                    Some(&vertex_updated_at),
4134                )?;
4135                ds.write_batch(backend, batch, &schema).await?;
4136            }
4137
4138            // Partial-column batch (Lance MergeInsert path). The flag
4139            // gates whether the routing classified any VIDs as partial;
4140            // outside the flag this collection is always empty so the
4141            // call below is a cheap no-op.
4142            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4143                && !partial_rows.is_empty()
4144            {
4145                let touched_union: std::collections::HashSet<String> = partial_rows
4146                    .iter()
4147                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4148                    .collect();
4149                let pairs: Vec<(Vid, Properties)> = partial_rows
4150                    .iter()
4151                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4152                    .collect();
4153                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4154                let partial_batch = ds.build_partial_record_batch(
4155                    &pairs,
4156                    &versions,
4157                    &touched_union,
4158                    &schema,
4159                    Some(&vertex_updated_at),
4160                )?;
4161                if partial_batch.num_rows() > 0 {
4162                    ds.merge_insert_batch(backend, partial_batch).await?;
4163                }
4164            }
4165
4166            // Tombstone batch (Round-12 §B): always MergeInsert with
4167            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4168            // No partial_lance_writes gating — tombstones never carry
4169            // useful property payload to write. Captured tombstone vids
4170            // also drive `remove_from_vid_labels_index` below.
4171            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4172            if !tombstone_rows.is_empty() {
4173                let tomb_batch =
4174                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4175                if tomb_batch.num_rows() > 0 {
4176                    ds.merge_insert_batch(backend, tomb_batch).await?;
4177                }
4178            }
4179
4180            ds.ensure_default_indexes(backend).await?;
4181
4182            // Update VidLabelsIndex (if enabled). v_data carries live
4183            // vertices; tombstone removals come from the
4184            // `tombstone_rows` captured above the MergeInsert call.
4185            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4186                if deleted {
4187                    self.storage.remove_from_vid_labels_index(*vid);
4188                } else {
4189                    self.storage.update_vid_labels_index(*vid, labels.clone());
4190                }
4191            }
4192            for (vid, _) in &tombstone_rows {
4193                self.storage.remove_from_vid_labels_index(*vid);
4194            }
4195
4196            // Update Manifest
4197            let current_snap =
4198                manifest
4199                    .vertices
4200                    .entry(label_name.to_string())
4201                    .or_insert(LabelSnapshot {
4202                        version: 0,
4203                        count: 0,
4204                        lance_version: 0,
4205                    });
4206            current_snap.version += 1;
4207            current_snap.count += v_data.len() as u64;
4208            // LanceDB tables don't expose Lance version directly
4209            current_snap.lance_version = 0;
4210
4211            // Invalidate table cache to ensure next read picks up new version
4212            self.storage.invalidate_table_cache(label_name);
4213
4214            // Apply inverted index updates incrementally
4215            #[cfg(feature = "lance-backend")]
4216            for idx in &schema.indexes {
4217                if let IndexDefinition::Inverted(cfg) = idx
4218                    && cfg.label == label_name
4219                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4220                {
4221                    self.storage
4222                        .index_manager()
4223                        .update_inverted_index_incremental(cfg, added, removed)
4224                        .await?;
4225                }
4226            }
4227
4228            // Update UID index with new vertex mappings
4229            // Collect (UniId, Vid) mappings from non-deleted vertices
4230            #[cfg(feature = "lance-backend")]
4231            {
4232                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4233                for (vid, _labels, props) in &v_data {
4234                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4235                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4236                        label_name, ext_id, props,
4237                    );
4238                    uid_mappings.push((uid, *vid));
4239                }
4240
4241                if !uid_mappings.is_empty()
4242                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4243                {
4244                    // Stamp mappings with this flush's MVCC version so a later
4245                    // re-create of the same UID deterministically outranks the
4246                    // stale mapping (review C3).
4247                    uid_index
4248                        .write_mapping_versioned(&uid_mappings, current_version)
4249                        .await?;
4250                }
4251            }
4252        }
4253        Ok(FlushOutcome {
4254            manifest,
4255            snapshot_id,
4256        })
4257    }
4258
4259    /// Composition entry that assumes the caller already holds `flush_lock`.
4260    /// Runs rotate + stream + finalize_locked in sequence. Used by
4261    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4262    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4263    /// holds the lock from the commit critical section).
4264    #[instrument(
4265        skip(self),
4266        fields(snapshot_id, mutations_count, size_bytes),
4267        level = "info"
4268    )]
4269    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4270        let start = std::time::Instant::now();
4271
4272        let (initial_size, initial_count) = {
4273            let l0_arc = self.l0_manager.get_current();
4274            let l0 = l0_arc.read();
4275            (l0.estimated_size, l0.mutation_count)
4276        };
4277        tracing::Span::current().record("size_bytes", initial_size);
4278        tracing::Span::current().record("mutations_count", initial_count);
4279
4280        debug!("Starting L0 flush to L1");
4281
4282        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4283        // FlushInProgressGuard lives on RotateOutput and stays alive for
4284        // the full flush — including the finalize_locked call below.
4285        let RotateOutput {
4286            old_l0_arc,
4287            wal_lsn,
4288            current_version,
4289            flush_in_progress_guard: _flush_guard,
4290        } = self.flush_l0_rotate().await?;
4291
4292        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4293        // G (Lance writes). Builds the manifest but does NOT publish it.
4294        let FlushOutcome {
4295            manifest,
4296            snapshot_id,
4297        } = self
4298            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4299            .await?;
4300
4301        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4302        // property cache clear, last_flush_time, metrics, l1_runs++,
4303        // compaction trigger, index-rebuild scheduling, fork tick.
4304        self.flush_finalize_locked(
4305            old_l0_arc,
4306            wal_lsn,
4307            manifest,
4308            snapshot_id,
4309            initial_size,
4310            initial_count,
4311            start,
4312        )
4313        .await
4314    }
4315
4316    /// Phases H..S of the flush: publish the manifest and run all
4317    /// post-publish bookkeeping. Assumes the caller already holds
4318    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4319    /// lock-acquiring variant used by the async finalize path.
4320    #[allow(clippy::too_many_arguments)]
4321    async fn flush_finalize_locked(
4322        &self,
4323        old_l0_arc: Arc<RwLock<L0Buffer>>,
4324        wal_lsn: u64,
4325        manifest: SnapshotManifest,
4326        snapshot_id: String,
4327        initial_size: usize,
4328        initial_count: usize,
4329        start: std::time::Instant,
4330    ) -> Result<String> {
4331        Self::flush_finalize_body(
4332            &self.shared_ctx(),
4333            old_l0_arc,
4334            wal_lsn,
4335            manifest,
4336            snapshot_id,
4337            initial_size,
4338            initial_count,
4339            start,
4340        )
4341        .await
4342    }
4343
4344    /// Phases H..S of the flush, lock-acquiring variant. Used by the
4345    /// async-flush finalizer task (running on a spawned tokio task),
4346    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4347    /// `flush_lock` to serialize the publish boundary, then runs the
4348    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4349    #[allow(clippy::too_many_arguments)]
4350    pub(crate) async fn flush_finalize_now(
4351        shared: SharedFlushCtx,
4352        old_l0_arc: Arc<RwLock<L0Buffer>>,
4353        wal_lsn: u64,
4354        manifest: SnapshotManifest,
4355        snapshot_id: String,
4356        initial_size: usize,
4357        initial_count: usize,
4358        start: std::time::Instant,
4359    ) -> Result<String> {
4360        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4361        Self::flush_finalize_body(
4362            &shared,
4363            old_l0_arc,
4364            wal_lsn,
4365            manifest,
4366            snapshot_id,
4367            initial_size,
4368            initial_count,
4369            start,
4370        )
4371        .await
4372    }
4373
    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
    /// Static over `SharedFlushCtx`; the caller is responsible for
    /// holding `flush_lock`.
    ///
    /// Steps execute in the order H, H2, I, L, J, K, M, P, Q, R, S (the
    /// letters are labels, not an alphabetical schedule: L deliberately
    /// runs before J — see the comment at step L).
    ///
    /// # Arguments
    ///
    /// * `shared` - handles to storage, L0 manager, caches and config used
    ///   by every step below.
    /// * `old_l0_arc` - the rotated L0 buffer; handed to `complete_flush`.
    /// * `wal_lsn` - upper bound for WAL truncation; the effective
    ///   truncation point is the minimum of this and any pending-flush LSN.
    /// * `manifest` - manifest built by the stream phase; its
    ///   `parent_snapshot` may be rewritten here before it is saved.
    /// * `snapshot_id` - logged and returned on success.
    ///   NOTE(review): presumably equal to `manifest.snapshot_id`, which is
    ///   what actually gets published — confirm at the call sites.
    /// * `initial_size` / `initial_count` - reported in the completion log
    ///   line and in the `uni_flush_bytes_total` / `uni_flush_rows_total`
    ///   counters.
    /// * `start` - origin for the `uni_flush_duration_seconds` histogram.
    ///
    /// # Errors
    ///
    /// Propagates failures from saving/pointing the snapshot, the fsync
    /// barrier, WAL truncation, and acquiring the `compaction_status`
    /// mutex. Steps already performed are not rolled back.
    #[allow(clippy::too_many_arguments)]
    async fn flush_finalize_body(
        shared: &SharedFlushCtx,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        mut manifest: SnapshotManifest,
        snapshot_id: String,
        initial_size: usize,
        initial_count: usize,
        start: std::time::Instant,
    ) -> Result<String> {
        // Parent-snapshot fixup. The stream phase built `manifest` with
        // parent_snapshot set from cached_manifest at stream time. If
        // OTHER flushes (sync or async) have finalized since then,
        // cached_manifest has advanced. Re-link this manifest to the
        // current cached chain so we don't orphan their data when we
        // overwrite cached_manifest below.
        // (The lock guard is a temporary of this statement, so it is
        // released before any `.await` below.)
        let current_parent_id = shared
            .cached_manifest
            .lock()
            .as_ref()
            .map(|m| m.snapshot_id.clone());
        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
            manifest.parent_snapshot = current_parent_id;
            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
        }

        // H. Publish manifest (body first, then pointer — recovery is
        // idempotent if we crash between the two).
        shared
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        shared
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
        // wrote the manifest body and the `catalog/latest` pointer through the
        // object store, which does NOT fsync. WAL truncation (K, below) removes
        // the only other durable copy of this flush's data, so a crash after K
        // but before the OS flushed those writes would lose the snapshot —
        // recovery could not resolve `latest`. Make them durable now (local-fs
        // only; remote stores provide their own durability on `put`).
        crate::snapshot::manager::fsync_snapshot_pointer(
            shared.storage.local_fs_root().as_deref(),
            &manifest.snapshot_id,
        )
        .map_err(|e| {
            anyhow!(
                "fsync snapshot {} before WAL truncate: {}",
                manifest.snapshot_id,
                e
            )
        })?;

        // I. Cache manifest for next flush to avoid re-reading from object store.
        *shared.cached_manifest.lock() = Some(manifest.clone());

        // L. Invalidate the property cache BEFORE removing the flushed buffer
        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
        // first closes the non-monotonic-read window: once the buffer leaves
        // the L0 chain at J a freshly-written value would otherwise miss the
        // chain and fall through to a stale cache entry. By the time finalize
        // runs the streamed rows are already durable in L1, so a post-clear
        // read falls through to fresh storage instead. The finalizer holds
        // `flush_lock` throughout, so reordering L ahead of J is safe.
        if let Some(ref pm) = shared.property_manager {
            pm.clear_cache().await;
        }

        // J. Complete flush: remove old L0 from pending_flush. MUST happen
        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
        shared.l0_manager.complete_flush(&old_l0_arc);

        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
        // WAL truncation (K). The property cache is already cleared (L moved
        // ahead of J above), so a read in this window falls through to fresh
        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
        // read after flush finalize).
        // The fail point's name still says "before-cache-clear" even though
        // the cache clear now happens earlier; the name is a runtime string
        // and is left untouched here.
        fail::fail_point!("flush::after-complete-before-cache-clear");

        // K. Truncate WAL up to the safe LSN.
        // The WAL handle is cloned out of the current L0 so the read guard is
        // not held across the `.await` on `truncate_before`.
        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
        if let Some(w) = wal_handle {
            // Never truncate past the lowest LSN still needed by a pending
            // flush; with nothing pending, this flush's own LSN is the bound.
            let safe_lsn = shared
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // M. Reset last flush time for time-based auto-flush.
        *shared.last_flush_time.lock() = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // P. Increment flush generation counter for write throttling.
        // Scoped block so the mutex guard is released immediately.
        {
            let mut status = uni_common::sync::acquire_mutex(
                &shared.storage.compaction_status,
                "compaction_status",
            )?;
            status.l1_runs += 1;
        }

        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
        let am = shared.adjacency_manager.clone();
        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
            // Only one background compaction at a time: skip if the task
            // stored by a previous flush has not finished yet.
            let previous_still_running = {
                let guard = shared.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };
            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                // NOTE(review): `compact()` is invoked synchronously inside an
                // async task; if it is long-running, `spawn_blocking` may be a
                // better fit — confirm against `AdjacencyManager::compact`.
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *shared.compaction_handle.write() = Some(handle);
            }
        }

        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
        if shared.auto_rebuild_enabled
            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
        {
            Self::schedule_index_rebuilds_if_needed_static(
                &manifest,
                rebuild_mgr.clone(),
                shared.schema_manager.clone(),
                shared.index_rebuild_config.clone(),
            );
        }

        // S. Emit fork-fragment observability after a successful forked flush.
        Self::tick_fork_fragment_observability_static(
            shared.fork_id,
            shared.fork_flush_count.clone(),
            shared.fork_fragment_warn_fired.clone(),
            shared.fork_fragment_warn_threshold,
        );

        Ok(snapshot_id)
    }
4536
4537    /// Increment fork-flush bookkeeping and fire the fragment warn
4538    /// once if the threshold is crossed.
4539    ///
4540    /// Each flush typically appends ~1 fragment per touched dataset on
4541    /// the fork's branches; without compaction (deferred to Phase 5)
4542    /// long-lived heavy-write forks degrade. The flush count is a
4543    /// proxy for actual fragment growth — reading
4544    /// `Dataset::manifest().fragments.len()` per dataset would add a
4545    /// per-flush object-store roundtrip on the hot commit path, which
4546    /// is too costly for a purely observational guard rail.
4547    ///
4548    /// No-op for primary writers (`fork_id == None`).
4549    #[allow(dead_code)] // called by tests; production path uses _static
4550    pub(crate) fn tick_fork_fragment_observability(&self) {
4551        Self::tick_fork_fragment_observability_static(
4552            self.fork_id,
4553            self.fork_flush_count.clone(),
4554            self.fork_fragment_warn_fired.clone(),
4555            self.config.fork_fragment_warn_threshold,
4556        );
4557    }
4558
4559    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4560    /// Used by the async-flush finalize path, where we hold a
4561    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4562    pub(crate) fn tick_fork_fragment_observability_static(
4563        fork_id: Option<ForkId>,
4564        fork_flush_count: Arc<AtomicU64>,
4565        fork_fragment_warn_fired: Arc<AtomicBool>,
4566        warn_threshold: usize,
4567    ) {
4568        let Some(fork_id) = fork_id else { return };
4569        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4570        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4571        let fork_label = fork_id.to_string();
4572        metrics::gauge!(
4573            "uni_fork_l1_flushes",
4574            "fork" => fork_label.clone(),
4575        )
4576        .set(new_count as f64);
4577        let threshold = warn_threshold as u64;
4578        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4579            && threshold > 0
4580            && new_count >= threshold
4581        {
4582            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4583            tracing::warn!(
4584                fork = %fork_label,
4585                flush_count = new_count,
4586                threshold,
4587                "fork has exceeded the L1 flush-count threshold; \
4588                 fork compaction is deferred to Phase 5 — consider \
4589                 drop+recreate or promotion to bound fragment growth"
4590            );
4591        }
4592    }
4593
4594    /// Check rebuild thresholds and schedule background index rebuilds for
4595    /// labels that exceed growth or age limits. Marks affected indexes as
4596    /// `Stale` and spawns an async task to schedule the rebuild.
4597    #[allow(dead_code)] // production path uses _static; kept as the
4598    // documented instance entry point.
4599    fn schedule_index_rebuilds_if_needed(
4600        &self,
4601        manifest: &SnapshotManifest,
4602        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4603    ) {
4604        Self::schedule_index_rebuilds_if_needed_static(
4605            manifest,
4606            rebuild_mgr,
4607            self.schema_manager.clone(),
4608            self.config.index_rebuild.clone(),
4609        );
4610    }
4611
4612    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4613    /// Used by the async-flush finalize path, where we hold the
4614    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4615    pub(crate) fn schedule_index_rebuilds_if_needed_static(
4616        manifest: &SnapshotManifest,
4617        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4618        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4619        index_rebuild_config: uni_common::config::IndexRebuildConfig,
4620    ) {
4621        let checker =
4622            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4623        let schema = schema_manager.schema();
4624        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4625
4626        if labels.is_empty() {
4627            return;
4628        }
4629
4630        // Mark affected indexes as Stale
4631        for label in &labels {
4632            for idx in &schema.indexes {
4633                if idx.label() == label {
4634                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4635                        m.status = uni_common::core::schema::IndexStatus::Stale;
4636                    });
4637                }
4638            }
4639        }
4640
4641        tokio::spawn(async move {
4642            if let Err(e) = rebuild_mgr.schedule(labels).await {
4643                tracing::warn!("Failed to schedule index rebuild: {e}");
4644            }
4645        });
4646    }
4647}
4648
/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
/// its single-task finalizer loop. Unit struct on purpose: it must NOT
/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
/// for finalize travels in via `SharedFlushCtx`.
///
/// Both trait methods receive the `RotatedFlush` and the
/// `SharedFlushCtx` by value on each call, so the struct itself carries
/// no fields.
pub(crate) struct WriterFinalizer;
4655
4656impl FinalizeFn for WriterFinalizer {
4657    fn finalize<'a>(
4658        &'a self,
4659        rotated: RotatedFlush,
4660        outcome: AsyncFlushOutcome,
4661        shared: SharedFlushCtx,
4662    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4663        Box::pin(async move {
4664            // Read initial_size / initial_count from the rotated L0 so
4665            // we don't have to plumb them through the coordinator
4666            // submission. The buffer is still alive in pending_flush
4667            // until `complete_flush` (J) below pops it.
4668            let (initial_size, initial_count) = {
4669                let l0 = rotated.old_l0_arc.read();
4670                (l0.estimated_size, l0.mutation_count)
4671            };
4672            let result = Writer::flush_finalize_now(
4673                shared,
4674                rotated.old_l0_arc.clone(),
4675                rotated.wal_lsn,
4676                outcome.new_manifest,
4677                outcome.snapshot_id,
4678                initial_size,
4679                initial_count,
4680                std::time::Instant::now(),
4681            )
4682            .await;
4683            // `rotated` (permit + flush_in_progress_guard) drops here.
4684            drop(rotated.permit);
4685            result
4686        })
4687    }
4688
4689    fn finalize_failure<'a>(
4690        &'a self,
4691        rotated: RotatedFlush,
4692        err: anyhow::Error,
4693        _shared: SharedFlushCtx,
4694    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4695        Box::pin(async move {
4696            tracing::warn!(
4697                error = %err,
4698                seq = rotated.seq,
4699                "async flush stream failed; old L0 remains in pending_flush, \
4700                 WAL retains its data, recovery via WAL replay on restart"
4701            );
4702            metrics::counter!("uni_flush_failures_total").increment(1);
4703            // Permit + guard drop here so back-pressure releases even on
4704            // failure.
4705            drop(rotated.permit);
4706            err
4707        })
4708    }
4709}
4710
4711#[cfg(test)]
4712mod tests {
4713    use super::*;
4714    use tempfile::tempdir;
4715
4716    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
4717    /// This verifies fix for issue #137 (transaction commit atomicity).
4718    #[tokio::test]
4719    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4720        use crate::runtime::wal::WriteAheadLog;
4721        use crate::storage::manager::StorageManager;
4722        use object_store::local::LocalFileSystem;
4723        use object_store::path::Path as ObjectStorePath;
4724        use uni_common::core::schema::SchemaManager;
4725
4726        let dir = tempdir()?;
4727        let path = dir.path().to_str().unwrap();
4728        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4729        let schema_path = ObjectStorePath::from("schema.json");
4730
4731        let schema_manager =
4732            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4733        let _label_id = schema_manager.add_label("Test")?;
4734        schema_manager.save().await?;
4735
4736        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4737
4738        // Create WAL for main L0
4739        let wal_path = ObjectStorePath::from("wal");
4740        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4741
4742        let writer = Writer::new_with_config(
4743            storage.clone(),
4744            schema_manager.clone(),
4745            1,
4746            UniConfig::default(),
4747            Some(wal),
4748            None,
4749        )
4750        .await?;
4751
4752        // Begin transaction — create a transaction L0
4753        let tx_l0 = writer.create_transaction_l0();
4754
4755        // Insert data in transaction
4756        let vid_a = writer.next_vid().await?;
4757        let vid_b = writer.next_vid().await?;
4758
4759        let mut props = std::collections::HashMap::new();
4760        props.insert("test".to_string(), Value::String("data".to_string()));
4761
4762        writer
4763            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4764            .await?;
4765        writer
4766            .insert_vertex_with_labels(
4767                vid_b,
4768                std::collections::HashMap::new(),
4769                &["Test".to_string()],
4770                Some(&tx_l0),
4771            )
4772            .await?;
4773
4774        let eid = writer.next_eid(1).await?;
4775        writer
4776            .insert_edge(
4777                vid_a,
4778                vid_b,
4779                1,
4780                eid,
4781                std::collections::HashMap::new(),
4782                None,
4783                Some(&tx_l0),
4784            )
4785            .await?;
4786
4787        // Get WAL before commit
4788        let l0 = writer.l0_manager.get_current();
4789        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
4790        let mutations_before = wal.replay().await?;
4791        let count_before = mutations_before.len();
4792
4793        // Commit transaction - this should write to WAL first
4794        let writer = Arc::new(writer);
4795        writer.commit_transaction_l0(tx_l0).await?;
4796
4797        // Verify WAL has the new mutations
4798        let mutations_after = wal.replay().await?;
4799        assert!(
4800            mutations_after.len() > count_before,
4801            "WAL should contain transaction mutations after commit"
4802        );
4803
4804        // Verify mutations are in correct order: vertices first, then edges
4805        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
4806
4807        let mut saw_vertex_a = false;
4808        let mut saw_vertex_b = false;
4809        let mut saw_edge = false;
4810
4811        for mutation in &new_mutations {
4812            match mutation {
4813                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
4814                    if *vid == vid_a {
4815                        saw_vertex_a = true;
4816                    }
4817                    if *vid == vid_b {
4818                        saw_vertex_b = true;
4819                    }
4820                    // Vertices should come before edges
4821                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
4822                }
4823                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
4824                    if *e == eid {
4825                        saw_edge = true;
4826                    }
4827                    // Edges should come after vertices
4828                    assert!(
4829                        saw_vertex_a && saw_vertex_b,
4830                        "Edge should be logged after both vertices"
4831                    );
4832                }
4833                _ => {}
4834            }
4835        }
4836
4837        assert!(saw_vertex_a, "Vertex A should be in WAL");
4838        assert!(saw_vertex_b, "Vertex B should be in WAL");
4839        assert!(saw_edge, "Edge should be in WAL");
4840
4841        // Verify data is also in main L0
4842        let l0_read = l0.read();
4843        assert!(
4844            l0_read.vertex_properties.contains_key(&vid_a),
4845            "Vertex A should be in main L0"
4846        );
4847        assert!(
4848            l0_read.vertex_properties.contains_key(&vid_b),
4849            "Vertex B should be in main L0"
4850        );
4851        assert!(
4852            l0_read.edge_endpoints.contains_key(&eid),
4853            "Edge should be in main L0"
4854        );
4855
4856        Ok(())
4857    }
4858
4859    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
4860    #[tokio::test]
4861    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4862        use crate::runtime::wal::WriteAheadLog;
4863        use crate::storage::manager::StorageManager;
4864        use object_store::local::LocalFileSystem;
4865        use object_store::path::Path as ObjectStorePath;
4866        use uni_common::core::schema::SchemaManager;
4867
4868        let dir = tempdir()?;
4869        let path = dir.path().to_str().unwrap();
4870        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4871        let schema_path = ObjectStorePath::from("schema.json");
4872
4873        let schema_manager =
4874            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4875        let _label_id = schema_manager.add_label("Test")?;
4876        let _baseline_label_id = schema_manager.add_label("Baseline")?;
4877        let _txdata_label_id = schema_manager.add_label("TxData")?;
4878        schema_manager.save().await?;
4879
4880        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4881
4882        // Create WAL for main L0
4883        let wal_path = ObjectStorePath::from("wal");
4884        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4885
4886        let writer = Writer::new_with_config(
4887            storage.clone(),
4888            schema_manager.clone(),
4889            1,
4890            UniConfig::default(),
4891            Some(wal),
4892            None,
4893        )
4894        .await?;
4895
4896        // Insert baseline data (outside transaction)
4897        let baseline_vid = writer.next_vid().await?;
4898        writer
4899            .insert_vertex_with_labels(
4900                baseline_vid,
4901                [("baseline".to_string(), Value::Bool(true))]
4902                    .into_iter()
4903                    .collect(),
4904                &["Baseline".to_string()],
4905                None,
4906            )
4907            .await?;
4908
4909        // Begin transaction — create a transaction L0
4910        let tx_l0 = writer.create_transaction_l0();
4911
4912        // Insert data in transaction
4913        let tx_vid = writer.next_vid().await?;
4914        writer
4915            .insert_vertex_with_labels(
4916                tx_vid,
4917                [("tx_data".to_string(), Value::Bool(true))]
4918                    .into_iter()
4919                    .collect(),
4920                &["TxData".to_string()],
4921                Some(&tx_l0),
4922            )
4923            .await?;
4924
4925        // Capture main L0 state before rollback
4926        let l0 = writer.l0_manager.get_current();
4927        let vertex_count_before = l0.read().vertex_properties.len();
4928
4929        // Rollback transaction (simulating what would happen after WAL flush failure)
4930        drop(tx_l0);
4931
4932        // Verify main L0 is unchanged
4933        let vertex_count_after = l0.read().vertex_properties.len();
4934        assert_eq!(
4935            vertex_count_before, vertex_count_after,
4936            "Main L0 should not change after rollback"
4937        );
4938
4939        // Baseline should still be present
4940        assert!(
4941            l0.read().vertex_properties.contains_key(&baseline_vid),
4942            "Baseline data should remain"
4943        );
4944
4945        // Transaction data should NOT be in main L0
4946        assert!(
4947            !l0.read().vertex_properties.contains_key(&tx_vid),
4948            "Transaction data should not be in main L0 after rollback"
4949        );
4950
4951        Ok(())
4952    }
4953
4954    /// Test that batch insert with shared labels does not clone labels per vertex.
4955    /// This verifies fix for issue #161 (redundant label cloning).
4956    #[tokio::test]
4957    async fn test_batch_insert_shared_labels() -> Result<()> {
4958        use crate::storage::manager::StorageManager;
4959        use object_store::local::LocalFileSystem;
4960        use object_store::path::Path as ObjectStorePath;
4961        use uni_common::core::schema::SchemaManager;
4962
4963        let dir = tempdir()?;
4964        let path = dir.path().to_str().unwrap();
4965        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4966        let schema_path = ObjectStorePath::from("schema.json");
4967
4968        let schema_manager =
4969            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4970        let _label_id = schema_manager.add_label("Person")?;
4971        schema_manager.save().await?;
4972
4973        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4974
4975        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4976
4977        // Shared labels - should not be cloned per vertex
4978        let labels = &["Person".to_string()];
4979
4980        // Insert batch of vertices with same labels
4981        let mut vids = Vec::new();
4982        for i in 0..100 {
4983            let vid = writer.next_vid().await?;
4984            let mut props = std::collections::HashMap::new();
4985            props.insert("id".to_string(), Value::Int(i));
4986            writer
4987                .insert_vertex_with_labels(vid, props, labels, None)
4988                .await?;
4989            vids.push(vid);
4990        }
4991
4992        // Verify all vertices have the correct labels
4993        let l0 = writer.l0_manager.get_current();
4994        for vid in vids {
4995            let l0_guard = l0.read();
4996            let vertex_labels = l0_guard.vertex_labels.get(&vid);
4997            assert!(vertex_labels.is_some(), "Vertex should have labels");
4998            assert_eq!(
4999                vertex_labels.unwrap(),
5000                &vec!["Person".to_string()],
5001                "Labels should match"
5002            );
5003        }
5004
5005        Ok(())
5006    }
5007
5008    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5009    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5010    #[tokio::test]
5011    async fn test_estimated_size_tracks_mutations() -> Result<()> {
5012        use crate::storage::manager::StorageManager;
5013        use object_store::local::LocalFileSystem;
5014        use object_store::path::Path as ObjectStorePath;
5015        use uni_common::core::schema::SchemaManager;
5016
5017        let dir = tempdir()?;
5018        let path = dir.path().to_str().unwrap();
5019        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5020        let schema_path = ObjectStorePath::from("schema.json");
5021
5022        let schema_manager =
5023            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5024        let _label_id = schema_manager.add_label("Test")?;
5025        schema_manager.save().await?;
5026
5027        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5028
5029        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5030
5031        let l0 = writer.l0_manager.get_current();
5032
5033        // Initial state should be empty
5034        let initial_estimated = l0.read().estimated_size;
5035        let initial_actual = l0.read().size_bytes();
5036        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5037        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5038
5039        // Insert vertices with properties
5040        let mut vids = Vec::new();
5041        for i in 0..10 {
5042            let vid = writer.next_vid().await?;
5043            let mut props = std::collections::HashMap::new();
5044            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5045            props.insert("index".to_string(), Value::Int(i));
5046            writer
5047                .insert_vertex_with_labels(vid, props, &[], None)
5048                .await?;
5049            vids.push(vid);
5050        }
5051
5052        // Verify estimated_size grew
5053        let after_vertices_estimated = l0.read().estimated_size;
5054        let after_vertices_actual = l0.read().size_bytes();
5055        assert!(
5056            after_vertices_estimated > 0,
5057            "estimated_size should grow after insertions"
5058        );
5059
5060        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5061        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5062        assert!(
5063            (0.5..=2.0).contains(&ratio),
5064            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5065            after_vertices_estimated,
5066            after_vertices_actual,
5067            ratio
5068        );
5069
5070        // Insert edges with a simple edge type
5071        let edge_type = 1u32;
5072        for i in 0..9 {
5073            let eid = writer.next_eid(edge_type).await?;
5074            writer
5075                .insert_edge(
5076                    vids[i],
5077                    vids[i + 1],
5078                    edge_type,
5079                    eid,
5080                    std::collections::HashMap::new(),
5081                    Some("NEXT".to_string()),
5082                    None,
5083                )
5084                .await?;
5085        }
5086
5087        // Verify estimated_size grew further
5088        let after_edges_estimated = l0.read().estimated_size;
5089        let after_edges_actual = l0.read().size_bytes();
5090        assert!(
5091            after_edges_estimated > after_vertices_estimated,
5092            "estimated_size should grow after edge insertions"
5093        );
5094
5095        // Verify still within reasonable bounds
5096        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5097        assert!(
5098            (0.5..=2.0).contains(&ratio),
5099            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5100            after_edges_estimated,
5101            after_edges_actual,
5102            ratio
5103        );
5104
5105        Ok(())
5106    }
5107
5108    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5109    #[tokio::test]
5110    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5111        use crate::runtime::wal::WriteAheadLog;
5112        use crate::storage::manager::StorageManager;
5113        use object_store::local::LocalFileSystem;
5114        use object_store::path::Path as ObjectStorePath;
5115        use uni_common::core::schema::SchemaManager;
5116
5117        let dir = tempdir()?;
5118        let path = dir.path().to_str().unwrap();
5119        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5120        let schema_path = ObjectStorePath::from("schema.json");
5121
5122        let schema_manager =
5123            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5124        schema_manager.save().await?;
5125
5126        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5127
5128        let wal_path = ObjectStorePath::from("wal");
5129        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5130
5131        let writer = Writer::new_with_config(
5132            storage.clone(),
5133            schema_manager.clone(),
5134            1,
5135            UniConfig::default(),
5136            Some(wal.clone()),
5137            None,
5138        )
5139        .await?;
5140
5141        // Flush with no mutations — should succeed cleanly
5142        let lsn = writer.flush_wal().await?;
5143        // LSN should be 0 or 1 (no real mutations flushed)
5144        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5145
5146        Ok(())
5147    }
5148
5149    /// Test that transaction data does not leak into main L0 without commit.
5150    #[tokio::test]
5151    async fn test_transaction_isolation_without_commit() -> Result<()> {
5152        use crate::runtime::wal::WriteAheadLog;
5153        use crate::storage::manager::StorageManager;
5154        use object_store::local::LocalFileSystem;
5155        use object_store::path::Path as ObjectStorePath;
5156        use uni_common::core::schema::SchemaManager;
5157
5158        let dir = tempdir()?;
5159        let path = dir.path().to_str().unwrap();
5160        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5161        let schema_path = ObjectStorePath::from("schema.json");
5162
5163        let schema_manager =
5164            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5165        let _label_id = schema_manager.add_label("Person")?;
5166        schema_manager.save().await?;
5167
5168        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5169
5170        let wal_path = ObjectStorePath::from("wal");
5171        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5172
5173        let writer = Writer::new_with_config(
5174            storage.clone(),
5175            schema_manager.clone(),
5176            1,
5177            UniConfig::default(),
5178            Some(wal),
5179            None,
5180        )
5181        .await?;
5182
5183        // Create transaction L0
5184        let tx_l0 = writer.create_transaction_l0();
5185
5186        // Insert vertex into transaction L0
5187        let vid = writer.next_vid().await?;
5188        writer
5189            .insert_vertex_with_labels(
5190                vid,
5191                [("name".to_string(), Value::String("Ghost".to_string()))]
5192                    .into_iter()
5193                    .collect(),
5194                &["Person".to_string()],
5195                Some(&tx_l0),
5196            )
5197            .await?;
5198
5199        // Verify data is in transaction L0
5200        assert!(
5201            tx_l0.read().vertex_properties.contains_key(&vid),
5202            "Transaction L0 should contain the vertex"
5203        );
5204
5205        // Verify data is NOT in main L0
5206        let main_l0 = writer.l0_manager.get_current();
5207        assert!(
5208            !main_l0.read().vertex_properties.contains_key(&vid),
5209            "Main L0 should NOT contain uncommitted transaction data"
5210        );
5211
5212        // Drop transaction without committing — data should be lost
5213        drop(tx_l0);
5214
5215        // Main L0 still should not have it
5216        assert!(
5217            !main_l0.read().vertex_properties.contains_key(&vid),
5218            "Main L0 should remain clean after dropped transaction"
5219        );
5220
5221        Ok(())
5222    }
5223
5224    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5225    /// the flush count crosses the configured threshold and stays
5226    /// silent on subsequent flushes for the lifetime of the writer.
5227    /// Primary writers (`fork_id == None`) never fire it.
5228    ///
5229    /// Tested directly against `tick_fork_fragment_observability` so
5230    /// the contract is locked in independently of the broader
5231    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5232    /// on Day 10's on-the-fly schema overlay growth).
5233    #[tokio::test]
5234    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5235        use crate::storage::manager::StorageManager;
5236        use object_store::local::LocalFileSystem;
5237        use object_store::path::Path as ObjectStorePath;
5238        use uni_common::core::fork::ForkId;
5239        use uni_common::core::schema::SchemaManager;
5240
5241        let dir = tempdir()?;
5242        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5243        let schema_path = ObjectStorePath::from("schema.json");
5244        let schema_manager =
5245            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5246        let storage = Arc::new(
5247            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5248        );
5249
5250        let config = UniConfig {
5251            fork_fragment_warn_threshold: 3,
5252            ..Default::default()
5253        };
5254        let mut writer =
5255            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5256
5257        // Primary path: never fires.
5258        for _ in 0..10 {
5259            writer.tick_fork_fragment_observability();
5260        }
5261        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5262        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5263
5264        // Fork path: tag and tick. Below threshold → no fire.
5265        writer.fork_id = Some(ForkId::new());
5266        writer.tick_fork_fragment_observability();
5267        writer.tick_fork_fragment_observability();
5268        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5269        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5270
5271        // Crossing threshold → fires once.
5272        writer.tick_fork_fragment_observability();
5273        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5274        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5275
5276        // Subsequent ticks bump the gauge but do not re-fire.
5277        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5278        for _ in 0..5 {
5279            writer.tick_fork_fragment_observability();
5280        }
5281        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5282        assert_eq!(
5283            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5284            fired_after
5285        );
5286
5287        Ok(())
5288    }
5289
5290    /// The hot-path mutators must not write to any `Writer` struct field.
5291    /// Phase 2 of the refactor
5292    /// gave them `&self` receivers, which the compiler enforces against
5293    /// direct `self.x = y` assignment — but interior-mutable writes
5294    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5295    /// every potentially-writable field, calls each hot-path mutator, and
5296    /// asserts no field changed.
5297    ///
5298    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5299    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5300    /// are intentionally out of scope here.
5301    #[tokio::test]
5302    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5303        use crate::storage::manager::StorageManager;
5304        use object_store::local::LocalFileSystem;
5305        use object_store::path::Path as ObjectStorePath;
5306        use uni_common::core::schema::SchemaManager;
5307
5308        let dir = tempdir()?;
5309        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5310        let schema_path = ObjectStorePath::from("schema.json");
5311        let schema_manager =
5312            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5313        schema_manager.add_label("Person")?;
5314        schema_manager.save().await?;
5315        let storage = Arc::new(
5316            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5317        );
5318
5319        let writer =
5320            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5321                .await?;
5322
5323        /// Captures every `Writer` field that *could* be written by a
5324        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5325        /// construction field). Arc'd substructures (`l0_manager`,
5326        /// `storage`, etc.) are intentionally not checked — they are
5327        /// re-pointed only at construction.
5328        #[derive(Debug, PartialEq)]
5329        struct Snapshot {
5330            last_flush_time: std::time::Instant,
5331            cached_manifest_some: bool,
5332            fork_flush_count: u64,
5333            fork_fragment_warn_fired: bool,
5334            xervo_runtime_some: bool,
5335            index_rebuild_manager_some: bool,
5336            fork_id: Option<ForkId>,
5337        }
5338
5339        fn snap(w: &Writer) -> Snapshot {
5340            Snapshot {
5341                last_flush_time: *w.last_flush_time.lock(),
5342                cached_manifest_some: w.cached_manifest.lock().is_some(),
5343                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5344                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5345                xervo_runtime_some: w.xervo_runtime.get().is_some(),
5346                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5347                fork_id: w.fork_id,
5348            }
5349        }
5350
5351        // 1. insert_vertex_with_labels
5352        let before = snap(&writer);
5353        let vid = writer.next_vid().await?;
5354        writer
5355            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5356            .await?;
5357        assert_eq!(
5358            snap(&writer),
5359            before,
5360            "insert_vertex_with_labels mutated a Writer field"
5361        );
5362
5363        // 2. insert_vertices_batch
5364        let before = snap(&writer);
5365        let vids = writer.allocate_vids(2).await?;
5366        writer
5367            .insert_vertices_batch(
5368                vids,
5369                vec![Properties::new(), Properties::new()],
5370                vec!["Person".into()],
5371                None,
5372            )
5373            .await?;
5374        assert_eq!(
5375            snap(&writer),
5376            before,
5377            "insert_vertices_batch mutated a Writer field"
5378        );
5379
5380        // 3. delete_vertex
5381        let before = snap(&writer);
5382        writer.delete_vertex(vid, None, None).await?;
5383        assert_eq!(
5384            snap(&writer),
5385            before,
5386            "delete_vertex mutated a Writer field"
5387        );
5388
5389        // (insert_edge / delete_edge are skipped here: their fixture cost is
5390        // disproportionate to the audit's marginal value, and the same
5391        // structural argument plus the compiler-enforced `&self` covers them.)
5392
5393        Ok(())
5394    }
5395}