Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the write path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit for the writer (defaults to `10_000`).
    /// NOTE(review): the code that enforces this limit is not in this section
    /// — confirm whether it triggers a flush.
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column `MergeInsert` flush path used by
    /// SET-only updates.
    ///
    /// With `true`, `Writer::insert_vertex_partial` remembers which property
    /// keys were touched (in `L0Buffer::vertex_partial_keys`). The flush then
    /// sends those VIDs through Lance's `MergeInsertBuilder` with a source
    /// holding only that subset of the schema. Untouched columns, wide
    /// embedding columns included, are neither read nor rewritten.
    ///
    /// With `false`, `insert_vertex_partial` goes through the read-modify-write
    /// `insert_vertex_with_labels` path instead, staying bit-for-bit
    /// equivalent to earlier releases. The default stays `false` for the
    /// first release. It is intended to flip to `true` once telemetry from
    /// the issue #72 ingest workload confirms the gain.
    ///
    /// The soundness probe lives in
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// `max_mutations = 10_000`; partial Lance writes disabled.
    fn default() -> Self {
        Self {
            partial_lance_writes: false,
            max_mutations: 10_000,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
78/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
79/// captured WAL LSN, current_version, and the in-progress guard whose
80/// lifetime spans the full flush (including the future async stream
81/// phase that runs on a spawned task).
82struct RotateOutput {
83    old_l0_arc: Arc<RwLock<L0Buffer>>,
84    wal_lsn: u64,
85    current_version: u64,
86    flush_in_progress_guard: FlushInProgressGuard,
87}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93    let mut out = Properties::new();
94    for k in keys {
95        if let Some(v) = props.get(k) {
96            out.insert(k.clone(), v.clone());
97        }
98    }
99    out
100}
101
102/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
103/// published) snapshot manifest and its id. Finalize is responsible
104/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
105/// update.
106struct FlushOutcome {
107    manifest: SnapshotManifest,
108    snapshot_id: String,
109}
110
111pub struct Writer {
112    pub l0_manager: Arc<L0Manager>,
113    pub storage: Arc<StorageManager>,
114    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
115    pub allocator: Arc<IdAllocator>,
116    pub config: UniConfig,
117    /// Optional embedding runtime. `OnceLock` so the initializer can run
118    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
119    /// (Phase 4 of concurrent_writer.md). Read through
120    /// [`Writer::xervo_runtime`] — the field itself is private to keep
121    /// callers oblivious to the OnceLock representation.
122    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
123    /// Property manager for cache invalidation after flush
124    pub property_manager: Option<Arc<PropertyManager>>,
125    /// Adjacency manager for dual-write (edges survive flush).
126    adjacency_manager: Arc<AdjacencyManager>,
127    /// Timestamp of last flush or creation. Interior-mutable so that
128    /// `&self` callers can update it; uncontended in practice because all
129    /// writes happen inside the single-flusher critical section.
130    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
131    /// async-flush coordinator passes to spawned stream/finalize tasks.
132    last_flush_time: Arc<PlMutex<std::time::Instant>>,
133    /// Background compaction task handle (prevents concurrent compaction races)
134    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
135    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
136    /// `OnceLock` for the same reason as `xervo_runtime`.
137    /// Wrapped in `Arc` so the async-flush finalize path can read it
138    /// from a spawned task via `SharedFlushCtx`.
139    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
140    /// Cached snapshot manifest from the last flush. Avoids re-reading from
141    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
142    /// `&self` access; uncontended because all access is inside the
143    /// single-flusher critical section.
144    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
145    /// Identifier of the fork this writer serves, if any. `None` for
146    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
147    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
148    /// the fragment-count guard rail (Phase 2 Day 12).
149    pub fork_id: Option<ForkId>,
150    /// Number of `flush_to_l1` calls since this writer was constructed.
151    /// Used as a proxy for L1 fragment growth on the fork's branches:
152    /// each flush typically appends ~1 fragment per touched dataset, so
153    /// the count tracks the order of magnitude of fragment accumulation.
154    /// Reading the actual `Dataset::manifest().fragments.len()` per
155    /// flush would add a per-dataset object-store roundtrip on the hot
156    /// commit path; the proxy keeps the guard rail purely observational
157    /// (Phase 5 introduces fork compaction proper). Only meaningful when
158    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
159    fork_flush_count: Arc<AtomicU64>,
160    /// Whether the fork-fragment warning has already fired at the
161    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
162    /// sufficient — observational only.
163    fork_fragment_warn_fired: Arc<AtomicBool>,
164    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
165    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
166    /// across its WAL-append + L0-merge window. Replaces the outer
167    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
168    /// Arc-wrapped so async-flush coordinator's finalize path can
169    /// re-acquire it from a spawned task via SharedFlushCtx.
170    flush_lock: Arc<tokio::sync::Mutex<()>>,
171    /// Coordinator for async-flush pipeline. Owns the back-pressure
172    /// semaphore, rotate-order sequence, single-finalizer task, and
173    /// pending-flush counter. Always present even when async flush is
174    /// disabled — the sync `flush_to_l1` path uses it for the future
175    /// `FlushInProgressGuard`/permit ownership model.
176    /// Coordinator is `None` when `async_flush_enabled = false`. The
177    /// coordinator's finalizer task captures `SharedFlushCtx` which
178    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
179    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
180    /// the holder count never drops. Constructing it only when the
181    /// feature is actually on avoids that side-effect for all
182    /// existing sync-flush paths. When async-flush graduates from
183    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
184    /// the drain explicitly.
185    #[allow(dead_code)] // first production use lands in Commit 6/7
186    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
187    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
188    /// per successful commit under `flush_lock`; a transaction captures the
189    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
190    ///
191    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
192    commit_sequence: Arc<AtomicU64>,
193    /// Bounded log of recently-committed write-sets for OCC conflict detection.
194    /// Read and updated only under `flush_lock`.
195    ///
196    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
197    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
198    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
199    /// canonical (label, key-props) bytes. A transaction holds the lock from
200    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
201    /// on the same key (avoiding optimistic abort-retry on hot keys).
202    ///
203    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
204    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
205}
206
/// How many recent commits the OCC registry remembers for conflict detection.
///
/// Sized so that running past the window (which forces a conservative abort)
/// is uncommon in practice. Each retained entry is only a small set of
/// touched ids, so the memory cost stays modest.
const OCC_REGISTRY_CAPACITY: usize = 4_096;
211
212impl Writer {
213    pub async fn new(
214        storage: Arc<StorageManager>,
215        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
216        start_version: u64,
217    ) -> Result<Self> {
218        Self::new_with_config(
219            storage,
220            schema_manager,
221            start_version,
222            UniConfig::default(),
223            None,
224            None,
225        )
226        .await
227    }
228
229    pub async fn new_with_config(
230        storage: Arc<StorageManager>,
231        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232        start_version: u64,
233        config: UniConfig,
234        wal: Option<Arc<WriteAheadLog>>,
235        allocator: Option<Arc<IdAllocator>>,
236    ) -> Result<Self> {
237        let allocator = if let Some(a) = allocator {
238            a
239        } else {
240            let store = storage.store();
241            let path = object_store::path::Path::from("id_allocator.json");
242            Arc::new(IdAllocator::new(store, path, 1000).await?)
243        };
244
245        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247        let property_manager = Some(Arc::new(PropertyManager::new(
248            storage.clone(),
249            schema_manager.clone(),
250            1000,
251        )));
252
253        let adjacency_manager = storage.adjacency_manager();
254
255        // Hoist the Arc'd fields so we can both stash them on Writer and
256        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
257        // captures. Single-source-of-truth for each piece of mutable
258        // shared state.
259        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260        let cached_manifest = Arc::new(PlMutex::new(None));
261        let fork_flush_count = Arc::new(AtomicU64::new(0));
262        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264        let compaction_handle = Arc::new(RwLock::new(None));
265        let index_rebuild_manager: Arc<
266            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267        > = Arc::new(OnceLock::new());
268
269        let flush_coordinator = if config.async_flush_enabled {
270            let shared = SharedFlushCtx {
271                storage: storage.clone(),
272                l0_manager: l0_manager.clone(),
273                adjacency_manager: adjacency_manager.clone(),
274                property_manager: property_manager.clone(),
275                schema_manager: schema_manager.clone(),
276                cached_manifest: cached_manifest.clone(),
277                last_flush_time: last_flush_time.clone(),
278                fork_id: None,
279                fork_flush_count: fork_flush_count.clone(),
280                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282                flush_lock: flush_lock.clone(),
283                index_rebuild_manager: index_rebuild_manager.clone(),
284                compaction_handle: compaction_handle.clone(),
285                compaction_config: config.compaction.clone(),
286                index_rebuild_config: config.index_rebuild.clone(),
287                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288            };
289            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290            Some(Arc::new(FlushCoordinator::new(
291                config.max_pending_flushes,
292                shared,
293                finalize_fn,
294            )))
295        } else {
296            None
297        };
298
299        let commit_sequence = Arc::new(AtomicU64::new(0));
300        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301            OCC_REGISTRY_CAPACITY,
302        )));
303        let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305        Ok(Self {
306            l0_manager,
307            storage,
308            schema_manager,
309            allocator,
310            config,
311            xervo_runtime: OnceLock::new(),
312            property_manager,
313            adjacency_manager,
314            last_flush_time,
315            compaction_handle,
316            index_rebuild_manager,
317            cached_manifest,
318            fork_id: None,
319            fork_flush_count,
320            fork_fragment_warn_fired,
321            flush_lock,
322            flush_coordinator,
323            commit_sequence,
324            committed_writes,
325            for_update_locks,
326        })
327    }
328
329    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
330    /// creating it on first use. The caller `.lock_owned().await`s the returned
331    /// mutex and holds the guard for the transaction's lifetime.
332    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333        self.for_update_locks
334            .entry(key.to_vec())
335            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336            .clone()
337    }
338
339    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
340    /// holds anymore, so the map does not grow without bound across the keyspace.
341    ///
342    /// Called when a transaction ends, **after** its guards have been dropped.
343    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
344    /// the same lock `row_lock_handle` takes to clone an entry — so the check
345    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
346    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
347    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
348    /// after we remove). Either way no two transactions ever lock different
349    /// `Mutex` instances for the same key.
350    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351        for key in keys {
352            self.for_update_locks
353                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354        }
355    }
356
357    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
358    /// tests that the map does not leak entries across transactions (G5).
359    pub fn for_update_lock_count(&self) -> usize {
360        self.for_update_locks.len()
361    }
362
363    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
364    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
365    /// baseline advances to lock-acquisition time (read-latest under the lock).
366    pub fn current_commit_sequence(&self) -> u64 {
367        self.commit_sequence.load(Ordering::Relaxed)
368    }
369
370    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
371    /// Used by the async-flush stream/finalize paths to pass into spawned
372    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
373    /// with `flush_coordinator -> FinalizeFn -> Writer`).
374    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375        SharedFlushCtx {
376            storage: self.storage.clone(),
377            l0_manager: self.l0_manager.clone(),
378            adjacency_manager: self.adjacency_manager.clone(),
379            property_manager: self.property_manager.clone(),
380            schema_manager: self.schema_manager.clone(),
381            cached_manifest: self.cached_manifest.clone(),
382            last_flush_time: self.last_flush_time.clone(),
383            fork_id: self.fork_id,
384            fork_flush_count: self.fork_flush_count.clone(),
385            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387            flush_lock: self.flush_lock.clone(),
388            index_rebuild_manager: self.index_rebuild_manager.clone(),
389            compaction_handle: self.compaction_handle.clone(),
390            compaction_config: self.config.compaction.clone(),
391            index_rebuild_config: self.config.index_rebuild.clone(),
392            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393        }
394    }
395
396    /// Borrow the flush coordinator if async flush is enabled.
397    /// Returns `None` when `config.async_flush_enabled = false`.
398    /// External callers (`drop_fork`) use this to drain pending streams.
399    pub fn flush_coordinator(
400        &self,
401    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
402        self.flush_coordinator.as_ref()
403    }
404
405    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
406    ///
407    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
408    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
409    pub fn set_index_rebuild_manager(
410        &self,
411        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412    ) -> Result<()> {
413        self.index_rebuild_manager
414            .set(manager)
415            .map_err(|_| anyhow!("index_rebuild_manager already set"))
416    }
417
418    /// Replay WAL mutations into the current L0 buffer.
419    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420        let l0 = self.l0_manager.get_current();
421        let wal = l0.read().wal.clone();
422
423        if let Some(wal) = wal {
424            wal.initialize().await?;
425            let mutations = wal.replay_since(wal_high_water_mark).await?;
426            let count = mutations.len();
427
428            if count > 0 {
429                log::info!(
430                    "Replaying {} mutations from WAL (LSN > {})",
431                    count,
432                    wal_high_water_mark
433                );
434                let mut l0_guard = l0.write();
435                l0_guard.replay_mutations(mutations)?;
436                // Rebuild the UNIQUE constraint index over the recovered rows
437                // (Bug #9 Mechanism B). `replay_mutations` restores
438                // vertices/properties/labels but never repopulates
439                // `constraint_index` (its only other caller is the live insert
440                // path). Without this, a unique key that lives only in the WAL
441                // (committed but not yet flushed to Lance) is invisible to
442                // `check_unique_constraint_multi` after recovery and a
443                // duplicate of it could be created.
444                self.rebuild_constraint_index(&mut l0_guard);
445            }
446
447            Ok(count)
448        } else {
449            Ok(0)
450        }
451    }
452
453    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
454    ///
455    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
456    /// constraint whose target label the vertex carries and whose member
457    /// properties are all present, inserts the same constraint key the live
458    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
459    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
460    /// write lock; the schema is already loaded on the `Writer`.
461    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
462        let schema = self.schema_manager.schema();
463        // Collect entries first to avoid borrowing `vertex_properties`
464        // immutably while mutating `constraint_index` through the same guard.
465        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
466        for (&vid, props) in &l0_guard.vertex_properties {
467            if l0_guard.vertex_tombstones.contains(&vid) {
468                continue;
469            }
470            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
471                continue;
472            };
473            for label in labels {
474                for constraint in &schema.constraints {
475                    if !constraint.enabled {
476                        continue;
477                    }
478                    let ConstraintTarget::Label(l) = &constraint.target else {
479                        continue;
480                    };
481                    if l != label {
482                        continue;
483                    }
484                    let ConstraintType::Unique {
485                        properties: unique_props,
486                    } = &constraint.constraint_type
487                    else {
488                        continue;
489                    };
490                    let mut key_values = Vec::new();
491                    let mut all_present = true;
492                    for prop in unique_props {
493                        if let Some(val) = props.get(prop) {
494                            key_values.push((prop.clone(), val.clone()));
495                        } else {
496                            all_present = false;
497                            break;
498                        }
499                    }
500                    if all_present {
501                        keys.push((serialize_constraint_key(label, &key_values), vid));
502                    }
503                }
504            }
505        }
506        for (key, vid) in keys {
507            l0_guard.insert_constraint_key(key, vid);
508        }
509    }
510
511    /// Allocates the next VID (pure auto-increment).
512    pub async fn next_vid(&self) -> Result<Vid> {
513        self.allocator.allocate_vid().await
514    }
515
516    /// Allocates multiple VIDs at once for bulk operations.
517    /// This is more efficient than calling next_vid() in a loop.
518    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
519        self.allocator.allocate_vids(count).await
520    }
521
522    /// Allocates the next EID (pure auto-increment).
523    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
524        self.allocator.allocate_eid().await
525    }
526
527    /// Allocates multiple EIDs at once for bulk operations.
528    /// This is more efficient than calling next_eid() in a loop.
529    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
530        self.allocator.allocate_eids(count).await
531    }
532
533    /// Install the embedding runtime exactly once. Receiver is `&self` so it
534    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
535    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
536        self.xervo_runtime
537            .set(runtime)
538            .map_err(|_| anyhow!("xervo_runtime already set"))
539    }
540
541    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
542        self.xervo_runtime.get().cloned()
543    }
544
545    /// Create a new empty L0 buffer for transaction-scoped mutations.
546    ///
547    /// Only reads the current version — no exclusive lock required on Writer.
548    /// The returned buffer has no WAL reference; mutations are logged at
549    /// commit time via [`Self::commit_transaction_l0`].
550    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
551        let current_version = self.l0_manager.get_current().read().current_version;
552        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
553        let buf = L0Buffer::new(current_version, None);
554        // SSI: stamp the OCC read sequence at begin so commit can detect any
555        // transaction that committed since. Gated on the runtime `ssi_enabled`
556        // toggle — when off, `occ_read_set` stays `None` and every downstream
557        // read-set recording / commit validation self-gates to a no-op.
558        let buf = if self.config.ssi_enabled {
559            let mut buf = buf;
560            buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
561            // The read path records observed ids here for SSI antidependency
562            // detection; commit consults it.
563            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
564                crate::runtime::l0::OccReadSet::default(),
565            )));
566            buf
567        } else {
568            buf
569        };
570        Arc::new(RwLock::new(buf))
571    }
572
573    /// Resolve the target L0 buffer for a mutation.
574    ///
575    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
576    /// When `None`, it targets the global L0 from the manager.
577    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
578        tx_l0
579            .cloned()
580            .unwrap_or_else(|| self.l0_manager.get_current())
581    }
582
583    fn update_metrics(&self) {
584        let l0 = self.l0_manager.get_current();
585        let size = l0.read().estimated_size;
586        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
587    }
588
589    /// Overlay-aware issue-#77 edge-endpoint validation.
590    ///
591    /// The current buffer alone does not hold all committed-but-unflushed
592    /// tombstones — a flush rotation moves them onto `pending_flush` until
593    /// the Lance write completes. A vertex is effectively deleted iff,
594    /// walking newest-first (tx → current → pending newest→oldest), the
595    /// first buffer that knows the vid says "tombstoned" (an insert clears
596    /// the tombstone within a buffer, so props/tombstone are mutually
597    /// exclusive per buffer).
598    ///
599    /// Must run under `flush_lock` so the overlay cannot change before the
600    /// merge, and BEFORE the durable WAL flush (see the call site).
601    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
602        let chain = self.l0_manager.get_pending_flush();
603        let current = self.l0_manager.get_current();
604        let effectively_deleted = |vid: &Vid| -> bool {
605            if tx_l0.vertex_properties.contains_key(vid) {
606                return false;
607            }
608            if tx_l0.vertex_tombstones.contains(vid) {
609                return true;
610            }
611            {
612                let cur = current.read();
613                if cur.vertex_properties.contains_key(vid) {
614                    return false;
615                }
616                if cur.vertex_tombstones.contains(vid) {
617                    return true;
618                }
619            }
620            for frozen in chain.iter().rev() {
621                let g = frozen.read();
622                if g.vertex_properties.contains_key(vid) {
623                    return false;
624                }
625                if g.vertex_tombstones.contains(vid) {
626                    return true;
627                }
628            }
629            false
630        };
631        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
632            if tx_l0.tombstones.contains_key(eid) {
633                continue; // a deletion, not an insertion — never resurrects a vertex
634            }
635            if effectively_deleted(src_vid) {
636                anyhow::bail!(
637                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
638                    eid,
639                    src_vid
640                );
641            }
642            if effectively_deleted(dst_vid) {
643                anyhow::bail!(
644                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
645                    eid,
646                    dst_vid
647                );
648            }
649        }
650        Ok(())
651    }
652
653    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
654    /// value for each CRDT property the transaction writes, so the commit
655    /// merge MERGES against the committed CRDT state instead of shadowing it
656    /// (the carve-out lets concurrent CRDT writers commit on the assumption
657    /// that the merge sees the committed value — true only while that value
658    /// lives in current, not mid-flush on `pending_flush`). No-op when
659    /// nothing is pending or the property already exists in current. Vertex
660    /// properties only, mirroring the carve-out itself.
661    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
662        let chain = self.l0_manager.get_pending_flush();
663        if chain.is_empty() {
664            return;
665        }
666        for (vid, props) in &tx_l0.vertex_properties {
667            Self::seed_crdt_props(&chain, *vid, props, main_l0);
668        }
669    }
670
671    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
672    /// Also used by the non-transactional vertex write path, which CRDT-merges
673    /// into the current buffer directly and has the same shadowing hazard
674    /// during a flush window.
675    fn seed_crdt_props(
676        chain: &[Arc<RwLock<L0Buffer>>],
677        vid: Vid,
678        props: &Properties,
679        target: &mut L0Buffer,
680    ) {
681        for (key, value) in props {
682            if crate::runtime::l0::try_as_crdt(value).is_none() {
683                continue;
684            }
685            if target
686                .vertex_properties
687                .get(&vid)
688                .is_some_and(|p| p.contains_key(key))
689            {
690                continue;
691            }
692            // Newest generation first: the first hit is the live state.
693            for frozen in chain.iter().rev() {
694                let g = frozen.read();
695                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
696                    if crate::runtime::l0::try_as_crdt(v).is_some() {
697                        target
698                            .vertex_properties
699                            .entry(vid)
700                            .or_default()
701                            .insert(key.clone(), v.clone());
702                    }
703                    break;
704                }
705            }
706        }
707    }
708
709    /// Commit an externally-owned transaction L0 buffer into main L0.
710    ///
711    /// Under `flush_lock`: validates (SSI checks when `ssi_enabled`, then the
712    /// issue #77 edge-endpoint check), appends mutations to the WAL, flushes
713    /// it (the commit point), merges into main L0, and replays edges into the
714    /// AdjacencyManager. Returns `(wal_lsn, flush_pending)`; `wal_lsn` is 0
715    /// when no WAL is configured. `flush_pending == true` means the
716    /// post-commit `should_flush()` fired with `async_flush_enabled` set: the
717    /// buffer was rotated inline and the L1 stream phase was handed to the
718    /// flush coordinator instead of awaited, so commits don't block on
719    /// L1-streaming I/O. Errors: SSI/constraint conflicts, #77 endpoint
720    /// violations, WAL or merge failures; post-commit flush failures are only logged.
721    pub async fn commit_transaction_l0(
722        self: &Arc<Self>,
723        tx_l0_arc: Arc<RwLock<L0Buffer>>,
724    ) -> Result<(u64, bool)> {
725        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
726        // Two concurrent commits serialize here; in Phase 3 the outer
727        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
728        // acquisition is uncontended. Phase 4 drops the outer lock and
729        // this becomes the load-bearing serialization point.
730        let _flush_lock_guard = self.flush_lock.lock().await;
731
732        // Crash-recovery seam: simulate process death immediately after winning
733        // the commit serialization point but before any durable work. No-op
734        // unless built with `--features failpoints`. (See ssi_resilience tests.)
735        fail::fail_point!("commit::after-flush-lock");
736
737        // SSI: optimistic conflict detection. This MUST run before any WAL
738        // write — `flush_wal()` below is the durable commit point and the WAL
739        // has no abort marker, so aborting after it would resurrect this
740        // transaction on crash recovery. The write-set is reused for
741        // registration after a successful merge.
742        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
743        // and `occ_write_set` is `None`, so the post-merge registration below
744        // is skipped — reproducing last-writer-wins exactly.
745        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
746            let tx_l0 = tx_l0_arc.read();
747            let read_seq = tx_l0.occ_read_seq;
748            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
749            if !write_set.is_empty() {
750                // Telemetry: one validation per non-empty (writing) commit. The
751                // ratio of conflicts to validations is the headline abort rate.
752                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
753                // Read-set is consulted only for writing transactions, so a
754                // read-only commit (empty write-set) runs at snapshot isolation.
755                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
756                if let Some(conflict) =
757                    self.committed_writes
758                        .lock()
759                        .check(read_seq, &write_set, read_guard.as_deref())
760                {
761                    use crate::runtime::occ::Conflict;
762                    match &conflict {
763                        Conflict::WriteWrite { .. } => metrics::counter!(
764                            "uni_ssi_serialization_conflicts_total",
765                            "kind" => "write_write",
766                        )
767                        .increment(1),
768                        Conflict::ReadWrite { .. } => metrics::counter!(
769                            "uni_ssi_serialization_conflicts_total",
770                            "kind" => "read_write",
771                        )
772                        .increment(1),
773                        Conflict::HistoryTruncated { .. } => {
774                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
775                        }
776                    }
777                    return Err(anyhow::Error::new(
778                        uni_common::UniError::SerializationConflict {
779                            message: conflict.to_string(),
780                        },
781                    ));
782                }
783            }
784
785            // Validate against the committed-but-unflushed overlay under
786            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
787            // soundness. The current buffer alone does not hold all committed
788            // state — a flush rotation moves it onto `pending_flush` until the
789            // Lance write completes (the Bug #9A window, here at the
790            // commit-time layer) — so every check walks [current, pending…].
791            {
792                let pending = self.l0_manager.get_pending_flush();
793                let main_l0 = self.l0_manager.get_current();
794                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
795                    std::iter::once(main_l0).chain(pending).collect();
796
797                // SSI / serializable MERGE: abort if a concurrent transaction has
798                // already committed a row with one of this transaction's unique
799                // keys. Commits serialize here, so this closes the race window
800                // left by the per-insert check. (Empty index → no iterations.)
801                for (key, vid) in &tx_l0.constraint_index {
802                    if overlay
803                        .iter()
804                        .any(|b| b.read().has_constraint_key(key, *vid))
805                    {
806                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
807                        return Err(anyhow::Error::new(
808                            uni_common::UniError::ConstraintConflict {
809                                message: "unique key already committed by a concurrent \
810                                          transaction"
811                                    .to_string(),
812                            },
813                        ));
814                    }
815                }
816
817                // Same race window for global ext_id uniqueness: the per-insert
818                // check ran against an older main L0; re-probe the committed
819                // index here, where commits serialize.
820                for (ext_id, vid) in &tx_l0.extid_index {
821                    let taken = overlay.iter().any(|b| {
822                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
823                    });
824                    if taken {
825                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
826                        return Err(anyhow::Error::new(
827                            uni_common::UniError::ConstraintConflict {
828                                message: format!(
829                                    "ext_id '{ext_id}' already committed by a concurrent \
830                                     transaction"
831                                ),
832                            },
833                        ));
834                    }
835                }
836
837                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
838                // write-set assuming its merge commutes. If the overlay holds a
839                // *different* CRDT variant for the same property, the merge would
840                // silently overwrite it — abort instead of losing the update.
841                // (Checked against every overlay buffer: conservative if an old
842                // generation held a different variant that a newer commit already
843                // replaced, but an abort+retry is always sound.)
844                for buf in &overlay {
845                    if let Some(conflict) =
846                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
847                    {
848                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
849                        return Err(anyhow::Error::new(
850                            uni_common::UniError::SerializationConflict {
851                                message: conflict.to_string(),
852                            },
853                        ));
854                    }
855                }
856            }
857            Some(write_set)
858        } else {
859            None
860        };
861
862        // Issue #77: an edge whose endpoint is effectively deleted makes the
863        // merge below bail. That bail MUST happen before the durable WAL flush —
864        // after it the transaction is committed-but-unmerged (a ghost commit),
865        // and WAL replay re-hits the same bail, making the database unopenable.
866        // SSI validation was deliberately placed before the flush for exactly
867        // this reason; the endpoint check belongs here too. Runs unconditionally
868        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
869        // tombstone state cannot change between here and the merge. A
870        // tombstone may live in a flush-rotated pending buffer rather than
871        // the current buffer, so the check walks the overlay newest-first.
872        {
873            let tx_l0 = tx_l0_arc.read();
874            self.validate_edge_endpoints_overlay(&tx_l0)?;
875        }
876
877        // Crash-recovery seam: SSI validation has passed; the transaction is
878        // about to become durable. A crash here must leave NO trace (validation
879        // happens before the WAL is touched). No-op unless `failpoints`.
880        fail::fail_point!("commit::after-validate");
881
882        // 1. Write transaction mutations to WAL BEFORE merging into main L0
883        // This ensures durability before visibility.
884        {
885            let tx_l0 = tx_l0_arc.read();
886            let main_l0_arc = self.l0_manager.get_current();
887            let main_l0 = main_l0_arc.read();
888
889            // If WAL exists, write mutations to it for durability
890            if let Some(wal) = main_l0.wal.as_ref() {
891                // Order: vertices first, then edges (to ensure src/dst exist on replay)
892
893                // Vertex insertions
894                for (vid, properties) in &tx_l0.vertex_properties {
895                    if !tx_l0.vertex_tombstones.contains(vid) {
896                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
897                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
898                            vid: *vid,
899                            properties: properties.clone(),
900                            labels,
901                        })?;
902                    }
903                }
904
905                // Vertex deletions
906                for vid in &tx_l0.vertex_tombstones {
907                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
908                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
909                }
910
911                // Label-only mutations (SET n:Label / REMOVE n:Label). After
912                // vertex inserts (so the vertex exists on replay), before edges,
913                // and skipping vertices deleted in this same commit.
914                for vid in &tx_l0.vertex_label_overwrites {
915                    if tx_l0.vertex_tombstones.contains(vid) {
916                        continue;
917                    }
918                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
919                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
920                        vid: *vid,
921                        labels,
922                    })?;
923                }
924
925                // Crash-recovery seam: vertices appended, edges not yet. Tests
926                // assert that a crash here (before `flush_wal`) recovers NOTHING
927                // — the durable commit point is the flush below, not append.
928                fail::fail_point!("commit::mid-wal");
929
930                // Edge insertions and deletions from edge_endpoints
931                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
932                    if tx_l0.tombstones.contains_key(eid) {
933                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
934                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
935                            eid: *eid,
936                            src_vid: *src_vid,
937                            dst_vid: *dst_vid,
938                            edge_type: *edge_type,
939                            version,
940                        })?;
941                    } else {
942                        let properties =
943                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
944                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
945                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
946                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
947                            src_vid: *src_vid,
948                            dst_vid: *dst_vid,
949                            edge_type: *edge_type,
950                            eid: *eid,
951                            version,
952                            properties,
953                            edge_type_name,
954                        })?;
955                    }
956                }
957
958                // Tombstones for edges that only exist in the global L0 (not in
959                // this transaction's edge_endpoints).  Without this, deletes of
960                // pre-existing edges would be silently lost.
961                for (eid, tombstone) in &tx_l0.tombstones {
962                    if !tx_l0.edge_endpoints.contains_key(eid) {
963                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
964                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
965                            eid: *eid,
966                            src_vid: tombstone.src_vid,
967                            dst_vid: tombstone.dst_vid,
968                            edge_type: tombstone.edge_type,
969                            version,
970                        })?;
971                    }
972                }
973            }
974        }
975
976        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
977        let wal_lsn = self.flush_wal().await?;
978
979        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
980        // A crash here must RECOVER the transaction on replay (it is committed),
981        // even though it was never made visible in-process. No-op unless `failpoints`.
982        fail::fail_point!("commit::after-wal-flush");
983
984        // Component C1: if an outstanding snapshot pins the current generation,
985        // clone it aside (lazy copy-on-write) before merging, so the pinning
986        // transaction's reads stay isolated from this commit. No-op — and zero
987        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
988        // so this cannot race a flush rotate or another commit's merge; the merge
989        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
990        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
991        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
992        // always false when SSI is off and this is a zero-cost no-op.
993        if self.l0_manager.is_current_pinned() {
994            self.l0_manager.freeze_current_for_snapshot();
995            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
996        }
997
998        // 3. Merge into main L0 and make visible
999        {
1000            // Write-lock the tx buffer: `merge_take` moves its property maps
1001            // into main L0 instead of cloning them. The commit consumes the
1002            // transaction, so the drained maps are never observed afterwards;
1003            // everything read below (endpoints, versions, tombstones) is left
1004            // intact.
1005            let mut tx_l0 = tx_l0_arc.write();
1006            let main_l0_arc = self.l0_manager.get_current();
1007            let mut main_l0 = main_l0_arc.write();
1008            // A CRDT property's committed state may live only in a
1009            // flush-rotated pending buffer (the post-rotation current is
1010            // empty until the Lance write completes — the Bug #9A window).
1011            // `merge_crdt_properties` merges against the CURRENT buffer's
1012            // value — without seeding, the tx's CRDT state would SHADOW the
1013            // pending buffer's at read time (newest buffer wins per property)
1014            // and concurrent increments would be lost. Seed the newest
1015            // overlay value for each CRDT property the tx writes that current
1016            // lacks, so the merge below merges instead of replaces.
1017            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1018            main_l0.merge_take(&mut tx_l0)?;
1019
1020            // Replay transaction edges into the AdjacencyManager overlay
1021            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1022                let edge_version = tx_l0
1023                    .edge_versions
1024                    .get(eid)
1025                    .copied()
1026                    .unwrap_or(main_l0.current_version);
1027                if tx_l0.tombstones.contains_key(eid) {
1028                    self.adjacency_manager
1029                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1030                } else {
1031                    self.adjacency_manager
1032                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
1033                }
1034            }
1035
1036            // Replay tombstones for edges that only exist in the global L0
1037            // (not in this transaction's edge_endpoints).
1038            for (eid, tombstone) in &tx_l0.tombstones {
1039                if !tx_l0.edge_endpoints.contains_key(eid) {
1040                    let edge_version = tx_l0
1041                        .edge_versions
1042                        .get(eid)
1043                        .copied()
1044                        .unwrap_or(main_l0.current_version);
1045                    self.adjacency_manager.add_tombstone(
1046                        *eid,
1047                        tombstone.src_vid,
1048                        tombstone.dst_vid,
1049                        tombstone.edge_type,
1050                        edge_version,
1051                    );
1052                }
1053            }
1054        }
1055
1056        // Crash-recovery seam: durable AND merged, but the in-memory commit
1057        // registry has not recorded this write-set yet. A crash here is
1058        // indistinguishable from one at `after-wal-flush` on reopen (the
1059        // registry is in-memory and rebuilt empty); the tx still recovers.
1060        fail::fail_point!("commit::after-merge");
1061
1062        // SSI: register this commit's write-set under a fresh commit sequence so
1063        // later transactions detect conflicts against it. Still under
1064        // `flush_lock`, before the async-flush branch can drop the guard.
1065        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1066        if let Some(write_set) = occ_write_set
1067            && !write_set.is_empty()
1068        {
1069            let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
1070            self.committed_writes.lock().record(seq, write_set);
1071        }
1072
1073        self.update_metrics();
1074
1075        // 4. Best-effort post-commit auto-flush.
1076        //
1077        // Two paths:
1078        // - async_flush_enabled = false (default): inline under our
1079        //   existing flush_lock guard via flush_inline_under_lock.
1080        // - async_flush_enabled = true: rotate inline, drop flush_lock,
1081        //   then submit the stream phase to the coordinator. Gated on
1082        //   `pending_flush_count() < max_pending_flushes` so we don't
1083        //   stack up rotations beyond the configured pipeline depth.
1084        //   `try_acquire_permit` is non-blocking: if we lose the race
1085        //   for the last permit, we just skip this trigger (the next
1086        //   commit retries).
1087        let mut flush_pending = false;
1088        if self.should_flush() {
1089            if self.config.async_flush_enabled
1090                && let Some(coord) = self.flush_coordinator.as_ref()
1091                && coord.pending_flush_count() < self.config.max_pending_flushes
1092            {
1093                match coord.try_acquire_permit() {
1094                    Some(permit) => {
1095                        match self.flush_l0_rotate().await {
1096                            Ok(rotate_out) => {
1097                                // Allocate the rotate seq and bump pending ONLY
1098                                // after the rotate succeeds (Bug #3). A failed
1099                                // rotate must consume neither: the finalizer
1100                                // advances strictly in consecutive seq order and
1101                                // only decrements pending on finalize, so a
1102                                // leaked seq/pending from a failed rotate would
1103                                // wedge the finalizer forever and climb pending
1104                                // toward `max_pending_flushes`. The seq is still
1105                                // allocated under `flush_lock` (immediately after
1106                                // the rotate, before the guard drops below), so
1107                                // concurrent rotates keep seq order == rotation
1108                                // order, and the seq is not used until submit.
1109                                let seq = coord.next_rotate_seq();
1110                                coord.note_pending();
1111                                // Release flush_lock BEFORE the spawn so concurrent
1112                                // commits can proceed while the stream runs.
1113                                drop(_flush_lock_guard);
1114                                let parent_manifest = self.cached_manifest.lock().clone();
1115                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1116                                    seq,
1117                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
1118                                    wal_lsn: rotate_out.wal_lsn,
1119                                    current_version: rotate_out.current_version,
1120                                    name: None,
1121                                    parent_manifest,
1122                                    permit,
1123                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1124                                };
1125                                let writer = self.clone();
1126                                let _ticket = coord.submit_for_stream(
1127                                    rotated,
1128                                    move |old_l0, wal, ver, n| async move {
1129                                        let outcome =
1130                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1131                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
1132                                            new_manifest: outcome.manifest,
1133                                            snapshot_id: outcome.snapshot_id,
1134                                        })
1135                                    },
1136                                );
1137                                flush_pending = true;
1138                                // Early return — flush_lock already dropped.
1139                                return Ok((wal_lsn, flush_pending));
1140                            }
1141                            Err(e) => {
1142                                tracing::warn!("Async rotate failed (non-critical): {}", e);
1143                                // No seq was allocated and pending was not
1144                                // bumped (both moved into the Ok arm for Bug
1145                                // #3), so the finalizer is not wedged. The
1146                                // permit drops here, freeing the slot.
1147                            }
1148                        }
1149                    }
1150                    None => {
1151                        // Race: someone else grabbed the last permit. Skip;
1152                        // next commit will retry should_flush().
1153                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1154                    }
1155                }
1156            } else if let Err(e) = self.flush_inline_under_lock(None).await {
1157                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1158            }
1159        }
1160
1161        Ok((wal_lsn, flush_pending))
1162    }
1163
1164    /// Flush the WAL buffer to durable storage.
1165    ///
1166    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1167    pub async fn flush_wal(&self) -> Result<u64> {
1168        let l0 = self.l0_manager.get_current();
1169        let wal = l0.read().wal.clone();
1170
1171        match wal {
1172            Some(wal) => Ok(wal.flush().await?),
1173            None => Ok(0),
1174        }
1175    }
1176
1177    /// Record property removals in the active L0 mutation stats.
1178    ///
1179    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1180    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1181        if count == 0 {
1182            return;
1183        }
1184        let l0 = self.resolve_l0(tx_l0);
1185        l0.write().mutation_stats.properties_removed += count;
1186    }
1187
1188    /// Validates vertex constraints for the given properties.
1189    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1190    async fn validate_vertex_constraints_for_label(
1191        &self,
1192        vid: Vid,
1193        properties: &Properties,
1194        label: &str,
1195        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1196    ) -> Result<()> {
1197        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1198            .await
1199    }
1200
1201    /// Partial-update sibling: validates only constraints touching keys
1202    /// present in `properties` (the touched set). NOT NULL is checked
1203    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1204    /// skipped when any referenced key is absent (the caller is
1205    /// expected to have routed to the full-row path in that case via
1206    /// `touched_needs_full_read`).
1207    async fn validate_vertex_constraints_for_label_partial(
1208        &self,
1209        vid: Vid,
1210        properties: &Properties,
1211        label: &str,
1212        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1213    ) -> Result<()> {
1214        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1215            .await
1216    }
1217
1218    async fn validate_vertex_constraints_for_label_impl(
1219        &self,
1220        vid: Vid,
1221        properties: &Properties,
1222        label: &str,
1223        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1224        partial: bool,
1225    ) -> Result<()> {
1226        let schema = self.schema_manager.schema();
1227
1228        {
1229            // 1. Check NOT NULL constraints (from Property definitions).
1230            //    Under partial-update mode, skip properties NOT in
1231            //    `properties` — they retain their previous (already-
1232            //    validated) value.
1233            if let Some(props_meta) = schema.properties.get(label) {
1234                for (prop_name, meta) in props_meta {
1235                    if !meta.nullable {
1236                        let present = properties.get(prop_name);
1237                        if partial && present.is_none() {
1238                            continue;
1239                        }
1240                        if present.is_none_or(|v| v.is_null()) {
1241                            log::warn!(
1242                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1243                                prop_name,
1244                                label
1245                            );
1246                            return Err(anyhow!(
1247                                "Constraint violation: Property '{}' cannot be null",
1248                                prop_name
1249                            ));
1250                        }
1251                    }
1252                }
1253            }
1254
1255            // 2. Check Explicit Constraints (Unique, Check, etc.)
1256            for constraint in &schema.constraints {
1257                if !constraint.enabled {
1258                    continue;
1259                }
1260                match &constraint.target {
1261                    ConstraintTarget::Label(l) if l == label => {}
1262                    _ => continue,
1263                }
1264
1265                match &constraint.constraint_type {
1266                    ConstraintType::Unique {
1267                        properties: unique_props,
1268                    } => {
1269                        // Support single and multi-property unique constraints
1270                        if !unique_props.is_empty() {
1271                            let mut key_values = Vec::new();
1272                            let mut missing = false;
1273                            for prop in unique_props {
1274                                if let Some(val) = properties.get(prop) {
1275                                    key_values.push((prop.clone(), val.clone()));
1276                                } else {
1277                                    missing = true; // Can't enforce if property missing (partial update?)
1278                                    // For INSERT, missing means null?
1279                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1280                                    // For now, only check if ALL keys are present
1281                                }
1282                            }
1283
1284                            if !missing {
1285                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1286                                    .await?;
1287                            }
1288                        }
1289                    }
1290                    ConstraintType::Exists { property } => {
1291                        if properties.get(property).is_none_or(|v| v.is_null()) {
1292                            log::warn!(
1293                                "Constraint violation: Property '{}' must exist for label '{}'",
1294                                property,
1295                                label
1296                            );
1297                            return Err(anyhow!(
1298                                "Constraint violation: Property '{}' must exist",
1299                                property
1300                            ));
1301                        }
1302                    }
1303                    ConstraintType::Check { expression } => {
1304                        if !self.evaluate_check_constraint(expression, properties)? {
1305                            return Err(anyhow!(
1306                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1307                                constraint.name,
1308                                expression
1309                            ));
1310                        }
1311                    }
1312                    _ => {
1313                        return Err(anyhow!("Unsupported constraint type"));
1314                    }
1315                }
1316            }
1317        }
1318        Ok(())
1319    }
1320
1321    /// Validates vertex constraints for a vertex with the given labels.
1322    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1323    /// Unknown labels (not in schema) are skipped.
1324    async fn validate_vertex_constraints(
1325        &self,
1326        vid: Vid,
1327        properties: &Properties,
1328        labels: &[String],
1329        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1330    ) -> Result<()> {
1331        let schema = self.schema_manager.schema();
1332
1333        // Validate constraints only for known labels
1334        for label in labels {
1335            // Skip unknown labels (schemaless support)
1336            if schema.get_label_case_insensitive(label).is_none() {
1337                continue;
1338            }
1339            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1340                .await?;
1341        }
1342
1343        // Check global ext_id uniqueness if ext_id is provided
1344        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1345            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1346        }
1347
1348        Ok(())
1349    }
1350
1351    /// Partial sibling of `validate_vertex_constraints` — validates only
1352    /// constraints touching keys present in `properties`. Used by
1353    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1354    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1355    async fn validate_vertex_constraints_partial(
1356        &self,
1357        vid: Vid,
1358        touched: &Properties,
1359        labels: &[String],
1360        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1361    ) -> Result<()> {
1362        let schema = self.schema_manager.schema();
1363        for label in labels {
1364            if schema.get_label_case_insensitive(label).is_none() {
1365                continue;
1366            }
1367            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1368                .await?;
1369        }
1370        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1371            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1372        }
1373        Ok(())
1374    }
1375
1376    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1377    ///
1378    /// Used to build a constraint key index from L0 buffers for batch validation.
1379    fn collect_constraint_keys_from_properties<'a>(
1380        properties_iter: impl Iterator<Item = &'a Properties>,
1381        label: &str,
1382        constraints: &[uni_common::core::schema::Constraint],
1383        existing_keys: &mut HashMap<String, HashSet<String>>,
1384        existing_extids: &mut HashSet<String>,
1385    ) {
1386        for props in properties_iter {
1387            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1388                existing_extids.insert(ext_id.to_string());
1389            }
1390
1391            for constraint in constraints {
1392                if !constraint.enabled {
1393                    continue;
1394                }
1395                if let ConstraintTarget::Label(l) = &constraint.target {
1396                    if l != label {
1397                        continue;
1398                    }
1399                } else {
1400                    continue;
1401                }
1402
1403                if let ConstraintType::Unique {
1404                    properties: unique_props,
1405                } = &constraint.constraint_type
1406                {
1407                    let mut key_parts = Vec::new();
1408                    let mut all_present = true;
1409                    for prop in unique_props {
1410                        if let Some(val) = props.get(prop) {
1411                            key_parts.push(format!("{}:{}", prop, val));
1412                        } else {
1413                            all_present = false;
1414                            break;
1415                        }
1416                    }
1417                    if all_present {
1418                        let key = key_parts.join("|");
1419                        existing_keys
1420                            .entry(constraint.name.clone())
1421                            .or_default()
1422                            .insert(key);
1423                    }
1424                }
1425            }
1426        }
1427    }
1428
1429    /// Validates constraints for a batch of vertices efficiently.
1430    ///
1431    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1432    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1433    ///
1434    /// # Arguments
1435    /// * `vids` - VIDs of vertices being inserted
1436    /// * `properties_batch` - Properties for each vertex
1437    /// * `label` - Label for all vertices (assumes single label for now)
1438    ///
1439    /// # Performance
1440    /// For N vertices with unique constraints:
1441    /// - Old approach: O(N²) - scan L0 buffer N times
1442    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1443    async fn validate_vertex_batch_constraints(
1444        &self,
1445        vids: &[Vid],
1446        properties_batch: &[Properties],
1447        label: &str,
1448        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1449    ) -> Result<()> {
1450        if vids.len() != properties_batch.len() {
1451            return Err(anyhow!("VID/properties length mismatch"));
1452        }
1453
1454        let schema = self.schema_manager.schema();
1455
1456        // 1. Validate NOT NULL constraints for each vertex
1457        if let Some(props_meta) = schema.properties.get(label) {
1458            for (idx, properties) in properties_batch.iter().enumerate() {
1459                for (prop_name, meta) in props_meta {
1460                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1461                        return Err(anyhow!(
1462                            "Constraint violation at index {}: Property '{}' cannot be null",
1463                            idx,
1464                            prop_name
1465                        ));
1466                    }
1467                }
1468            }
1469        }
1470
1471        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1472        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1473        let mut existing_extids: HashSet<String> = HashSet::new();
1474
1475        // Scan current L0 buffer
1476        {
1477            let l0 = self.l0_manager.get_current();
1478            let l0_guard = l0.read();
1479            Self::collect_constraint_keys_from_properties(
1480                l0_guard.vertex_properties.values(),
1481                label,
1482                &schema.constraints,
1483                &mut existing_keys,
1484                &mut existing_extids,
1485            );
1486        }
1487
1488        // Scan transaction L0 if present
1489        if let Some(tx_l0) = tx_l0 {
1490            let tx_l0_guard = tx_l0.read();
1491            Self::collect_constraint_keys_from_properties(
1492                tx_l0_guard.vertex_properties.values(),
1493                label,
1494                &schema.constraints,
1495                &mut existing_keys,
1496                &mut existing_extids,
1497            );
1498        }
1499
1500        // 3. Check batch vertices against index AND check for duplicates within batch
1501        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1502        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1503
1504        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1505            // Check ext_id uniqueness
1506            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1507                if existing_extids.contains(ext_id) {
1508                    return Err(anyhow!(
1509                        "Constraint violation at index {}: ext_id '{}' already exists",
1510                        idx,
1511                        ext_id
1512                    ));
1513                }
1514                if let Some(first_idx) = batch_extids.get(ext_id) {
1515                    return Err(anyhow!(
1516                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1517                        ext_id,
1518                        first_idx,
1519                        idx
1520                    ));
1521                }
1522                batch_extids.insert(ext_id.to_string(), idx);
1523            }
1524
1525            // Check unique constraints
1526            for constraint in &schema.constraints {
1527                if !constraint.enabled {
1528                    continue;
1529                }
1530                if let ConstraintTarget::Label(l) = &constraint.target {
1531                    if l != label {
1532                        continue;
1533                    }
1534                } else {
1535                    continue;
1536                }
1537
1538                match &constraint.constraint_type {
1539                    ConstraintType::Unique {
1540                        properties: unique_props,
1541                    } => {
1542                        let mut key_parts = Vec::new();
1543                        let mut all_present = true;
1544                        for prop in unique_props {
1545                            if let Some(val) = properties.get(prop) {
1546                                key_parts.push(format!("{}:{}", prop, val));
1547                            } else {
1548                                all_present = false;
1549                                break;
1550                            }
1551                        }
1552
1553                        if all_present {
1554                            let key = key_parts.join("|");
1555
1556                            // Check against existing L0 keys
1557                            if let Some(keys) = existing_keys.get(&constraint.name)
1558                                && keys.contains(&key)
1559                            {
1560                                return Err(anyhow!(
1561                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1562                                    idx,
1563                                    label,
1564                                    constraint.name
1565                                ));
1566                            }
1567
1568                            // Check for duplicates within batch
1569                            let batch_constraint_keys =
1570                                batch_keys.entry(constraint.name.clone()).or_default();
1571                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1572                                return Err(anyhow!(
1573                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1574                                    key,
1575                                    first_idx,
1576                                    idx
1577                                ));
1578                            }
1579                            batch_constraint_keys.insert(key, idx);
1580                        }
1581                    }
1582                    ConstraintType::Exists { property }
1583                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1584                    {
1585                        return Err(anyhow!(
1586                            "Constraint violation at index {}: Property '{}' must exist",
1587                            idx,
1588                            property
1589                        ));
1590                    }
1591                    ConstraintType::Check { expression }
1592                        if !self.evaluate_check_constraint(expression, properties)? =>
1593                    {
1594                        return Err(anyhow!(
1595                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1596                            idx,
1597                            constraint.name
1598                        ));
1599                    }
1600                    _ => {}
1601                }
1602            }
1603        }
1604
1605        // 4. Check storage for unique constraints (can batch this into a single query)
1606        for constraint in &schema.constraints {
1607            if !constraint.enabled {
1608                continue;
1609            }
1610            if let ConstraintTarget::Label(l) = &constraint.target {
1611                if l != label {
1612                    continue;
1613                }
1614            } else {
1615                continue;
1616            }
1617
1618            if let ConstraintType::Unique {
1619                properties: unique_props,
1620            } = &constraint.constraint_type
1621            {
1622                // Build compound OR filter for all batch vertices
1623                let mut or_filters = Vec::new();
1624                for properties in properties_batch.iter() {
1625                    let mut and_parts = Vec::new();
1626                    let mut all_present = true;
1627                    for prop in unique_props {
1628                        if let Some(val) = properties.get(prop) {
1629                            let val_str = match val {
1630                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1631                                Value::Int(n) => n.to_string(),
1632                                Value::Float(f) => f.to_string(),
1633                                Value::Bool(b) => b.to_string(),
1634                                _ => {
1635                                    all_present = false;
1636                                    break;
1637                                }
1638                            };
1639                            and_parts.push(format!("{} = {}", prop, val_str));
1640                        } else {
1641                            all_present = false;
1642                            break;
1643                        }
1644                    }
1645                    if all_present {
1646                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1647                    }
1648                }
1649
1650                #[cfg(feature = "lance-backend")]
1651                if !or_filters.is_empty() {
1652                    let vid_list: Vec<String> =
1653                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1654                    let filter = format!(
1655                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1656                        or_filters.join(" OR "),
1657                        vid_list.join(", ")
1658                    );
1659
1660                    if let Ok(ds) = self.storage.vertex_dataset(label)
1661                        && let Ok(lance_ds) = ds.open_raw().await
1662                    {
1663                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1664                        if count > 0 {
1665                            return Err(anyhow!(
1666                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1667                                label,
1668                                constraint.name
1669                            ));
1670                        }
1671                    }
1672                }
1673            }
1674        }
1675
1676        Ok(())
1677    }
1678
1679    /// Checks that ext_id is globally unique across all vertices.
1680    ///
1681    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1682    /// to ensure no other vertex uses this ext_id.
1683    ///
1684    /// # Errors
1685    ///
1686    /// Returns error if another vertex with the same ext_id exists.
1687    async fn check_extid_globally_unique(
1688        &self,
1689        ext_id: &str,
1690        current_vid: Vid,
1691        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1692    ) -> Result<()> {
1693        // Check L0 buffers: current, transaction, and pending flush
1694        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1695            let mut buffers = vec![self.l0_manager.get_current()];
1696            if let Some(tx_l0) = tx_l0 {
1697                buffers.push(tx_l0.clone());
1698            }
1699            buffers.extend(self.l0_manager.get_pending_flush());
1700            buffers
1701        };
1702
1703        for l0 in &l0_buffers_to_check {
1704            // O(1) per buffer via the maintained `extid_index` (the previous
1705            // full `vertex_properties` scan made constrained ingest O(n²)).
1706            if let Some(&vid) = l0.read().extid_index.get(ext_id)
1707                && vid != current_vid
1708            {
1709                return Err(anyhow!(
1710                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1711                    ext_id,
1712                    vid
1713                ));
1714            }
1715        }
1716
1717        // Check main vertices table (if it exists)
1718        // Pass None for global uniqueness check (not snapshot-isolated)
1719        let backend = self.storage.backend();
1720        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1721            && found_vid != current_vid
1722        {
1723            return Err(anyhow!(
1724                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1725                ext_id,
1726                found_vid
1727            ));
1728        }
1729
1730        Ok(())
1731    }
1732
1733    /// Helper to get vertex labels from L0 buffer.
1734    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1735        let l0 = self.l0_manager.get_current();
1736        let l0_guard = l0.read();
1737        // Check if vertex is tombstoned (deleted) - if so, return None
1738        if l0_guard.vertex_tombstones.contains(&vid) {
1739            return None;
1740        }
1741        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1742    }
1743
1744    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1745    /// This is the proper way to read vertex labels after a flush, as it checks both
1746    /// in-memory buffers and persisted storage.
1747    pub async fn get_vertex_labels(
1748        &self,
1749        vid: Vid,
1750        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1751    ) -> Option<Vec<String>> {
1752        // 1. Check current L0
1753        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1754            return Some(labels);
1755        }
1756
1757        // 2. Check transaction L0 if present
1758        if let Some(tx_l0) = tx_l0 {
1759            let guard = tx_l0.read();
1760            if guard.vertex_tombstones.contains(&vid) {
1761                return None;
1762            }
1763            if let Some(labels) = guard.get_vertex_labels(vid) {
1764                return Some(labels.to_vec());
1765            }
1766        }
1767
1768        // 3. Check pending flush L0s
1769        for pending_l0 in self.l0_manager.get_pending_flush() {
1770            let guard = pending_l0.read();
1771            if guard.vertex_tombstones.contains(&vid) {
1772                return None;
1773            }
1774            if let Some(labels) = guard.get_vertex_labels(vid) {
1775                return Some(labels.to_vec());
1776            }
1777        }
1778
1779        // 4. Check storage
1780        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1781    }
1782
1783    /// Helper to get edge type from L0 buffer.
1784    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1785        let l0 = self.l0_manager.get_current();
1786        let l0_guard = l0.read();
1787        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1788    }
1789
1790    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1791    /// Falls back to the transaction L0 if available.
1792    pub fn get_edge_type_id_from_l0(
1793        &self,
1794        eid: Eid,
1795        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1796    ) -> Option<u32> {
1797        // Check transaction L0 first
1798        if let Some(tx_l0) = tx_l0 {
1799            let guard = tx_l0.read();
1800            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1801                return Some(etype);
1802            }
1803        }
1804        // Fall back to main L0
1805        let l0 = self.l0_manager.get_current();
1806        let l0_guard = l0.read();
1807        l0_guard
1808            .get_edge_endpoint_full(eid)
1809            .map(|(_, _, etype)| etype)
1810    }
1811
1812    /// Set the type name for an edge (used for schemaless edge types).
1813    /// This is called during CREATE for edge types not found in the schema.
1814    pub fn set_edge_type(
1815        &self,
1816        eid: Eid,
1817        type_name: String,
1818        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1819    ) {
1820        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1821    }
1822
1823    /// Evaluate a simple CHECK constraint expression.
1824    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1825    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1826        let parts: Vec<&str> = expression.split_whitespace().collect();
1827        if parts.len() != 3 {
1828            // For now, only support "prop op val"
1829            // Fallback to true if too complex to avoid breaking, but warn
1830            log::warn!(
1831                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1832                expression
1833            );
1834            return Ok(true);
1835        }
1836
1837        let prop_part = parts[0].trim_start_matches('(');
1838        // Handle "variable.property" format - take the part after the dot
1839        let prop_name = if let Some(idx) = prop_part.find('.') {
1840            &prop_part[idx + 1..]
1841        } else {
1842            prop_part
1843        };
1844
1845        let op = parts[1];
1846        let val_str = parts[2].trim_end_matches(')');
1847
1848        let prop_val = match properties.get(prop_name) {
1849            Some(v) => v,
1850            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1851        };
1852
1853        // Parse value string (handle quotes for strings)
1854        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1855            || (val_str.starts_with('"') && val_str.ends_with('"'))
1856        {
1857            Value::String(val_str[1..val_str.len() - 1].to_string())
1858        } else if let Ok(n) = val_str.parse::<i64>() {
1859            Value::Int(n)
1860        } else if let Ok(n) = val_str.parse::<f64>() {
1861            Value::Float(n)
1862        } else if let Ok(b) = val_str.parse::<bool>() {
1863            Value::Bool(b)
1864        } else {
1865            // Check for internal format wrappers if they somehow leaked through
1866            if val_str.starts_with("Number(") && val_str.ends_with(')') {
1867                let n_str = &val_str[7..val_str.len() - 1];
1868                if let Ok(n) = n_str.parse::<i64>() {
1869                    Value::Int(n)
1870                } else if let Ok(n) = n_str.parse::<f64>() {
1871                    Value::Float(n)
1872                } else {
1873                    Value::String(val_str.to_string())
1874                }
1875            } else {
1876                Value::String(val_str.to_string())
1877            }
1878        };
1879
1880        match op {
1881            "=" | "==" => Ok(prop_val == &target_val),
1882            "!=" | "<>" => Ok(prop_val != &target_val),
1883            ">" => self
1884                .compare_values(prop_val, &target_val)
1885                .map(|o| o.is_gt()),
1886            "<" => self
1887                .compare_values(prop_val, &target_val)
1888                .map(|o| o.is_lt()),
1889            ">=" => self
1890                .compare_values(prop_val, &target_val)
1891                .map(|o| o.is_ge()),
1892            "<=" => self
1893                .compare_values(prop_val, &target_val)
1894                .map(|o| o.is_le()),
1895            _ => {
1896                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1897                Ok(true)
1898            }
1899        }
1900    }
1901
1902    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1903        use std::cmp::Ordering;
1904
1905        fn cmp_f64(x: f64, y: f64) -> Ordering {
1906            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1907        }
1908
1909        match (a, b) {
1910            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1911            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1912            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1913            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1914            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1915            _ => Err(anyhow!(
1916                "Cannot compare incompatible types: {:?} vs {:?}",
1917                a,
1918                b
1919            )),
1920        }
1921    }
1922
1923    async fn check_unique_constraint_multi(
1924        &self,
1925        label: &str,
1926        key_values: &[(String, Value)],
1927        current_vid: Vid,
1928        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1929    ) -> Result<()> {
1930        // Serialize constraint key once for O(1) lookups
1931        let key = serialize_constraint_key(label, key_values);
1932
1933        // 1. Check L0 (in-memory) using O(1) constraint index
1934        {
1935            let l0 = self.l0_manager.get_current();
1936            let l0_guard = l0.read();
1937            if l0_guard.has_constraint_key(&key, current_vid) {
1938                return Err(anyhow!(
1939                    "Constraint violation: Duplicate composite key for label '{}'",
1940                    label
1941                ));
1942            }
1943        }
1944
1945        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1946        // buffer onto `pending_flush` and installs a fresh empty current
1947        // buffer; until the rotated rows reach Lance the key is invisible to
1948        // both the current-buffer check above and the storage check below, so
1949        // a duplicate could slip through that flush window. Mirror the read
1950        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1951        // already consult `pending_flush`.
1952        for pending_l0 in self.l0_manager.get_pending_flush() {
1953            if pending_l0.read().has_constraint_key(&key, current_vid) {
1954                return Err(anyhow!(
1955                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1956                    label
1957                ));
1958            }
1959        }
1960
1961        // Check Transaction L0
1962        if let Some(tx_l0) = tx_l0 {
1963            let tx_l0_guard = tx_l0.read();
1964            if tx_l0_guard.has_constraint_key(&key, current_vid) {
1965                return Err(anyhow!(
1966                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1967                    label
1968                ));
1969            }
1970        }
1971
1972        // 2. Check Storage (L1/L2)
1973        let filters: Vec<String> = key_values
1974            .iter()
1975            .map(|(prop, val)| {
1976                let val_str = match val {
1977                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1978                    Value::Int(n) => n.to_string(),
1979                    Value::Float(f) => f.to_string(),
1980                    Value::Bool(b) => b.to_string(),
1981                    _ => "NULL".to_string(),
1982                };
1983                format!("{} = {}", prop, val_str)
1984            })
1985            .collect();
1986
1987        let mut filter = filters.join(" AND ");
1988        filter.push_str(&format!(
1989            " AND _deleted = false AND _vid != {}",
1990            current_vid.as_u64()
1991        ));
1992
1993        #[cfg(feature = "lance-backend")]
1994        if let Ok(ds) = self.storage.vertex_dataset(label)
1995            && let Ok(lance_ds) = ds.open_raw().await
1996        {
1997            let count = lance_ds.count_rows(Some(filter.clone())).await?;
1998            if count > 0 {
1999                return Err(anyhow!(
2000                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2001                    label,
2002                    filter
2003                ));
2004            }
2005        }
2006
2007        Ok(())
2008    }
2009
2010    async fn check_write_pressure(&self) -> Result<()> {
2011        let status = self
2012            .storage
2013            .compaction_status()
2014            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2015        let l1_runs = status.l1_runs;
2016        let throttle = &self.config.throttle;
2017
2018        if l1_runs >= throttle.hard_limit {
2019            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2020            // Simple polling for now
2021            while self
2022                .storage
2023                .compaction_status()
2024                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2025                .l1_runs
2026                >= throttle.hard_limit
2027            {
2028                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2029            }
2030        } else if l1_runs >= throttle.soft_limit {
2031            let excess = l1_runs - throttle.soft_limit;
2032            // Cap multiplier to avoid overflow
2033            let excess = std::cmp::min(excess, 31);
2034            let multiplier = 2_u32.pow(excess as u32);
2035            let delay = throttle.base_delay * multiplier;
2036            tokio::time::sleep(delay).await;
2037        }
2038        Ok(())
2039    }
2040
2041    /// Check transaction memory limit to prevent OOM.
2042    /// No-op when no transaction is active.
2043    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2044        if let Some(tx_l0) = tx_l0 {
2045            let size = tx_l0.read().estimated_size;
2046            if size > self.config.max_transaction_memory {
2047                return Err(anyhow!(
2048                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2049                     Roll back or commit the current transaction.",
2050                    size,
2051                    self.config.max_transaction_memory
2052                ));
2053            }
2054        }
2055        Ok(())
2056    }
2057
2058    async fn get_query_context(
2059        &self,
2060        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2061    ) -> Option<QueryContext> {
2062        Some(QueryContext::new_with_pending(
2063            self.l0_manager.get_current(),
2064            tx_l0.cloned(),
2065            self.l0_manager.get_pending_flush(),
2066        ))
2067    }
2068
2069    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2070    /// write paths.
2071    ///
2072    /// Rejects a declared CRDT property written as a parsed CRDT value
2073    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2074    /// A mismatch would make the commit-time merge silently overwrite instead of
2075    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2076    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2077    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2078    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2079    /// are never carved out and stay conflictable.
2080    fn enforce_crdt_variants(
2081        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2082        properties: &Properties,
2083    ) -> Result<()> {
2084        for (key, value) in properties {
2085            let Some(meta) = props_meta.get(key) else {
2086                continue;
2087            };
2088            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2089                continue;
2090            };
2091            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2092                && crdt.type_name() != expected.type_name()
2093            {
2094                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2095                    message: format!(
2096                        "CRDT property '{key}' must be written as a {} value",
2097                        expected.type_name()
2098                    ),
2099                }));
2100            }
2101        }
2102        Ok(())
2103    }
2104
2105    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2106    ///
2107    /// When `label` is provided, uses it directly to look up property metadata.
2108    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2109    ///
2110    /// # Errors
2111    ///
2112    /// Returns an error if CRDT property merging fails.
2113    async fn prepare_vertex_upsert(
2114        &self,
2115        vid: Vid,
2116        properties: &mut Properties,
2117        label: Option<&str>,
2118        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2119    ) -> Result<()> {
2120        let Some(pm) = &self.property_manager else {
2121            return Ok(());
2122        };
2123
2124        let schema = self.schema_manager.schema();
2125
2126        // Resolve label: use provided label or discover from L0/storage
2127        let discovered_labels;
2128        let label_name = if let Some(l) = label {
2129            Some(l)
2130        } else {
2131            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2132            discovered_labels
2133                .as_ref()
2134                .and_then(|l| l.first().map(|s| s.as_str()))
2135        };
2136
2137        let Some(label_str) = label_name else {
2138            return Ok(());
2139        };
2140        let Some(props_meta) = schema.properties.get(label_str) else {
2141            return Ok(());
2142        };
2143
2144        // Identify CRDT properties in the insert data
2145        let crdt_keys: Vec<String> = properties
2146            .keys()
2147            .filter(|key| {
2148                props_meta.get(*key).is_some_and(|meta| {
2149                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2150                })
2151            })
2152            .cloned()
2153            .collect();
2154
2155        if crdt_keys.is_empty() {
2156            return Ok(());
2157        }
2158
2159        // Enforce that each declared CRDT property written as a parsed CRDT value
2160        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2161        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2162        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2163        // would hide that as a silent lost update — reject it at the source.
2164        //
2165        // Only the `Map` form is checked: it is exactly the form the carve-out
2166        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2167        // string (the Cypher form) or a non-CRDT value is never carved out — it
2168        // stays conflictable — so it poses no carve-out soundness risk and is left
2169        // to the existing merge/parse path. This is the declared-property half of
2170        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2171        Self::enforce_crdt_variants(props_meta, properties)?;
2172
2173        let ctx = self.get_query_context(tx_l0).await;
2174        for key in crdt_keys {
2175            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2176            if !existing.is_null()
2177                && let Some(val) = properties.get_mut(&key)
2178            {
2179                *val = pm.merge_crdt_values(&existing, val)?;
2180            }
2181        }
2182
2183        Ok(())
2184    }
2185
2186    async fn prepare_edge_upsert(
2187        &self,
2188        eid: Eid,
2189        properties: &mut Properties,
2190        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2191    ) -> Result<()> {
2192        if let Some(pm) = &self.property_manager {
2193            let schema = self.schema_manager.schema();
2194            // Get edge type from L0 buffer instead of from EID
2195            let type_name = self.get_edge_type_from_l0(eid);
2196
2197            if let Some(ref t_name) = type_name
2198                && let Some(props_meta) = schema.properties.get(t_name)
2199            {
2200                let mut crdt_keys = Vec::new();
2201                for (key, _) in properties.iter() {
2202                    if let Some(meta) = props_meta.get(key)
2203                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2204                    {
2205                        crdt_keys.push(key.clone());
2206                    }
2207                }
2208
2209                if !crdt_keys.is_empty() {
2210                    let ctx = self.get_query_context(tx_l0).await;
2211                    for key in crdt_keys {
2212                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2213
2214                        if !existing.is_null()
2215                            && let Some(val) = properties.get_mut(&key)
2216                        {
2217                            *val = pm.merge_crdt_values(&existing, val)?;
2218                        }
2219                    }
2220                }
2221            }
2222        }
2223        Ok(())
2224    }
2225
2226    #[instrument(skip(self, properties), level = "trace")]
2227    pub async fn insert_vertex(
2228        &self,
2229        vid: Vid,
2230        properties: Properties,
2231        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2232    ) -> Result<()> {
2233        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2234            .await?;
2235        Ok(())
2236    }
2237
2238    /// Component C1 (G4): before a non-transactional mutation merges into main
2239    /// L0, if an outstanding snapshot pins the current generation, freeze it
2240    /// aside so snapshots taken *before* this write stay isolated from it.
2241    ///
2242    /// `flush_lock` (acquired and released here) serializes the freeze against
2243    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2244    /// commit path gets. No-op for transactional writes (their freeze happens at
2245    /// commit) and — the common case — when nothing is pinned, where it costs one
2246    /// atomic load. Freezes at most once per pinned generation: the freeze
2247    /// installs a fresh unpinned `current`, so later writes in the same bulk
2248    /// import see no pin and merge in place, and the snapshot keeps reading the
2249    /// frozen pre-import buffer.
2250    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2251        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2252        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2253        // always false (one atomic load) when SSI is off.
2254        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2255            let _flush_lock_guard = self.flush_lock.lock().await;
2256            // Re-check under the lock: a concurrent commit may have frozen first.
2257            if self.l0_manager.is_current_pinned() {
2258                self.l0_manager.freeze_current_for_snapshot();
2259                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2260            }
2261        }
2262    }
2263
2264    #[instrument(skip(self, properties, labels), level = "trace")]
2265    pub async fn insert_vertex_with_labels(
2266        &self,
2267        vid: Vid,
2268        mut properties: Properties,
2269        labels: &[String],
2270        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2271    ) -> Result<Properties> {
2272        let start = std::time::Instant::now();
2273        self.check_write_pressure().await?;
2274        self.check_transaction_memory(tx_l0)?;
2275
2276        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2277        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2278        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2279        // taken before this write stay isolated from it.
2280        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2281
2282        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2283            self.process_embeddings_for_labels(labels, &mut properties)
2284                .await?;
2285        }
2286        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2287            .await?;
2288        self.prepare_vertex_upsert(
2289            vid,
2290            &mut properties,
2291            labels.first().map(|s| s.as_str()),
2292            tx_l0,
2293        )
2294        .await?;
2295
2296        // Clone properties and labels before moving into L0 to return them and populate constraint index
2297        let properties_copy = properties.clone();
2298        let labels_copy = labels.to_vec();
2299
2300        {
2301            // For a non-tx write, re-resolve the live `current` buffer and hold
2302            // `flush_lock` across the (synchronous) write so a concurrent flush
2303            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2304            // a fresh `current` between resolve and write, dropping our write
2305            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2306            // buffer, never the rotating `current`, so no `flush_lock` is needed
2307            // (and taking it would risk re-entrancy with the commit path).
2308            let _flush_lock_guard = if tx_l0.is_none() {
2309                Some(self.flush_lock.lock().await)
2310            } else {
2311                None
2312            };
2313            let l0 = self.resolve_l0(tx_l0);
2314            let mut l0_guard = l0.write();
2315            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2316            // possibly empty) current buffer must merge against the chained
2317            // committed state, not shadow it. No-op when the chain is empty
2318            // or this is a tx-private write.
2319            if tx_l0.is_none() {
2320                let pending = self.l0_manager.get_pending_flush();
2321                if !pending.is_empty() {
2322                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2323                }
2324            }
2325            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2326
2327            // Populate constraint index for O(1) duplicate detection
2328            let schema = self.schema_manager.schema();
2329            for label in &labels_copy {
2330                if schema.get_label_case_insensitive(label).is_none() {
2331                    if self.config.strict_schema {
2332                        return Err(anyhow::anyhow!(
2333                            "Label '{}' is not defined in the schema \
2334                             (strict_schema is enabled).",
2335                            label
2336                        ));
2337                    }
2338                    continue; // Schemaless: skip unknown labels.
2339                }
2340
2341                // For each unique constraint on this label, insert into constraint index
2342                for constraint in &schema.constraints {
2343                    if !constraint.enabled {
2344                        continue;
2345                    }
2346                    if let ConstraintTarget::Label(l) = &constraint.target {
2347                        if l != label {
2348                            continue;
2349                        }
2350                    } else {
2351                        continue;
2352                    }
2353
2354                    if let ConstraintType::Unique {
2355                        properties: unique_props,
2356                    } = &constraint.constraint_type
2357                    {
2358                        let mut key_values = Vec::new();
2359                        let mut all_present = true;
2360                        for prop in unique_props {
2361                            if let Some(val) = properties_copy.get(prop) {
2362                                key_values.push((prop.clone(), val.clone()));
2363                            } else {
2364                                all_present = false;
2365                                break;
2366                            }
2367                        }
2368
2369                        if all_present {
2370                            let key = serialize_constraint_key(label, &key_values);
2371                            l0_guard.insert_constraint_key(key, vid);
2372                        }
2373                    }
2374                }
2375            }
2376        }
2377
2378        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2379        self.update_metrics();
2380
2381        if tx_l0.is_none() {
2382            self.check_flush().await?;
2383        }
2384        if start.elapsed().as_millis() > 100 {
2385            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2386        }
2387        Ok(properties_copy)
2388    }
2389
2390    /// True iff routing this partial write through MergeInsert would
2391    /// miss a constraint check. Specifically: a multi-key UNIQUE
2392    /// constraint where the touched-set doesn't cover all member keys
2393    /// requires the unchanged keys from the existing row to compute
2394    /// the composite. Conservative: also returns true if any touched
2395    /// key is `ext_id` (uniqueness checked globally — handled in the
2396    /// full-row path).
2397    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2398        if touched.contains_key("ext_id") {
2399            return true;
2400        }
2401        let schema = self.schema_manager.schema();
2402        for label in labels {
2403            if schema.get_label_case_insensitive(label).is_none() {
2404                continue;
2405            }
2406            for constraint in &schema.constraints {
2407                if !constraint.enabled {
2408                    continue;
2409                }
2410                if let ConstraintTarget::Label(l) = &constraint.target {
2411                    if !l.eq_ignore_ascii_case(label) {
2412                        continue;
2413                    }
2414                } else {
2415                    continue;
2416                }
2417                if let ConstraintType::Unique {
2418                    properties: unique_props,
2419                } = &constraint.constraint_type
2420                {
2421                    if unique_props.len() < 2 {
2422                        continue; // single-key UNIQUE — partial path sees the key
2423                    }
2424                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2425                        return true;
2426                    }
2427                }
2428            }
2429        }
2430        false
2431    }
2432
2433    /// Insert a vertex's FULL property row plus a touched-keys hint so
2434    /// the flush emits ONLY those columns via Lance MergeInsert.
2435    ///
2436    /// Caller must have read the full row (via PropertyManager) and
2437    /// applied SET-touched values on top before calling — same input
2438    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2439    /// is the set of property keys this SET statement actually
2440    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2441    /// flush filters the MergeInsert source schema down to those keys.
2442    /// When `UniConfig::partial_lance_writes == false`, falls through
2443    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2444    /// equivalence with prior releases.
2445    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2446    pub async fn insert_vertex_partial_full(
2447        &self,
2448        vid: Vid,
2449        mut props: Properties,
2450        touched_keys: HashSet<String>,
2451        labels: &[String],
2452        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2453    ) -> Result<()> {
2454        if !self.config.partial_lance_writes
2455            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2456        {
2457            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2458                .await?;
2459            return Ok(());
2460        }
2461
2462        self.check_write_pressure().await?;
2463        self.check_transaction_memory(tx_l0)?;
2464        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2465            self.process_embeddings_for_labels(labels, &mut props)
2466                .await?;
2467        }
2468        // Full-row validation runs because we have the complete map;
2469        // no need for the partial-only validator.
2470        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2471            .await?;
2472        {
2473            let l0 = self.resolve_l0(tx_l0);
2474            let mut l0_guard = l0.write();
2475            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2476        }
2477        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2478        metrics::counter!("uni_partial_writes_total").increment(1);
2479        self.update_metrics();
2480        if tx_l0.is_none() {
2481            self.check_flush().await?;
2482        }
2483        Ok(())
2484    }
2485
2486    /// Insert a vertex's *partial* property set without first reading the
2487    /// full row.
2488    ///
2489    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2490    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2491    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2492    /// schema source — preserving untouched columns (e.g., embeddings)
2493    /// byte-equal in Lance with no read at the caller and no write of
2494    /// those columns.
2495    ///
2496    /// When the flag is `false`, this falls back to the existing
2497    /// `insert_vertex_with_labels` path after merging `touched` with
2498    /// the current properties from L0/storage. The caller can therefore
2499    /// use this entry point unconditionally; the optimization activates
2500    /// only when the flag is on.
2501    #[instrument(skip(self, touched, labels), level = "trace")]
2502    pub async fn insert_vertex_partial(
2503        &self,
2504        vid: Vid,
2505        touched: Properties,
2506        labels: &[String],
2507        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2508    ) -> Result<()> {
2509        let needs_full_read =
2510            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2511        if needs_full_read {
2512            // Flag-off fallback (or constraint-driven fallback): merge
2513            // `touched` with the current full property snapshot from
2514            // L0/storage and route through the existing path. Preserves
2515            // bit-for-bit equivalence with the pre-Round-11 release.
2516            let existing = if let Some(pm) = &self.property_manager {
2517                pm.get_all_vertex_props_with_ctx(vid, None)
2518                    .await
2519                    .unwrap_or_default()
2520                    .unwrap_or_default()
2521            } else {
2522                Properties::new()
2523            };
2524            let mut merged = existing;
2525            for (k, v) in touched {
2526                merged.insert(k, v);
2527            }
2528            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2529                .await?;
2530            return Ok(());
2531        }
2532
2533        // Flag-on fast path: stage the partial update directly. Pressure
2534        // checks, embedding generation, constraint validation all still
2535        // run — but the validator is the partial-aware variant that
2536        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2537        // properties not present in `touched`. Multi-key UNIQUE that
2538        // overlaps the touched set forces a fallback above via
2539        // `touched_needs_full_read`.
2540        let mut touched = touched;
2541        self.check_write_pressure().await?;
2542        self.check_transaction_memory(tx_l0)?;
2543        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2544            self.process_embeddings_for_labels(labels, &mut touched)
2545                .await?;
2546        }
2547        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2548            .await?;
2549
2550        {
2551            let l0 = self.resolve_l0(tx_l0);
2552            let mut l0_guard = l0.write();
2553            l0_guard.insert_vertex_partial(vid, touched, labels);
2554        }
2555
2556        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2557        metrics::counter!("uni_partial_writes_total").increment(1);
2558        self.update_metrics();
2559        if tx_l0.is_none() {
2560            self.check_flush().await?;
2561        }
2562        Ok(())
2563    }
2564
2565    /// Insert multiple vertices with batched operations.
2566    ///
2567    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2568    /// for bulk inserts with unique constraints.
2569    ///
2570    /// # Performance Improvements
2571    /// - Batch VID allocation: 1 call instead of N calls
2572    /// - Batch constraint validation: O(N) instead of O(N²)
2573    /// - Batch embedding generation: 1 API call per config instead of N calls
2574    /// - Transaction wrapping: Automatic flush deferral, atomicity
2575    ///
2576    /// # Arguments
2577    /// * `vids` - Pre-allocated VIDs for the vertices
2578    /// * `properties_batch` - Properties for each vertex
2579    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2580    ///
2581    /// # Errors
2582    /// Returns error if:
2583    /// - VID/properties length mismatch
2584    /// - Constraint violation detected
2585    /// - Embedding generation fails
2586    /// - Transaction commit fails
2587    ///
2588    /// # Atomicity
2589    /// If this method fails, all changes are rolled back (if transaction was started here).
2590    pub async fn insert_vertices_batch(
2591        &self,
2592        vids: Vec<Vid>,
2593        mut properties_batch: Vec<Properties>,
2594        labels: Vec<String>,
2595        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2596    ) -> Result<Vec<Properties>> {
2597        let start = std::time::Instant::now();
2598
2599        // Validate inputs
2600        if vids.len() != properties_batch.len() {
2601            return Err(anyhow!(
2602                "VID/properties size mismatch: {} vids, {} properties",
2603                vids.len(),
2604                properties_batch.len()
2605            ));
2606        }
2607
2608        if vids.is_empty() {
2609            return Ok(Vec::new());
2610        }
2611
2612        // Batch operations — writes go directly to the resolved L0.
2613        // Atomicity is guaranteed by the caller holding the writer lock.
2614        let result = async {
2615            self.check_write_pressure().await?;
2616            self.check_transaction_memory(tx_l0)?;
2617
2618            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2619            // freeze the pinned generation aside before merging so snapshot
2620            // readers stay isolated. No-op when unpinned or transactional.
2621            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2622
2623            // Batch embedding generation (1 API call per config)
2624            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2625                .await?;
2626
2627            // Batch constraint validation (O(N) instead of O(N²))
2628            let label = labels
2629                .first()
2630                .ok_or_else(|| anyhow!("No labels provided"))?;
2631            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2632                .await?;
2633
2634            // Batch prepare (CRDT merging if needed)
2635            // Check schema once: skip entirely if no CRDT properties for this label.
2636            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2637            // values to merge, so the per-vertex lookup is unnecessary in that case.
2638            let has_crdt_fields = {
2639                let schema = self.schema_manager.schema();
2640                schema
2641                    .properties
2642                    .get(label.as_str())
2643                    .is_some_and(|props_meta| {
2644                        props_meta.values().any(|meta| {
2645                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2646                        })
2647                    })
2648            };
2649
2650            if has_crdt_fields {
2651                // Layer-1 variant enforcement (G3): the batch path must reject a
2652                // declared-CRDT variant mismatch exactly as the single-vertex
2653                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2654                // written via batch import slips past write-time validation and
2655                // the OCC carve-out then masks the overwrite as a lost update.
2656                {
2657                    let schema = self.schema_manager.schema();
2658                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2659                        for props in &properties_batch {
2660                            Self::enforce_crdt_variants(props_meta, props)?;
2661                        }
2662                    }
2663                }
2664
2665                // Batch fetch existing CRDT values: collect VIDs that need merging,
2666                // then query once via PropertyManager instead of per-vertex lookups.
2667                let schema = self.schema_manager.schema();
2668                let crdt_keys: Vec<String> = schema
2669                    .properties
2670                    .get(label.as_str())
2671                    .map(|props_meta| {
2672                        props_meta
2673                            .iter()
2674                            .filter(|(_, meta)| {
2675                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2676                            })
2677                            .map(|(key, _)| key.clone())
2678                            .collect()
2679                    })
2680                    .unwrap_or_default();
2681
2682                if let Some(pm) = &self.property_manager {
2683                    let ctx = self.get_query_context(tx_l0).await;
2684                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2685                        for key in &crdt_keys {
2686                            if props.contains_key(key) {
2687                                let existing =
2688                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2689                                if !existing.is_null()
2690                                    && let Some(val) = props.get_mut(key)
2691                                {
2692                                    *val = pm.merge_crdt_values(&existing, val)?;
2693                                }
2694                            }
2695                        }
2696                    }
2697                }
2698            }
2699
2700            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2701            let target_l0 = self.resolve_l0(tx_l0);
2702
2703            let properties_result = properties_batch.clone();
2704            {
2705                let mut l0_guard = target_l0.write();
2706                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2707                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2708                }
2709            }
2710
2711            // Update metrics (batch increment)
2712            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2713            self.update_metrics();
2714
2715            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2716        }
2717        .await;
2718
2719        let props = result?;
2720
2721        if start.elapsed().as_millis() > 100 {
2722            log::warn!(
2723                "Slow insert_vertices_batch ({} vertices): {}ms",
2724                vids.len(),
2725                start.elapsed().as_millis()
2726            );
2727        }
2728
2729        Ok(props)
2730    }
2731
2732    /// Delete a vertex by VID.
2733    ///
2734    /// When `labels` is provided, uses them directly to populate L0 for
2735    /// correct tombstone flushing. Otherwise discovers labels from L0
2736    /// buffers and storage (which can be slow for many vertices).
2737    ///
2738    /// # Errors
2739    ///
2740    /// Returns an error if write pressure stalls, label lookup fails, or
2741    /// the L0 delete operation fails.
2742    #[instrument(skip(self, labels), level = "trace")]
2743    pub async fn delete_vertex(
2744        &self,
2745        vid: Vid,
2746        labels: Option<Vec<String>>,
2747        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2748    ) -> Result<()> {
2749        let start = std::time::Instant::now();
2750        self.check_write_pressure().await?;
2751        self.check_transaction_memory(tx_l0)?;
2752        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2753
2754        // Before deleting, ensure we have the vertex's labels stored in L0 so
2755        // the tombstone can be flushed to the correct label datasets. Discover
2756        // them up front (this may await storage) WITHOUT pinning the buffer we
2757        // will eventually mutate — for non-tx writes the live `current` buffer
2758        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2759        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2760        // reads that tolerate a racing rotate.
2761        let has_labels = {
2762            let l0_guard = self.resolve_l0(tx_l0);
2763            let guard = l0_guard.read();
2764            guard.vertex_labels.contains_key(&vid)
2765        };
2766
2767        let backfill_labels = if has_labels {
2768            None
2769        } else if let Some(provided) = labels {
2770            // Caller provided labels — skip the lookup entirely
2771            Some(provided)
2772        } else {
2773            // Discover labels from pending flush L0s, then storage
2774            let mut found = None;
2775            for pending_l0 in self.l0_manager.get_pending_flush() {
2776                let pending_guard = pending_l0.read();
2777                if let Some(l) = pending_guard.get_vertex_labels(vid) {
2778                    found = Some(l.to_vec());
2779                    break;
2780                }
2781            }
2782            if found.is_none() {
2783                found = self.find_vertex_labels_in_storage(vid).await?;
2784            }
2785            found
2786        };
2787
2788        // Test-only seam (no-op without the `failpoints` feature): pause a
2789        // non-transactional delete AFTER the awaited label discovery but BEFORE
2790        // it re-resolves the live buffer and writes the tombstone. A concurrent
2791        // flush can rotate+complete a buffer in this window; the fix re-resolves
2792        // `get_current()` and mutates it under `flush_lock`, so the tombstone
2793        // always lands in the live buffer (Bug #4 — silent lost delete across
2794        // L0 rotation).
2795        fail::fail_point!("nontx::after-capture");
2796
2797        // Apply the label backfill and the tombstone together. For a non-tx
2798        // write, hold `flush_lock` across the (synchronous) re-resolve + write
2799        // so a concurrent flush rotate (which takes `flush_lock` via
2800        // `begin_flush`) cannot install a fresh `current` between our resolve
2801        // and our write. For a tx write `resolve_l0` returns the tx-private
2802        // buffer (never the rotating `current`), so no `flush_lock` is needed
2803        // — and taking it there would risk re-entrancy with the commit path.
2804        if tx_l0.is_none() {
2805            let _flush_lock_guard = self.flush_lock.lock().await;
2806            let l0 = self.l0_manager.get_current();
2807            let mut guard = l0.write();
2808            if let Some(found_labels) = backfill_labels {
2809                guard.vertex_labels.insert(vid, found_labels);
2810            }
2811            guard.delete_vertex(vid)?;
2812        } else {
2813            let l0 = self.resolve_l0(tx_l0);
2814            let mut guard = l0.write();
2815            if let Some(found_labels) = backfill_labels {
2816                guard.vertex_labels.insert(vid, found_labels);
2817            }
2818            guard.delete_vertex(vid)?;
2819        }
2820        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2821        self.update_metrics();
2822
2823        if tx_l0.is_none() {
2824            self.check_flush().await?;
2825        }
2826        if start.elapsed().as_millis() > 100 {
2827            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2828        }
2829        Ok(())
2830    }
2831
2832    /// Find vertex labels from storage by querying the main vertices table.
2833    /// Returns the labels from the latest non-deleted version of the vertex.
2834    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2835        use crate::backend::types::ScanRequest;
2836        use arrow_array::Array;
2837        use arrow_array::cast::AsArray;
2838
2839        let backend = self.storage.backend();
2840        let table_name = MainVertexDataset::table_name();
2841
2842        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2843        if !backend.table_exists(table_name).await? {
2844            return Ok(None);
2845        }
2846
2847        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2848        let filter = format!("_vid = {}", vid.as_u64());
2849        let batches = backend
2850            .scan(
2851                ScanRequest::all(table_name)
2852                    .with_filter(filter)
2853                    .with_columns(vec![
2854                        "_vid".to_string(),
2855                        "labels".to_string(),
2856                        "_version".to_string(),
2857                        "_deleted".to_string(),
2858                    ]),
2859            )
2860            .await
2861            .unwrap_or_default();
2862
2863        // Find the row with the highest version number
2864        let mut max_version: Option<u64> = None;
2865        let mut labels: Option<Vec<String>> = None;
2866        let mut is_deleted = false;
2867
2868        for batch in batches {
2869            if batch.num_rows() == 0 {
2870                continue;
2871            }
2872
2873            let version_array = batch
2874                .column_by_name("_version")
2875                .unwrap()
2876                .as_primitive::<arrow_array::types::UInt64Type>();
2877
2878            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2879
2880            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2881
2882            for row_idx in 0..batch.num_rows() {
2883                let version = version_array.value(row_idx);
2884
2885                if max_version.is_none_or(|mv| version > mv) {
2886                    is_deleted = deleted_array.value(row_idx);
2887
2888                    let labels_list = labels_array.value(row_idx);
2889                    let string_array = labels_list.as_string::<i32>();
2890                    let vertex_labels: Vec<String> = (0..string_array.len())
2891                        .filter(|&i| !string_array.is_null(i))
2892                        .map(|i| string_array.value(i).to_string())
2893                        .collect();
2894
2895                    max_version = Some(version);
2896                    labels = Some(vertex_labels);
2897                }
2898            }
2899        }
2900
2901        // If the latest version is deleted, return None
2902        if is_deleted { Ok(None) } else { Ok(labels) }
2903    }
2904
2905    #[expect(clippy::too_many_arguments)]
2906    #[instrument(skip(self, props, touched_keys), level = "trace")]
2907    pub async fn insert_edge_partial_full(
2908        &self,
2909        src_vid: Vid,
2910        dst_vid: Vid,
2911        edge_type: u32,
2912        eid: Eid,
2913        props: Properties,
2914        edge_type_name: Option<String>,
2915        touched_keys: HashSet<String>,
2916        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2917    ) -> Result<()> {
2918        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2919        if !self.config.partial_lance_writes {
2920            return self
2921                .insert_edge(
2922                    src_vid,
2923                    dst_vid,
2924                    edge_type,
2925                    eid,
2926                    props,
2927                    edge_type_name,
2928                    tx_l0,
2929                )
2930                .await;
2931        }
2932
2933        let start = std::time::Instant::now();
2934        self.check_write_pressure().await?;
2935        self.check_transaction_memory(tx_l0)?;
2936        let mut props = props;
2937        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2938
2939        let l0 = self.resolve_l0(tx_l0);
2940        l0.write().insert_edge_partial_full(
2941            src_vid,
2942            dst_vid,
2943            edge_type,
2944            eid,
2945            props,
2946            edge_type_name,
2947            touched_keys,
2948        )?;
2949
2950        if tx_l0.is_none() {
2951            let version = l0.read().current_version;
2952            self.adjacency_manager
2953                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2954        }
2955
2956        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2957        metrics::counter!("uni_partial_writes_total").increment(1);
2958        self.update_metrics();
2959        if tx_l0.is_none() {
2960            self.check_flush().await?;
2961        }
2962        if start.elapsed().as_millis() > 100 {
2963            log::warn!(
2964                "Slow insert_edge_partial_full: {}ms",
2965                start.elapsed().as_millis()
2966            );
2967        }
2968        Ok(())
2969    }
2970
2971    #[expect(clippy::too_many_arguments)]
2972    pub async fn insert_edge(
2973        &self,
2974        src_vid: Vid,
2975        dst_vid: Vid,
2976        edge_type: u32,
2977        eid: Eid,
2978        mut properties: Properties,
2979        edge_type_name: Option<String>,
2980        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2981    ) -> Result<()> {
2982        let start = std::time::Instant::now();
2983        self.check_write_pressure().await?;
2984        self.check_transaction_memory(tx_l0)?;
2985        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2986        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
2987            .await?;
2988
2989        let l0 = self.resolve_l0(tx_l0);
2990        l0.write()
2991            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
2992
2993        // Dual-write to AdjacencyManager overlay (survives flush).
2994        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
2995        if tx_l0.is_none() {
2996            let version = l0.read().current_version;
2997            self.adjacency_manager
2998                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2999        }
3000
3001        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3002        self.update_metrics();
3003
3004        if tx_l0.is_none() {
3005            self.check_flush().await?;
3006        }
3007        if start.elapsed().as_millis() > 100 {
3008            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3009        }
3010        Ok(())
3011    }
3012
3013    #[instrument(skip(self), level = "trace")]
3014    pub async fn delete_edge(
3015        &self,
3016        eid: Eid,
3017        src_vid: Vid,
3018        dst_vid: Vid,
3019        edge_type: u32,
3020        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3021    ) -> Result<()> {
3022        let start = std::time::Instant::now();
3023        self.check_write_pressure().await?;
3024        self.check_transaction_memory(tx_l0)?;
3025        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3026        let l0 = self.resolve_l0(tx_l0);
3027
3028        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3029
3030        // Dual-write tombstone to AdjacencyManager overlay.
3031        if tx_l0.is_none() {
3032            let version = l0.read().current_version;
3033            self.adjacency_manager
3034                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3035        }
3036        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3037        self.update_metrics();
3038
3039        if tx_l0.is_none() {
3040            self.check_flush().await?;
3041        }
3042        if start.elapsed().as_millis() > 100 {
3043            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3044        }
3045        Ok(())
3046    }
3047
3048    /// Decide whether a flush should be triggered based on mutation count
3049    /// or elapsed time since the last flush.
3050    ///
3051    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3052    /// reuse the decision while bypassing the lock-acquiring entry point
3053    /// (it already holds `flush_lock`).
3054    fn should_flush(&self) -> bool {
3055        let count = self.l0_manager.get_current().read().mutation_count;
3056        if count == 0 {
3057            return false;
3058        }
3059        if count >= self.config.auto_flush_threshold {
3060            return true;
3061        }
3062        if let Some(interval) = self.config.auto_flush_interval
3063            && self.last_flush_time.lock().elapsed() >= interval
3064            && count >= self.config.auto_flush_min_mutations
3065        {
3066            return true;
3067        }
3068        false
3069    }
3070
3071    /// Check if flush should be triggered based on mutation count or time elapsed.
3072    /// This method is called after each write operation and can also be called
3073    /// by a background task for time-based flushing.
3074    pub async fn check_flush(&self) -> Result<()> {
3075        if self.should_flush() {
3076            self.flush_to_l1(None).await?;
3077        }
3078        Ok(())
3079    }
3080
3081    /// Process embeddings for a vertex using labels passed directly.
3082    /// Use this when labels haven't been stored to L0 yet.
3083    async fn process_embeddings_for_labels(
3084        &self,
3085        labels: &[String],
3086        properties: &mut Properties,
3087    ) -> Result<()> {
3088        let label_name = labels.first().map(|s| s.as_str());
3089        self.process_embeddings_impl(label_name, properties).await
3090    }
3091
3092    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3093    /// vertex has an embedding config that hasn't been satisfied by the
3094    /// caller-provided properties, enqueue the VID in
3095    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3096    /// skips `process_embeddings_for_labels` and the embedding is computed
3097    /// in a single batched call at flush time via
3098    /// `drain_pending_embeddings`.
3099    ///
3100    /// Returns `false` (caller falls back to today's per-row eager embed)
3101    /// if any of:
3102    ///  - the flag is off,
3103    ///  - no label has an embedding config,
3104    ///  - the user already provided the target property (matches the
3105    ///    existing skip-if-present semantics at writer.rs:2727).
3106    ///
3107    /// Trade-off: when deferral is active, in-tx reads of the embedding
3108    /// column return only what was already in storage (or nothing for
3109    /// brand-new vertices). Existing tests that RETURN n.embedding in
3110    /// the same tx as a SET on the source column must run with the flag
3111    /// off; opt in only when no such reads happen between write and
3112    /// commit.
3113    fn try_defer_embedding(
3114        &self,
3115        labels: &[String],
3116        properties: &Properties,
3117        vid: Vid,
3118        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3119    ) -> bool {
3120        if !self.config.defer_embeddings {
3121            return false;
3122        }
3123        let Some(label) = labels.first() else {
3124            return false;
3125        };
3126
3127        let schema = self.schema_manager.schema();
3128        let mut has_unsatisfied_cfg = false;
3129        for idx in &schema.indexes {
3130            if let IndexDefinition::Vector(v_cfg) = idx
3131                && v_cfg.label == *label
3132                && v_cfg.embedding_config.is_some()
3133                && !properties.contains_key(&v_cfg.property)
3134            {
3135                has_unsatisfied_cfg = true;
3136                break;
3137            }
3138        }
3139        if !has_unsatisfied_cfg {
3140            return false;
3141        }
3142
3143        let l0 = self.resolve_l0(tx_l0);
3144        let mut guard = l0.write();
3145        guard.pending_embeddings.insert(vid, label.clone());
3146        true
3147    }
3148
3149    /// Drain `pending_embeddings` from the rotated old-L0 right before
3150    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3151    /// `process_embeddings_for_batch` call per label, and writes the
3152    /// resulting embedding vectors into each VID's `vertex_properties`
3153    /// map. After this returns, the flush proceeds against an L0 that
3154    /// looks no different from one whose embeddings were generated
3155    /// per-row at insert.
3156    ///
3157    /// Idempotent: a VID whose embedding was already materialized
3158    /// (e.g., by on-demand read paths in a future Phase B revision) is
3159    /// detected via `properties.contains_key(target_prop)` inside
3160    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3161    /// the drain is safe.
3162    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3163        let by_label: HashMap<String, Vec<Vid>> = {
3164            let guard = old_l0_arc.read();
3165            if guard.pending_embeddings.is_empty() {
3166                return Ok(());
3167            }
3168            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3169            for (vid, label) in &guard.pending_embeddings {
3170                m.entry(label.clone()).or_default().push(*vid);
3171            }
3172            m
3173        };
3174
3175        for (label, vids) in by_label {
3176            let mut properties_batch: Vec<Properties> = {
3177                let guard = old_l0_arc.read();
3178                vids.iter()
3179                    .map(|vid| {
3180                        guard
3181                            .vertex_properties
3182                            .get(vid)
3183                            .cloned()
3184                            .unwrap_or_default()
3185                    })
3186                    .collect()
3187            };
3188
3189            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3190                .await?;
3191
3192            let mut guard = old_l0_arc.write();
3193            for (vid, props) in vids.iter().zip(properties_batch) {
3194                let target = guard.vertex_properties.entry(*vid).or_default();
3195                for (k, v) in props {
3196                    target.insert(k, v);
3197                }
3198                guard.pending_embeddings.remove(vid);
3199            }
3200        }
3201        Ok(())
3202    }
3203
3204    /// Process embeddings for a batch of vertices efficiently.
3205    ///
3206    /// Groups vertices by embedding config and makes batched API calls to the
3207    /// embedding service instead of calling once per vertex.
3208    ///
3209    /// # Performance
3210    /// For N vertices with embedding config:
3211    /// - Old approach: N API calls to embedding service
3212    /// - New approach: 1 API call per embedding config (usually 1 total)
3213    async fn process_embeddings_for_batch(
3214        &self,
3215        labels: &[String],
3216        properties_batch: &mut [Properties],
3217    ) -> Result<()> {
3218        let label_name = labels.first().map(|s| s.as_str());
3219        let schema = self.schema_manager.schema();
3220
3221        if let Some(label) = label_name {
3222            // Find vector indexes with embedding config for this label
3223            let mut configs = Vec::new();
3224            for idx in &schema.indexes {
3225                if let IndexDefinition::Vector(v_config) = idx
3226                    && v_config.label == label
3227                    && let Some(emb_config) = &v_config.embedding_config
3228                {
3229                    configs.push((v_config.property.clone(), emb_config.clone()));
3230                }
3231            }
3232
3233            if configs.is_empty() {
3234                return Ok(());
3235            }
3236
3237            for (target_prop, emb_config) in configs {
3238                // Collect input texts from all vertices that need embeddings
3239                let mut input_texts: Vec<String> = Vec::new();
3240                let mut needs_embedding: Vec<usize> = Vec::new();
3241
3242                for (idx, properties) in properties_batch.iter().enumerate() {
3243                    // Skip if target property already exists
3244                    if properties.contains_key(&target_prop) {
3245                        continue;
3246                    }
3247
3248                    // Check if source properties exist
3249                    let mut inputs = Vec::new();
3250                    for src_prop in &emb_config.source_properties {
3251                        if let Some(val) = properties.get(src_prop)
3252                            && let Some(s) = val.as_str()
3253                        {
3254                            inputs.push(s.to_string());
3255                        }
3256                    }
3257
3258                    if !inputs.is_empty() {
3259                        let input_text = inputs.join(" ");
3260                        let input_text = match &emb_config.document_prefix {
3261                            Some(prefix) => format!("{prefix}{input_text}"),
3262                            None => input_text,
3263                        };
3264                        input_texts.push(input_text);
3265                        needs_embedding.push(idx);
3266                    }
3267                }
3268
3269                if input_texts.is_empty() {
3270                    continue;
3271                }
3272
3273                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3274                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3275                })?;
3276                let embedder = runtime.embedding(&emb_config.alias).await?;
3277
3278                // Batch generate embeddings (single API call)
3279                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3280                let embeddings = embedder.embed(input_refs).await?;
3281
3282                // Distribute results back to properties
3283                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3284                    if let Some(vec) = embeddings.get(embedding_idx) {
3285                        let vals: Vec<Value> =
3286                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
3287                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3288                    }
3289                }
3290            }
3291        }
3292
3293        Ok(())
3294    }
3295
3296    async fn process_embeddings_impl(
3297        &self,
3298        label_name: Option<&str>,
3299        properties: &mut Properties,
3300    ) -> Result<()> {
3301        let schema = self.schema_manager.schema();
3302
3303        if let Some(label) = label_name {
3304            // Find vector indexes with embedding config for this label
3305            let mut configs = Vec::new();
3306            for idx in &schema.indexes {
3307                if let IndexDefinition::Vector(v_config) = idx
3308                    && v_config.label == label
3309                    && let Some(emb_config) = &v_config.embedding_config
3310                {
3311                    configs.push((v_config.property.clone(), emb_config.clone()));
3312                }
3313            }
3314
3315            if configs.is_empty() {
3316                log::info!("No embedding config found for label {}", label);
3317            }
3318
3319            for (target_prop, emb_config) in configs {
3320                // If target property already exists, skip (assume user provided it)
3321                if properties.contains_key(&target_prop) {
3322                    continue;
3323                }
3324
3325                // Check if source properties exist
3326                let mut inputs = Vec::new();
3327                for src_prop in &emb_config.source_properties {
3328                    if let Some(val) = properties.get(src_prop)
3329                        && let Some(s) = val.as_str()
3330                    {
3331                        inputs.push(s.to_string());
3332                    }
3333                }
3334
3335                if inputs.is_empty() {
3336                    continue;
3337                }
3338
3339                let input_text = inputs.join(" ");
3340                let input_text = match &emb_config.document_prefix {
3341                    Some(prefix) => format!("{prefix}{input_text}"),
3342                    None => input_text,
3343                };
3344
3345                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3346                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3347                })?;
3348                let embedder = runtime.embedding(&emb_config.alias).await?;
3349
3350                // Generate
3351                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3352                if let Some(vec) = embeddings.first() {
3353                    // Store as array of floats
3354                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3355                    properties.insert(target_prop.clone(), Value::List(vals));
3356                }
3357            }
3358        }
3359        Ok(())
3360    }
3361
3362    /// Flushes the current in-memory L0 buffer to L1 storage.
3363    ///
3364    /// # Lock Ordering
3365    ///
3366    /// To prevent deadlocks, locks must be acquired in the following order:
3367    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3368    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3369    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3370    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3371    /// 5. `Index` / `Storage` locks (during actual flush)
3372    ///
3373    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3374    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3375    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3376    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3377        // Drain any in-flight async flushes first. `flush_to_l1` is a
3378        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3379        // setup, shutdown paths) rely on it as "all writes are now
3380        // durably in Lance". Without the drain, an async stream from
3381        // a recent commit might still be writing to Lance when
3382        // `flush_to_l1` returns, leaving a window where forks branch
3383        // off pre-write Lance state and lose data.
3384        if let Some(coord) = self.flush_coordinator.as_ref() {
3385            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3386        }
3387        let _flush_lock_guard = self.flush_lock.lock().await;
3388        self.flush_inline_under_lock(name).await
3389    }
3390
    /// Async-flush entry point: rotate under `flush_lock`, release the
    /// lock, then submit the stream phase to the [`FlushCoordinator`].
    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
    /// that resolves when finalize completes.
    ///
    /// # Errors
    ///
    /// Returns an error if async flush is disabled, i.e.
    /// `config.async_flush_enabled = false` (the coordinator is `None` in
    /// that case — see the `flush_coordinator` field doc). Also returns an
    /// error if acquiring a coordinator permit fails, or if the rotate phase
    /// (`flush_l0_rotate`) fails.
    ///
    /// Errors from the stream phase are produced inside the submitted
    /// closure and are not returned by this call. They presumably surface
    /// through the returned ticket — confirm against `FlushCoordinator`.
    pub async fn flush_to_l1_async(
        self: &Arc<Self>,
        name: Option<String>,
    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
        let coord = self
            .flush_coordinator
            .as_ref()
            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
            .clone();
        // 1. Acquire permit FIRST (outside flush_lock) so we don't
        //    introduce a permit-while-holding-flush-lock convoy.
        let permit = coord.acquire_permit().await?;
        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
        //    rotate (the `?` below) must consume neither: the finalizer
        //    advances in strictly consecutive seq order and only decrements
        //    pending on finalize, so a leaked seq/pending would wedge it
        //    forever. The seq is allocated under `flush_lock`, immediately
        //    after the rotate and before the guard drops, so concurrent rotates
        //    keep seq order == rotation order, and the seq is unused until
        //    submit. On the `?` error path the permit drops, freeing the slot.
        let (
            RotateOutput {
                old_l0_arc,
                wal_lsn,
                current_version,
                flush_in_progress_guard,
            },
            seq,
        ) = {
            let _flush_lock_guard = self.flush_lock.lock().await;
            let rotate_out = self.flush_l0_rotate().await?;
            let seq = coord.next_rotate_seq();
            coord.note_pending();
            (rotate_out, seq)
        };
        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
        //    cached_manifest snapshot at this moment.
        //    NOTE(review): this snapshot is taken after `flush_lock` has been
        //    released, so a concurrent finalize may update `cached_manifest`
        //    between the rotate and this read — confirm the coordinator
        //    tolerates that.
        let parent_manifest = self.cached_manifest.lock().clone();
        // NOTE(review): `old_l0_arc` and `name` are not used after this
        // struct is built, so these clones could be plain moves.
        let rotated = RotatedFlush {
            seq,
            old_l0_arc: old_l0_arc.clone(),
            wal_lsn,
            current_version,
            name: name.clone(),
            parent_manifest,
            permit,
            flush_in_progress_guard,
        };
        // 4. Spawn the stream phase via the coordinator. The closure
        //    captures Arc<Writer> transiently — drops when stream
        //    completes (bounded, ~50-500 ms).
        let writer = self.clone();
        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
            // Translate the writer's stream outcome into the coordinator's type.
            Ok(crate::runtime::flush_coordinator::FlushOutcome {
                new_manifest: outcome.manifest,
                snapshot_id: outcome.snapshot_id,
            })
        });
        Ok(ticket)
    }
3460
3461    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3462    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3463    /// its place), and hand off the WAL to the new L0.
3464    ///
3465    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3466    /// is responsible). The returned [`RotateOutput`] carries everything
3467    /// the subsequent stream + finalize phases need; in particular the
3468    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3469    /// alive for the full flush lifetime — including any future async
3470    /// path where stream runs on a spawned task.
3471    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3472        // Acquire the in-progress counter BEFORE any heavy work. The
3473        // guard lives on RotateOutput; dropping RotateOutput drops the
3474        // guard, so the counter goes back to zero exactly when the flush
3475        // is fully done.
3476        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3477
3478        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3479        // current L0 is still active and mutations are retained in
3480        // memory until restart/retry.
3481        let wal_for_truncate = {
3482            let current_l0 = self.l0_manager.get_current();
3483            let l0_guard = current_l0.read();
3484            l0_guard.wal.clone()
3485        };
3486        // Test-only seam (no-op without the `failpoints` feature): inject a
3487        // WAL-flush failure here to drive the "failed async rotate wedges the
3488        // finalizer" regression (Bug #3). When configured to "return" it makes
3489        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3490        fail::fail_point!("flush::rotate-fail", |_| {
3491            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3492        });
3493        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3494            w.flush().await?
3495        } else {
3496            0
3497        };
3498
3499        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3500        // pending_flush until complete_flush is called by finalize.
3501        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3502        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3503
3504        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3505        // and current_version to the new L0.
3506        let current_version;
3507        {
3508            let mut old_l0_guard = old_l0_arc.write();
3509            current_version = old_l0_guard.current_version;
3510            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3511            let wal = old_l0_guard.wal.take();
3512            let new_l0_arc = self.l0_manager.get_current();
3513            let mut new_l0_guard = new_l0_arc.write();
3514            new_l0_guard.wal = wal;
3515            new_l0_guard.current_version = current_version;
3516        }
3517
3518        Ok(RotateOutput {
3519            old_l0_arc,
3520            wal_lsn,
3521            current_version,
3522            flush_in_progress_guard,
3523        })
3524    }
3525
3526    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3527    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3528    /// pending_flush by Phase B); writes append-only Lance datasets; does
3529    /// NOT call save_snapshot / set_latest_snapshot — those are
3530    /// finalize's job, so the manifest doesn't get published until the
3531    /// next phase.
3532    ///
3533    /// Today takes `&self`; in a follow-up commit this becomes a
3534    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3535    /// run on a spawned task while concurrent commits proceed.
3536    async fn flush_stream_l1(
3537        &self,
3538        old_l0_arc: Arc<RwLock<L0Buffer>>,
3539        wal_lsn: u64,
3540        current_version: u64,
3541        name: Option<String>,
3542    ) -> Result<FlushOutcome> {
3543        // Test-only seam (no-op without the `failpoints` feature): the rotate
3544        // (begin_flush) already moved the to-be-flushed buffer onto
3545        // pending_flush and installed a fresh empty current buffer, but the
3546        // rotated rows are NOT yet durable in Lance. Pausing here holds that
3547        // window open to drive the unique-constraint-hole regression
3548        // (Bug #9 Mechanism A).
3549        fail::fail_point!("flush::after-rotate-before-lance");
3550
3551        // Phase B: materialize any deferred embeddings before column
3552        // extraction. No-op when `defer_embeddings` is off (the set will
3553        // be empty). On-demand reads of the embedding column are a TODO
3554        // for a future revision (see UniConfig::defer_embeddings docs).
3555        self.drain_pending_embeddings(&old_l0_arc).await?;
3556
3557        let schema = self.schema_manager.schema();
3558        // 2. Acquire Read lock on Old L0 for flushing
3559        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3560        // (Vid, labels, properties, deleted, version)
3561        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3562        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3563        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3564        // (vid, full L0 properties map, version, set of keys to update).
3565        // Only the keys in the HashSet are emitted to the partial source;
3566        // the full props map is retained so the per-row column extractor
3567        // can read each touched key's value.
3568        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3569        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3570        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3571        // partial source with just `_vid`, `_deleted=true`, `_version`,
3572        // `_updated_at`. Skips the wide-row Append payload that adds
3573        // nothing on a soft-delete.
3574        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3575        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3576        // Collect vertex timestamps from L0 for flushing to storage
3577        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3578        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3579        // Track tombstones missing labels for storage query fallback
3580        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3581
3582        {
3583            let old_l0 = old_l0_arc.read();
3584
3585            // 1. Collect all edges and tombstones from L0
3586            for edge in old_l0.graph.edges() {
3587                let properties = old_l0
3588                    .edge_properties
3589                    .get(&edge.eid)
3590                    .cloned()
3591                    .unwrap_or_default();
3592                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3593
3594                // Get timestamps from L0 buffer (populated during insert)
3595                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3596                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3597
3598                entries_by_type
3599                    .entry(edge.edge_type)
3600                    .or_default()
3601                    .push(L1Entry {
3602                        src_vid: edge.src_vid,
3603                        dst_vid: edge.dst_vid,
3604                        eid: edge.eid,
3605                        op: Op::Insert,
3606                        version,
3607                        properties,
3608                        created_at,
3609                        updated_at,
3610                    });
3611            }
3612
3613            // From tombstones
3614            for tombstone in old_l0.tombstones.values() {
3615                let version = old_l0
3616                    .edge_versions
3617                    .get(&tombstone.eid)
3618                    .copied()
3619                    .unwrap_or(0);
3620                // Get timestamps - for deletes, updated_at reflects deletion time
3621                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3622                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3623
3624                entries_by_type
3625                    .entry(tombstone.edge_type)
3626                    .or_default()
3627                    .push(L1Entry {
3628                        src_vid: tombstone.src_vid,
3629                        dst_vid: tombstone.dst_vid,
3630                        eid: tombstone.eid,
3631                        op: Op::Delete,
3632                        version,
3633                        properties: HashMap::new(),
3634                        created_at,
3635                        updated_at,
3636                    });
3637            }
3638
3639            // 1b. Collect vertices by label (using vertex_labels from L0)
3640            //
3641            // Helper: fan-out a single vertex entry into per-label buckets.
3642            // Each per-label table row carries the full label set so multi-label
3643            // info is preserved after flush.
3644            let push_vertex_to_labels =
3645                |vid: Vid,
3646                 all_labels: &[String],
3647                 props: Properties,
3648                 deleted: bool,
3649                 version: u64,
3650                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3651                    for label in all_labels {
3652                        if let Some(label_id) = schema.label_id_by_name(label) {
3653                            out.entry(label_id).or_default().push((
3654                                vid,
3655                                all_labels.to_vec(),
3656                                props.clone(),
3657                                deleted,
3658                                version,
3659                            ));
3660                        }
3661                    }
3662                };
3663
3664            for (vid, props) in &old_l0.vertex_properties {
3665                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3666                // Collect timestamps for this vertex
3667                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3668                    vertex_created_at.insert(*vid, ts);
3669                }
3670                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3671                    vertex_updated_at.insert(*vid, ts);
3672                }
3673                if let Some(labels) = old_l0.vertex_labels.get(vid) {
3674                    // Partial-write routing: when this VID was last
3675                    // touched via `insert_vertex_partial` AND the
3676                    // partial_lance_writes flag is on, send only the
3677                    // touched columns to a MergeInsert batch. Otherwise
3678                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3679                    // — or flag off) use the existing full-row Append.
3680                    let is_partial = self.config.partial_lance_writes
3681                        && old_l0.vertex_partial_keys.contains_key(vid);
3682                    if is_partial {
3683                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3684                            for label in labels {
3685                                if let Some(label_id) = schema.label_id_by_name(label) {
3686                                    partial_by_label.entry(label_id).or_default().push((
3687                                        *vid,
3688                                        props.clone(),
3689                                        version,
3690                                        touched.clone(),
3691                                    ));
3692                                }
3693                            }
3694                        }
3695                    } else {
3696                        push_vertex_to_labels(
3697                            *vid,
3698                            labels,
3699                            props.clone(),
3700                            false,
3701                            version,
3702                            &mut vertices_by_label,
3703                        );
3704                    }
3705                }
3706            }
3707            for &vid in &old_l0.vertex_tombstones {
3708                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3709                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3710                    vertex_updated_at.insert(vid, ts);
3711                }
3712                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3713                    // Round-12 §B: tombstones flush via Lance MergeInsert
3714                    // (just `_vid`, `_deleted=true`, `_version`,
3715                    // `_updated_at`) — skipping the wide-row Append.
3716                    // Unconditional (no `partial_lance_writes` gating);
3717                    // tombstone Append carries no useful payload.
3718                    for label in labels {
3719                        if let Some(label_id) = schema.label_id_by_name(label) {
3720                            tombstones_by_label
3721                                .entry(label_id)
3722                                .or_default()
3723                                .push((vid, version));
3724                        }
3725                    }
3726                } else {
3727                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
3728                    orphaned_tombstones.push((vid, version));
3729                }
3730            }
3731        } // Drop read lock
3732
3733        // Resolve orphaned tombstones (missing labels) from storage
3734        if !orphaned_tombstones.is_empty() {
3735            tracing::warn!(
3736                count = orphaned_tombstones.len(),
3737                "Tombstones missing labels in L0, querying storage as fallback"
3738            );
3739            for (vid, version) in orphaned_tombstones {
3740                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3741                    && !labels.is_empty()
3742                {
3743                    for label in &labels {
3744                        if let Some(label_id) = schema.label_id_by_name(label) {
3745                            // Round-12 §B: route through partial tombstone too.
3746                            tombstones_by_label
3747                                .entry(label_id)
3748                                .or_default()
3749                                .push((vid, version));
3750                        }
3751                    }
3752                }
3753            }
3754        }
3755
3756        // 1. Load previous snapshot from cache, or fall back to storage.
3757        //
3758        // Use clone() not take(): for the async path, multiple
3759        // concurrent streams may run; if we take() here, a sibling
3760        // stream sees cached_manifest = None and seeds from
3761        // load_latest_snapshot (stale), losing the chain. clone()
3762        // preserves the parent. Finalize writes back the new manifest
3763        // unconditionally.
3764        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3765            cached
3766        } else {
3767            self.storage
3768                .snapshot_manager()
3769                .load_latest_snapshot()
3770                .await?
3771                .unwrap_or_else(|| {
3772                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3773                })
3774        };
3775
3776        // Update snapshot metadata
3777        // Save parent snapshot ID before generating new one (for lineage tracking)
3778        let parent_id = manifest.snapshot_id.clone();
3779        manifest.parent_snapshot = Some(parent_id);
3780        manifest.snapshot_id = Uuid::new_v4().to_string();
3781        manifest.name = name;
3782        manifest.created_at = Utc::now();
3783        manifest.version_high_water_mark = current_version;
3784        manifest.wal_high_water_mark = wal_lsn;
3785        let snapshot_id = manifest.snapshot_id.clone();
3786
3787        tracing::Span::current().record("snapshot_id", &snapshot_id);
3788
3789        // 2. Write main unified tables FIRST (before deltas).
3790        //    Ensures the dual-write invariant: by the time an EID appears in a
3791        //    delta table, it already exists in main_edges. This prevents the
3792        //    compaction debug_assert from firing when compaction interleaves
3793        //    with flush at async yield points.
3794        //
3795        // 2.1 Main edges table
3796        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3797            let _old_l0 = old_l0_arc.read();
3798            let mut main_edges: Vec<(
3799                uni_common::core::id::Eid,
3800                Vid,
3801                Vid,
3802                String,
3803                Properties,
3804                bool,
3805                u64,
3806            )> = Vec::new();
3807            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3808            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3809
3810            for (&edge_type_id, entries) in entries_by_type.iter() {
3811                for entry in entries {
3812                    let edge_type_name = self
3813                        .storage
3814                        .schema_manager()
3815                        .edge_type_name_by_id_unified(edge_type_id)
3816                        .unwrap_or_else(|| "unknown".to_string());
3817
3818                    let deleted = matches!(entry.op, Op::Delete);
3819                    main_edges.push((
3820                        entry.eid,
3821                        entry.src_vid,
3822                        entry.dst_vid,
3823                        edge_type_name,
3824                        entry.properties.clone(),
3825                        deleted,
3826                        entry.version,
3827                    ));
3828
3829                    if let Some(ts) = entry.created_at {
3830                        edge_created_at_map.insert(entry.eid, ts);
3831                    }
3832                    if let Some(ts) = entry.updated_at {
3833                        edge_updated_at_map.insert(entry.eid, ts);
3834                    }
3835                }
3836            }
3837
3838            (main_edges, edge_created_at_map, edge_updated_at_map)
3839        };
3840
3841        if !main_edges.is_empty() {
3842            let main_edge_batch = MainEdgeDataset::build_record_batch(
3843                &main_edges,
3844                Some(&edge_created_at_map),
3845                Some(&edge_updated_at_map),
3846            )?;
3847            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3848            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3849        }
3850
3851        // 2.2 Main vertices table
3852        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3853            let old_l0 = old_l0_arc.read();
3854            let mut vertices = Vec::new();
3855
3856            // Live vertices: full-row Append on the main table (the
3857            // props_json blob is required for global ID lookups). For
3858            // partial-row VIDs (vertex_partial_keys non-empty), the
3859            // main table still needs the full props for the
3860            // ext_id-uniqueness path; we keep the Append here. The
3861            // per-label Lance write IS partial via MergeInsert.
3862            for (vid, props) in &old_l0.vertex_properties {
3863                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3864                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3865                vertices.push((*vid, labels, props.clone(), false, version));
3866            }
3867
3868            // Tombstones: collected into `main_vertex_tombstones` for
3869            // the MergeInsert path below; skipping the wide-row Append.
3870            for &vid in &old_l0.vertex_tombstones {
3871                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3872                main_vertex_tombstones.push((vid, version));
3873            }
3874
3875            vertices
3876        };
3877
3878        if !main_vertices.is_empty() {
3879            let main_vertex_batch = MainVertexDataset::build_record_batch(
3880                &main_vertices,
3881                Some(&vertex_created_at),
3882                Some(&vertex_updated_at),
3883            )?;
3884            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3885        }
3886        // Round-12 §B: tombstones via MergeInsert on the main vertices
3887        // table. Independent of `vertex_properties` length.
3888        if !main_vertex_tombstones.is_empty() {
3889            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3890                &main_vertex_tombstones,
3891                Some(&vertex_updated_at),
3892            )?;
3893            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3894                .await?;
3895        }
3896        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3897            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3898        }
3899
3900        // 3. For each edge type, write FWD and BWD delta runs
3901        for (&edge_type_id, entries) in entries_by_type.iter() {
3902            // Get edge type name from unified lookup (handles both schema'd and schemaless)
3903            let edge_type_name = self
3904                .storage
3905                .schema_manager()
3906                .edge_type_name_by_id_unified(edge_type_id)
3907                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3908
3909            // FWD Run (sorted by src_vid)
3910            // Round-12 §A: split entries into full-row Append and
3911            // partial MergeInsert routes based on `edge_partial_keys`.
3912            // Edges in `edge_partial_keys` were last written via
3913            // `insert_edge_partial_full`; the per-edge-type delta
3914            // tables receive only the touched schema columns plus
3915            // (when any overflow key was touched) the regenerated
3916            // `overflow_json` blob. Untouched columns retain their
3917            // previous-version value via Lance MergeInsert.
3918            let partial_eids: std::collections::HashSet<Eid> = {
3919                let old_l0 = old_l0_arc.read();
3920                entries
3921                    .iter()
3922                    .filter(|e| {
3923                        self.config.partial_lance_writes
3924                            && old_l0.edge_partial_keys.contains_key(&e.eid)
3925                    })
3926                    .map(|e| e.eid)
3927                    .collect()
3928            };
3929            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3930                let old_l0 = old_l0_arc.read();
3931                partial_eids
3932                    .iter()
3933                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3934                    .collect()
3935            };
3936            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3937                .clone()
3938                .into_iter()
3939                .partition(|e| !partial_eids.contains(&e.eid));
3940
3941            let backend = self.storage.backend();
3942
3943            // FWD run (sorted by src_vid)
3944            let mut fwd_full = full_entries.clone();
3945            fwd_full.sort_by_key(|e| e.src_vid);
3946            let mut fwd_partial = partial_entries.clone();
3947            fwd_partial.sort_by_key(|e| e.src_vid);
3948            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3949            if !fwd_full.is_empty() {
3950                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3951                fwd_ds.write_run(backend, fwd_batch).await?;
3952            }
3953            if !fwd_partial.is_empty() {
3954                let touched_union: std::collections::HashSet<String> = fwd_partial
3955                    .iter()
3956                    .flat_map(|e| {
3957                        touched_union_by_eid
3958                            .get(&e.eid)
3959                            .cloned()
3960                            .unwrap_or_default()
3961                            .into_iter()
3962                    })
3963                    .collect();
3964                let fwd_partial_batch =
3965                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3966                fwd_ds
3967                    .merge_insert_partial_run(backend, fwd_partial_batch)
3968                    .await?;
3969            }
3970            fwd_ds.ensure_eid_index(backend).await?;
3971
3972            // BWD Run (sorted by dst_vid)
3973            let mut bwd_full = full_entries.clone();
3974            bwd_full.sort_by_key(|e| e.dst_vid);
3975            let mut bwd_partial = partial_entries.clone();
3976            bwd_partial.sort_by_key(|e| e.dst_vid);
3977            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
3978            if !bwd_full.is_empty() {
3979                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
3980                bwd_ds.write_run(backend, bwd_batch).await?;
3981            }
3982            if !bwd_partial.is_empty() {
3983                let touched_union: std::collections::HashSet<String> = bwd_partial
3984                    .iter()
3985                    .flat_map(|e| {
3986                        touched_union_by_eid
3987                            .get(&e.eid)
3988                            .cloned()
3989                            .unwrap_or_default()
3990                            .into_iter()
3991                    })
3992                    .collect();
3993                let bwd_partial_batch =
3994                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
3995                bwd_ds
3996                    .merge_insert_partial_run(backend, bwd_partial_batch)
3997                    .await?;
3998            }
3999            bwd_ds.ensure_eid_index(backend).await?;
4000
4001            // Update Manifest
4002            let current_snap =
4003                manifest
4004                    .edges
4005                    .entry(edge_type_name.to_string())
4006                    .or_insert(EdgeSnapshot {
4007                        version: 0,
4008                        count: 0,
4009                        lance_version: 0,
4010                    });
4011            current_snap.version += 1;
4012            current_snap.count += entries.len() as u64;
4013            // LanceDB tables don't expose Lance version directly
4014            current_snap.lance_version = 0;
4015
4016            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4017            // already has these edges via dual-write in insert_edge/delete_edge.
4018        }
4019
4020        // 4. Per-label vertex table writes
4021        // Iterate all labels that have either full-row OR partial-write
4022        // data pending. A label may appear in only one of the two maps
4023        // (e.g., all updates on this label were partial-only).
4024        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4025            .keys()
4026            .chain(partial_by_label.keys())
4027            .chain(tombstones_by_label.keys())
4028            .copied()
4029            .collect();
4030        for label_id in all_label_ids {
4031            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4032            let label_name = schema
4033                .label_name_by_id(label_id)
4034                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4035
4036            let ds = self.storage.vertex_dataset(label_name)?;
4037
4038            // Collect inverted index updates before consuming vertices
4039            // Maps: cfg.property -> (added, removed)
4040            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4041            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4042
4043            for idx in &schema.indexes {
4044                if let IndexDefinition::Inverted(cfg) = idx
4045                    && cfg.label == label_name
4046                {
4047                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4048                    let mut removed: HashSet<Vid> = HashSet::new();
4049
4050                    for (vid, _labels, props, deleted, _version) in &vertices {
4051                        if *deleted {
4052                            removed.insert(*vid);
4053                        } else if let Some(prop_value) = props.get(&cfg.property) {
4054                            // Extract terms from the property value (List<String>)
4055                            if let Some(arr) = prop_value.as_array() {
4056                                let terms: Vec<String> = arr
4057                                    .iter()
4058                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4059                                    .collect();
4060                                if !terms.is_empty() {
4061                                    added.insert(*vid, terms);
4062                                }
4063                            }
4064                        }
4065                    }
4066                    // Round-12 §B: tombstones no longer in `vertices`;
4067                    // pull them from `tombstones_by_label` for inverted
4068                    // index removal.
4069                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4070                        for (vid, _) in tomb_rows {
4071                            removed.insert(*vid);
4072                        }
4073                    }
4074
4075                    if !added.is_empty() || !removed.is_empty() {
4076                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4077                    }
4078                }
4079            }
4080
4081            let mut v_data = Vec::new();
4082            let mut d_data = Vec::new();
4083            let mut ver_data = Vec::new();
4084            for (vid, labels, props, deleted, version) in vertices {
4085                v_data.push((vid, labels, props));
4086                d_data.push(deleted);
4087                ver_data.push(version);
4088            }
4089
4090            let backend = self.storage.backend();
4091
4092            // Skip the full-row Append entirely if this label only has
4093            // partial-write rows pending.
4094            if !v_data.is_empty() {
4095                let batch = ds.build_record_batch_with_timestamps(
4096                    &v_data,
4097                    &d_data,
4098                    &ver_data,
4099                    &schema,
4100                    Some(&vertex_created_at),
4101                    Some(&vertex_updated_at),
4102                )?;
4103                ds.write_batch(backend, batch, &schema).await?;
4104            }
4105
4106            // Partial-column batch (Lance MergeInsert path). The flag
4107            // gates whether the routing classified any VIDs as partial;
4108            // outside the flag this collection is always empty so the
4109            // call below is a cheap no-op.
4110            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4111                && !partial_rows.is_empty()
4112            {
4113                let touched_union: std::collections::HashSet<String> = partial_rows
4114                    .iter()
4115                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4116                    .collect();
4117                let pairs: Vec<(Vid, Properties)> = partial_rows
4118                    .iter()
4119                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4120                    .collect();
4121                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4122                let partial_batch = ds.build_partial_record_batch(
4123                    &pairs,
4124                    &versions,
4125                    &touched_union,
4126                    &schema,
4127                    Some(&vertex_updated_at),
4128                )?;
4129                if partial_batch.num_rows() > 0 {
4130                    ds.merge_insert_batch(backend, partial_batch).await?;
4131                }
4132            }
4133
4134            // Tombstone batch (Round-12 §B): always MergeInsert with
4135            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4136            // No partial_lance_writes gating — tombstones never carry
4137            // useful property payload to write. Captured tombstone vids
4138            // also drive `remove_from_vid_labels_index` below.
4139            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4140            if !tombstone_rows.is_empty() {
4141                let tomb_batch =
4142                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4143                if tomb_batch.num_rows() > 0 {
4144                    ds.merge_insert_batch(backend, tomb_batch).await?;
4145                }
4146            }
4147
4148            ds.ensure_default_indexes(backend).await?;
4149
4150            // Update VidLabelsIndex (if enabled). v_data carries live
4151            // vertices; tombstone removals come from the
4152            // `tombstone_rows` captured above the MergeInsert call.
4153            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4154                if deleted {
4155                    self.storage.remove_from_vid_labels_index(*vid);
4156                } else {
4157                    self.storage.update_vid_labels_index(*vid, labels.clone());
4158                }
4159            }
4160            for (vid, _) in &tombstone_rows {
4161                self.storage.remove_from_vid_labels_index(*vid);
4162            }
4163
4164            // Update Manifest
4165            let current_snap =
4166                manifest
4167                    .vertices
4168                    .entry(label_name.to_string())
4169                    .or_insert(LabelSnapshot {
4170                        version: 0,
4171                        count: 0,
4172                        lance_version: 0,
4173                    });
4174            current_snap.version += 1;
4175            current_snap.count += v_data.len() as u64;
4176            // LanceDB tables don't expose Lance version directly
4177            current_snap.lance_version = 0;
4178
4179            // Invalidate table cache to ensure next read picks up new version
4180            self.storage.invalidate_table_cache(label_name);
4181
4182            // Apply inverted index updates incrementally
4183            #[cfg(feature = "lance-backend")]
4184            for idx in &schema.indexes {
4185                if let IndexDefinition::Inverted(cfg) = idx
4186                    && cfg.label == label_name
4187                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4188                {
4189                    self.storage
4190                        .index_manager()
4191                        .update_inverted_index_incremental(cfg, added, removed)
4192                        .await?;
4193                }
4194            }
4195
4196            // Update UID index with new vertex mappings
4197            // Collect (UniId, Vid) mappings from non-deleted vertices
4198            #[cfg(feature = "lance-backend")]
4199            {
4200                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4201                for (vid, _labels, props) in &v_data {
4202                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4203                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4204                        label_name, ext_id, props,
4205                    );
4206                    uid_mappings.push((uid, *vid));
4207                }
4208
4209                if !uid_mappings.is_empty()
4210                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4211                {
4212                    uid_index.write_mapping(&uid_mappings).await?;
4213                }
4214            }
4215        }
4216        Ok(FlushOutcome {
4217            manifest,
4218            snapshot_id,
4219        })
4220    }
4221
4222    /// Composition entry that assumes the caller already holds `flush_lock`.
4223    /// Runs rotate + stream + finalize_locked in sequence. Used by
4224    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4225    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4226    /// holds the lock from the commit critical section).
4227    #[instrument(
4228        skip(self),
4229        fields(snapshot_id, mutations_count, size_bytes),
4230        level = "info"
4231    )]
4232    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4233        let start = std::time::Instant::now();
4234
4235        let (initial_size, initial_count) = {
4236            let l0_arc = self.l0_manager.get_current();
4237            let l0 = l0_arc.read();
4238            (l0.estimated_size, l0.mutation_count)
4239        };
4240        tracing::Span::current().record("size_bytes", initial_size);
4241        tracing::Span::current().record("mutations_count", initial_count);
4242
4243        debug!("Starting L0 flush to L1");
4244
4245        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4246        // FlushInProgressGuard lives on RotateOutput and stays alive for
4247        // the full flush — including the finalize_locked call below.
4248        let RotateOutput {
4249            old_l0_arc,
4250            wal_lsn,
4251            current_version,
4252            flush_in_progress_guard: _flush_guard,
4253        } = self.flush_l0_rotate().await?;
4254
4255        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4256        // G (Lance writes). Builds the manifest but does NOT publish it.
4257        let FlushOutcome {
4258            manifest,
4259            snapshot_id,
4260        } = self
4261            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4262            .await?;
4263
4264        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4265        // property cache clear, last_flush_time, metrics, l1_runs++,
4266        // compaction trigger, index-rebuild scheduling, fork tick.
4267        self.flush_finalize_locked(
4268            old_l0_arc,
4269            wal_lsn,
4270            manifest,
4271            snapshot_id,
4272            initial_size,
4273            initial_count,
4274            start,
4275        )
4276        .await
4277    }
4278
4279    /// Phases H..S of the flush: publish the manifest and run all
4280    /// post-publish bookkeeping. Assumes the caller already holds
4281    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4282    /// lock-acquiring variant used by the async finalize path.
4283    #[allow(clippy::too_many_arguments)]
4284    async fn flush_finalize_locked(
4285        &self,
4286        old_l0_arc: Arc<RwLock<L0Buffer>>,
4287        wal_lsn: u64,
4288        manifest: SnapshotManifest,
4289        snapshot_id: String,
4290        initial_size: usize,
4291        initial_count: usize,
4292        start: std::time::Instant,
4293    ) -> Result<String> {
4294        Self::flush_finalize_body(
4295            &self.shared_ctx(),
4296            old_l0_arc,
4297            wal_lsn,
4298            manifest,
4299            snapshot_id,
4300            initial_size,
4301            initial_count,
4302            start,
4303        )
4304        .await
4305    }
4306
4307    /// Phases H..S of the flush, lock-acquiring variant. Used by the
4308    /// async-flush finalizer task (running on a spawned tokio task),
4309    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4310    /// `flush_lock` to serialize the publish boundary, then runs the
4311    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4312    #[allow(clippy::too_many_arguments)]
4313    pub(crate) async fn flush_finalize_now(
4314        shared: SharedFlushCtx,
4315        old_l0_arc: Arc<RwLock<L0Buffer>>,
4316        wal_lsn: u64,
4317        manifest: SnapshotManifest,
4318        snapshot_id: String,
4319        initial_size: usize,
4320        initial_count: usize,
4321        start: std::time::Instant,
4322    ) -> Result<String> {
4323        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4324        Self::flush_finalize_body(
4325            &shared,
4326            old_l0_arc,
4327            wal_lsn,
4328            manifest,
4329            snapshot_id,
4330            initial_size,
4331            initial_count,
4332            start,
4333        )
4334        .await
4335    }
4336
    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`
    /// (flush phases H..S). It re-links, publishes, and performs post-publish
    /// bookkeeping for a manifest built by the stream phase.
    ///
    /// Static over `SharedFlushCtx`; the caller is responsible for
    /// holding `flush_lock` for the entire call.
    ///
    /// Parameters:
    /// - `old_l0_arc`: the rotated L0 buffer whose rows were streamed to L1.
    ///   It is removed from the pending-flush chain at step J.
    /// - `wal_lsn`: WAL LSN captured when the L0 was rotated. It is the upper
    ///   bound for WAL truncation at step K.
    /// - `manifest`: the unpublished manifest from the stream phase. Its
    ///   `parent_snapshot` may be rewritten before publishing.
    /// - `snapshot_id`: logged and returned on success. The latest-snapshot
    ///   pointer is set from `manifest.snapshot_id` instead; the two are
    ///   presumably equal — NOTE(review): confirm.
    /// - `initial_size` / `initial_count`: pre-flush L0 size and mutation
    ///   count, used only for logging and metrics.
    /// - `start`: reference instant for `uni_flush_duration_seconds`.
    ///
    /// # Errors
    /// Returns an error if any of these fail:
    /// - saving the snapshot;
    /// - setting the latest-snapshot pointer;
    /// - truncating the WAL;
    /// - acquiring `compaction_status`.
    ///
    /// Completed steps are not rolled back. An error at K or P surfaces
    /// after the manifest has been published and `complete_flush` has run.
    #[allow(clippy::too_many_arguments)]
    async fn flush_finalize_body(
        shared: &SharedFlushCtx,
        old_l0_arc: Arc<RwLock<L0Buffer>>,
        wal_lsn: u64,
        mut manifest: SnapshotManifest,
        snapshot_id: String,
        initial_size: usize,
        initial_count: usize,
        start: std::time::Instant,
    ) -> Result<String> {
        // Parent-snapshot fixup. The stream phase built `manifest` with
        // parent_snapshot set from cached_manifest at stream time. If
        // OTHER flushes (sync or async) have finalized since then,
        // cached_manifest has advanced. Re-link this manifest to the
        // current cached chain so we don't orphan their data when we
        // overwrite cached_manifest below.
        let current_parent_id = shared
            .cached_manifest
            .lock()
            .as_ref()
            .map(|m| m.snapshot_id.clone());
        // Only rewrite when there is a cached parent; with an empty cache
        // the stream-time parent is kept as-is.
        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
            manifest.parent_snapshot = current_parent_id;
            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
        }

        // H. Publish manifest (body first, then pointer — recovery is
        // idempotent if we crash between the two).
        shared
            .storage
            .snapshot_manager()
            .save_snapshot(&manifest)
            .await?;
        shared
            .storage
            .snapshot_manager()
            .set_latest_snapshot(&manifest.snapshot_id)
            .await?;

        // I. Cache manifest for next flush to avoid re-reading from object store.
        *shared.cached_manifest.lock() = Some(manifest.clone());

        // L. Invalidate the property cache BEFORE removing the flushed buffer
        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
        // first closes the non-monotonic-read window: once the buffer leaves
        // the L0 chain at J a freshly-written value would otherwise miss the
        // chain and fall through to a stale cache entry. By the time finalize
        // runs the streamed rows are already durable in L1, so a post-clear
        // read falls through to fresh storage instead. The finalizer holds
        // `flush_lock` throughout, so reordering L ahead of J is safe.
        if let Some(ref pm) = shared.property_manager {
            pm.clear_cache().await;
        }

        // J. Complete flush: remove old L0 from pending_flush. MUST happen
        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
        shared.l0_manager.complete_flush(&old_l0_arc);

        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
        // WAL truncation (K). The property cache is already cleared (L moved
        // ahead of J above), so a read in this window falls through to fresh
        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
        // read after flush finalize).
        // The failpoint name still says "before-cache-clear". It predates
        // moving L ahead of J and is a runtime identifier (presumably used
        // by tests), so it is intentionally left unchanged.
        fail::fail_point!("flush::after-complete-before-cache-clear");

        // K. Truncate WAL up to the safe LSN.
        // The safe LSN is the smaller of this flush's `wal_lsn` and the
        // minimum LSN still needed by any buffer left in pending_flush, so
        // entries owed to other in-flight flushes are never dropped.
        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
        if let Some(w) = wal_handle {
            let safe_lsn = shared
                .l0_manager
                .min_pending_wal_lsn()
                .map(|min_pending| min_pending.min(wal_lsn))
                .unwrap_or(wal_lsn);
            w.truncate_before(safe_lsn).await?;
        }

        // M. Reset last flush time for time-based auto-flush.
        *shared.last_flush_time.lock() = std::time::Instant::now();

        info!(
            snapshot_id,
            mutations_count = initial_count,
            size_bytes = initial_size,
            "L0 flush to L1 completed successfully"
        );
        // Duration is measured from the caller-supplied `start`, so its
        // meaning depends on where the caller captured it.
        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

        // P. Increment flush generation counter for write throttling.
        {
            let mut status = uni_common::sync::acquire_mutex(
                &shared.storage.compaction_status,
                "compaction_status",
            )?;
            status.l1_runs += 1;
        }

        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
        // At most one background compaction runs at a time. A new one is
        // spawned only if no previous handle exists or it has finished.
        // NOTE(review): `am.compact()` is a synchronous call executed inside
        // `tokio::spawn`. If it does heavy CPU or blocking I/O it will occupy
        // an async worker thread; consider `spawn_blocking` — confirm cost.
        let am = shared.adjacency_manager.clone();
        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
            let previous_still_running = {
                let guard = shared.compaction_handle.read();
                guard.as_ref().is_some_and(|h| !h.is_finished())
            };
            if previous_still_running {
                info!("Skipping compaction: previous compaction still in progress");
            } else {
                let handle = tokio::spawn(async move {
                    am.compact();
                });
                *shared.compaction_handle.write() = Some(handle);
            }
        }

        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
        // Uses the just-published manifest; scheduling itself is spawned and
        // does not block this finalize.
        if shared.auto_rebuild_enabled
            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
        {
            Self::schedule_index_rebuilds_if_needed_static(
                &manifest,
                rebuild_mgr.clone(),
                shared.schema_manager.clone(),
                shared.index_rebuild_config.clone(),
            );
        }

        // S. Emit fork-fragment observability after a successful forked flush.
        Self::tick_fork_fragment_observability_static(
            shared.fork_id,
            shared.fork_flush_count.clone(),
            shared.fork_fragment_warn_fired.clone(),
            shared.fork_fragment_warn_threshold,
        );

        Ok(snapshot_id)
    }
4480
4481    /// Increment fork-flush bookkeeping and fire the fragment warn
4482    /// once if the threshold is crossed.
4483    ///
4484    /// Each flush typically appends ~1 fragment per touched dataset on
4485    /// the fork's branches; without compaction (deferred to Phase 5)
4486    /// long-lived heavy-write forks degrade. The flush count is a
4487    /// proxy for actual fragment growth — reading
4488    /// `Dataset::manifest().fragments.len()` per dataset would add a
4489    /// per-flush object-store roundtrip on the hot commit path, which
4490    /// is too costly for a purely observational guard rail.
4491    ///
4492    /// No-op for primary writers (`fork_id == None`).
4493    #[allow(dead_code)] // called by tests; production path uses _static
4494    pub(crate) fn tick_fork_fragment_observability(&self) {
4495        Self::tick_fork_fragment_observability_static(
4496            self.fork_id,
4497            self.fork_flush_count.clone(),
4498            self.fork_fragment_warn_fired.clone(),
4499            self.config.fork_fragment_warn_threshold,
4500        );
4501    }
4502
4503    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4504    /// Used by the async-flush finalize path, where we hold a
4505    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4506    pub(crate) fn tick_fork_fragment_observability_static(
4507        fork_id: Option<ForkId>,
4508        fork_flush_count: Arc<AtomicU64>,
4509        fork_fragment_warn_fired: Arc<AtomicBool>,
4510        warn_threshold: usize,
4511    ) {
4512        let Some(fork_id) = fork_id else { return };
4513        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4514        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4515        let fork_label = fork_id.to_string();
4516        metrics::gauge!(
4517            "uni_fork_l1_flushes",
4518            "fork" => fork_label.clone(),
4519        )
4520        .set(new_count as f64);
4521        let threshold = warn_threshold as u64;
4522        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4523            && threshold > 0
4524            && new_count >= threshold
4525        {
4526            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4527            tracing::warn!(
4528                fork = %fork_label,
4529                flush_count = new_count,
4530                threshold,
4531                "fork has exceeded the L1 flush-count threshold; \
4532                 fork compaction is deferred to Phase 5 — consider \
4533                 drop+recreate or promotion to bound fragment growth"
4534            );
4535        }
4536    }
4537
4538    /// Check rebuild thresholds and schedule background index rebuilds for
4539    /// labels that exceed growth or age limits. Marks affected indexes as
4540    /// `Stale` and spawns an async task to schedule the rebuild.
4541    #[allow(dead_code)] // production path uses _static; kept as the
4542    // documented instance entry point.
4543    fn schedule_index_rebuilds_if_needed(
4544        &self,
4545        manifest: &SnapshotManifest,
4546        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4547    ) {
4548        Self::schedule_index_rebuilds_if_needed_static(
4549            manifest,
4550            rebuild_mgr,
4551            self.schema_manager.clone(),
4552            self.config.index_rebuild.clone(),
4553        );
4554    }
4555
4556    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4557    /// Used by the async-flush finalize path, where we hold the
4558    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4559    pub(crate) fn schedule_index_rebuilds_if_needed_static(
4560        manifest: &SnapshotManifest,
4561        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4562        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4563        index_rebuild_config: uni_common::config::IndexRebuildConfig,
4564    ) {
4565        let checker =
4566            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4567        let schema = schema_manager.schema();
4568        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4569
4570        if labels.is_empty() {
4571            return;
4572        }
4573
4574        // Mark affected indexes as Stale
4575        for label in &labels {
4576            for idx in &schema.indexes {
4577                if idx.label() == label {
4578                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4579                        m.status = uni_common::core::schema::IndexStatus::Stale;
4580                    });
4581                }
4582            }
4583        }
4584
4585        tokio::spawn(async move {
4586            if let Err(e) = rebuild_mgr.schedule(labels).await {
4587                tracing::warn!("Failed to schedule index rebuild: {e}");
4588            }
4589        });
4590    }
4591}
4592
/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
/// its single-task finalizer loop. On success it publishes an async flush
/// via `Writer::flush_finalize_now`. On stream failure it logs, bumps
/// `uni_flush_failures_total`, and releases back-pressure.
///
/// It is a unit struct on purpose. It must NOT hold `Arc<Writer>`, because
/// that would create the reference cycle
/// Writer -> FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer.
/// All state needed for finalize travels in via `SharedFlushCtx`.
pub(crate) struct WriterFinalizer;
4599
4600impl FinalizeFn for WriterFinalizer {
4601    fn finalize<'a>(
4602        &'a self,
4603        rotated: RotatedFlush,
4604        outcome: AsyncFlushOutcome,
4605        shared: SharedFlushCtx,
4606    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4607        Box::pin(async move {
4608            // Read initial_size / initial_count from the rotated L0 so
4609            // we don't have to plumb them through the coordinator
4610            // submission. The buffer is still alive in pending_flush
4611            // until `complete_flush` (J) below pops it.
4612            let (initial_size, initial_count) = {
4613                let l0 = rotated.old_l0_arc.read();
4614                (l0.estimated_size, l0.mutation_count)
4615            };
4616            let result = Writer::flush_finalize_now(
4617                shared,
4618                rotated.old_l0_arc.clone(),
4619                rotated.wal_lsn,
4620                outcome.new_manifest,
4621                outcome.snapshot_id,
4622                initial_size,
4623                initial_count,
4624                std::time::Instant::now(),
4625            )
4626            .await;
4627            // `rotated` (permit + flush_in_progress_guard) drops here.
4628            drop(rotated.permit);
4629            result
4630        })
4631    }
4632
4633    fn finalize_failure<'a>(
4634        &'a self,
4635        rotated: RotatedFlush,
4636        err: anyhow::Error,
4637        _shared: SharedFlushCtx,
4638    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4639        Box::pin(async move {
4640            tracing::warn!(
4641                error = %err,
4642                seq = rotated.seq,
4643                "async flush stream failed; old L0 remains in pending_flush, \
4644                 WAL retains its data, recovery via WAL replay on restart"
4645            );
4646            metrics::counter!("uni_flush_failures_total").increment(1);
4647            // Permit + guard drop here so back-pressure releases even on
4648            // failure.
4649            drop(rotated.permit);
4650            err
4651        })
4652    }
4653}
4654
4655#[cfg(test)]
4656mod tests {
4657    use super::*;
4658    use tempfile::tempdir;
4659
    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// What is actually asserted:
    /// - after `commit_transaction_l0`, the WAL contains entries beyond those
    ///   present before the commit;
    /// - among the new entries, both vertex inserts precede the edge insert;
    /// - the vertices and the edge are also visible in the main L0 buffer.
    ///
    /// NOTE(review): the test does not observe the WAL-vs-merge timing
    /// directly. It checks only the end state and the order of WAL entries.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Temp-dir backed object store and schema with a single "Test" label.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Get WAL before commit
        // `l0` is the main L0 handle captured BEFORE the commit. The final
        // assertions read this same buffer, which assumes the commit does
        // not rotate it away — TODO confirm if auto-flush thresholds change.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        let writer = Arc::new(writer);
        writer.commit_transaction_l0(tx_l0).await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        // Only entries appended by this commit are inspected.
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
4802
4803    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
4804    #[tokio::test]
4805    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4806        use crate::runtime::wal::WriteAheadLog;
4807        use crate::storage::manager::StorageManager;
4808        use object_store::local::LocalFileSystem;
4809        use object_store::path::Path as ObjectStorePath;
4810        use uni_common::core::schema::SchemaManager;
4811
4812        let dir = tempdir()?;
4813        let path = dir.path().to_str().unwrap();
4814        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4815        let schema_path = ObjectStorePath::from("schema.json");
4816
4817        let schema_manager =
4818            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4819        let _label_id = schema_manager.add_label("Test")?;
4820        let _baseline_label_id = schema_manager.add_label("Baseline")?;
4821        let _txdata_label_id = schema_manager.add_label("TxData")?;
4822        schema_manager.save().await?;
4823
4824        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4825
4826        // Create WAL for main L0
4827        let wal_path = ObjectStorePath::from("wal");
4828        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4829
4830        let writer = Writer::new_with_config(
4831            storage.clone(),
4832            schema_manager.clone(),
4833            1,
4834            UniConfig::default(),
4835            Some(wal),
4836            None,
4837        )
4838        .await?;
4839
4840        // Insert baseline data (outside transaction)
4841        let baseline_vid = writer.next_vid().await?;
4842        writer
4843            .insert_vertex_with_labels(
4844                baseline_vid,
4845                [("baseline".to_string(), Value::Bool(true))]
4846                    .into_iter()
4847                    .collect(),
4848                &["Baseline".to_string()],
4849                None,
4850            )
4851            .await?;
4852
4853        // Begin transaction — create a transaction L0
4854        let tx_l0 = writer.create_transaction_l0();
4855
4856        // Insert data in transaction
4857        let tx_vid = writer.next_vid().await?;
4858        writer
4859            .insert_vertex_with_labels(
4860                tx_vid,
4861                [("tx_data".to_string(), Value::Bool(true))]
4862                    .into_iter()
4863                    .collect(),
4864                &["TxData".to_string()],
4865                Some(&tx_l0),
4866            )
4867            .await?;
4868
4869        // Capture main L0 state before rollback
4870        let l0 = writer.l0_manager.get_current();
4871        let vertex_count_before = l0.read().vertex_properties.len();
4872
4873        // Rollback transaction (simulating what would happen after WAL flush failure)
4874        drop(tx_l0);
4875
4876        // Verify main L0 is unchanged
4877        let vertex_count_after = l0.read().vertex_properties.len();
4878        assert_eq!(
4879            vertex_count_before, vertex_count_after,
4880            "Main L0 should not change after rollback"
4881        );
4882
4883        // Baseline should still be present
4884        assert!(
4885            l0.read().vertex_properties.contains_key(&baseline_vid),
4886            "Baseline data should remain"
4887        );
4888
4889        // Transaction data should NOT be in main L0
4890        assert!(
4891            !l0.read().vertex_properties.contains_key(&tx_vid),
4892            "Transaction data should not be in main L0 after rollback"
4893        );
4894
4895        Ok(())
4896    }
4897
4898    /// Test that batch insert with shared labels does not clone labels per vertex.
4899    /// This verifies fix for issue #161 (redundant label cloning).
4900    #[tokio::test]
4901    async fn test_batch_insert_shared_labels() -> Result<()> {
4902        use crate::storage::manager::StorageManager;
4903        use object_store::local::LocalFileSystem;
4904        use object_store::path::Path as ObjectStorePath;
4905        use uni_common::core::schema::SchemaManager;
4906
4907        let dir = tempdir()?;
4908        let path = dir.path().to_str().unwrap();
4909        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4910        let schema_path = ObjectStorePath::from("schema.json");
4911
4912        let schema_manager =
4913            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4914        let _label_id = schema_manager.add_label("Person")?;
4915        schema_manager.save().await?;
4916
4917        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4918
4919        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4920
4921        // Shared labels - should not be cloned per vertex
4922        let labels = &["Person".to_string()];
4923
4924        // Insert batch of vertices with same labels
4925        let mut vids = Vec::new();
4926        for i in 0..100 {
4927            let vid = writer.next_vid().await?;
4928            let mut props = std::collections::HashMap::new();
4929            props.insert("id".to_string(), Value::Int(i));
4930            writer
4931                .insert_vertex_with_labels(vid, props, labels, None)
4932                .await?;
4933            vids.push(vid);
4934        }
4935
4936        // Verify all vertices have the correct labels
4937        let l0 = writer.l0_manager.get_current();
4938        for vid in vids {
4939            let l0_guard = l0.read();
4940            let vertex_labels = l0_guard.vertex_labels.get(&vid);
4941            assert!(vertex_labels.is_some(), "Vertex should have labels");
4942            assert_eq!(
4943                vertex_labels.unwrap(),
4944                &vec!["Person".to_string()],
4945                "Labels should match"
4946            );
4947        }
4948
4949        Ok(())
4950    }
4951
4952    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
4953    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
4954    #[tokio::test]
4955    async fn test_estimated_size_tracks_mutations() -> Result<()> {
4956        use crate::storage::manager::StorageManager;
4957        use object_store::local::LocalFileSystem;
4958        use object_store::path::Path as ObjectStorePath;
4959        use uni_common::core::schema::SchemaManager;
4960
4961        let dir = tempdir()?;
4962        let path = dir.path().to_str().unwrap();
4963        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4964        let schema_path = ObjectStorePath::from("schema.json");
4965
4966        let schema_manager =
4967            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4968        let _label_id = schema_manager.add_label("Test")?;
4969        schema_manager.save().await?;
4970
4971        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4972
4973        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4974
4975        let l0 = writer.l0_manager.get_current();
4976
4977        // Initial state should be empty
4978        let initial_estimated = l0.read().estimated_size;
4979        let initial_actual = l0.read().size_bytes();
4980        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
4981        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
4982
4983        // Insert vertices with properties
4984        let mut vids = Vec::new();
4985        for i in 0..10 {
4986            let vid = writer.next_vid().await?;
4987            let mut props = std::collections::HashMap::new();
4988            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
4989            props.insert("index".to_string(), Value::Int(i));
4990            writer
4991                .insert_vertex_with_labels(vid, props, &[], None)
4992                .await?;
4993            vids.push(vid);
4994        }
4995
4996        // Verify estimated_size grew
4997        let after_vertices_estimated = l0.read().estimated_size;
4998        let after_vertices_actual = l0.read().size_bytes();
4999        assert!(
5000            after_vertices_estimated > 0,
5001            "estimated_size should grow after insertions"
5002        );
5003
5004        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5005        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5006        assert!(
5007            (0.5..=2.0).contains(&ratio),
5008            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5009            after_vertices_estimated,
5010            after_vertices_actual,
5011            ratio
5012        );
5013
5014        // Insert edges with a simple edge type
5015        let edge_type = 1u32;
5016        for i in 0..9 {
5017            let eid = writer.next_eid(edge_type).await?;
5018            writer
5019                .insert_edge(
5020                    vids[i],
5021                    vids[i + 1],
5022                    edge_type,
5023                    eid,
5024                    std::collections::HashMap::new(),
5025                    Some("NEXT".to_string()),
5026                    None,
5027                )
5028                .await?;
5029        }
5030
5031        // Verify estimated_size grew further
5032        let after_edges_estimated = l0.read().estimated_size;
5033        let after_edges_actual = l0.read().size_bytes();
5034        assert!(
5035            after_edges_estimated > after_vertices_estimated,
5036            "estimated_size should grow after edge insertions"
5037        );
5038
5039        // Verify still within reasonable bounds
5040        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5041        assert!(
5042            (0.5..=2.0).contains(&ratio),
5043            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5044            after_edges_estimated,
5045            after_edges_actual,
5046            ratio
5047        );
5048
5049        Ok(())
5050    }
5051
5052    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5053    #[tokio::test]
5054    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5055        use crate::runtime::wal::WriteAheadLog;
5056        use crate::storage::manager::StorageManager;
5057        use object_store::local::LocalFileSystem;
5058        use object_store::path::Path as ObjectStorePath;
5059        use uni_common::core::schema::SchemaManager;
5060
5061        let dir = tempdir()?;
5062        let path = dir.path().to_str().unwrap();
5063        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5064        let schema_path = ObjectStorePath::from("schema.json");
5065
5066        let schema_manager =
5067            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5068        schema_manager.save().await?;
5069
5070        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5071
5072        let wal_path = ObjectStorePath::from("wal");
5073        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5074
5075        let writer = Writer::new_with_config(
5076            storage.clone(),
5077            schema_manager.clone(),
5078            1,
5079            UniConfig::default(),
5080            Some(wal.clone()),
5081            None,
5082        )
5083        .await?;
5084
5085        // Flush with no mutations — should succeed cleanly
5086        let lsn = writer.flush_wal().await?;
5087        // LSN should be 0 or 1 (no real mutations flushed)
5088        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5089
5090        Ok(())
5091    }
5092
5093    /// Test that transaction data does not leak into main L0 without commit.
5094    #[tokio::test]
5095    async fn test_transaction_isolation_without_commit() -> Result<()> {
5096        use crate::runtime::wal::WriteAheadLog;
5097        use crate::storage::manager::StorageManager;
5098        use object_store::local::LocalFileSystem;
5099        use object_store::path::Path as ObjectStorePath;
5100        use uni_common::core::schema::SchemaManager;
5101
5102        let dir = tempdir()?;
5103        let path = dir.path().to_str().unwrap();
5104        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5105        let schema_path = ObjectStorePath::from("schema.json");
5106
5107        let schema_manager =
5108            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5109        let _label_id = schema_manager.add_label("Person")?;
5110        schema_manager.save().await?;
5111
5112        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5113
5114        let wal_path = ObjectStorePath::from("wal");
5115        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5116
5117        let writer = Writer::new_with_config(
5118            storage.clone(),
5119            schema_manager.clone(),
5120            1,
5121            UniConfig::default(),
5122            Some(wal),
5123            None,
5124        )
5125        .await?;
5126
5127        // Create transaction L0
5128        let tx_l0 = writer.create_transaction_l0();
5129
5130        // Insert vertex into transaction L0
5131        let vid = writer.next_vid().await?;
5132        writer
5133            .insert_vertex_with_labels(
5134                vid,
5135                [("name".to_string(), Value::String("Ghost".to_string()))]
5136                    .into_iter()
5137                    .collect(),
5138                &["Person".to_string()],
5139                Some(&tx_l0),
5140            )
5141            .await?;
5142
5143        // Verify data is in transaction L0
5144        assert!(
5145            tx_l0.read().vertex_properties.contains_key(&vid),
5146            "Transaction L0 should contain the vertex"
5147        );
5148
5149        // Verify data is NOT in main L0
5150        let main_l0 = writer.l0_manager.get_current();
5151        assert!(
5152            !main_l0.read().vertex_properties.contains_key(&vid),
5153            "Main L0 should NOT contain uncommitted transaction data"
5154        );
5155
5156        // Drop transaction without committing — data should be lost
5157        drop(tx_l0);
5158
5159        // Main L0 still should not have it
5160        assert!(
5161            !main_l0.read().vertex_properties.contains_key(&vid),
5162            "Main L0 should remain clean after dropped transaction"
5163        );
5164
5165        Ok(())
5166    }
5167
5168    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5169    /// the flush count crosses the configured threshold and stays
5170    /// silent on subsequent flushes for the lifetime of the writer.
5171    /// Primary writers (`fork_id == None`) never fire it.
5172    ///
5173    /// Tested directly against `tick_fork_fragment_observability` so
5174    /// the contract is locked in independently of the broader
5175    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5176    /// on Day 10's on-the-fly schema overlay growth).
5177    #[tokio::test]
5178    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5179        use crate::storage::manager::StorageManager;
5180        use object_store::local::LocalFileSystem;
5181        use object_store::path::Path as ObjectStorePath;
5182        use uni_common::core::fork::ForkId;
5183        use uni_common::core::schema::SchemaManager;
5184
5185        let dir = tempdir()?;
5186        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5187        let schema_path = ObjectStorePath::from("schema.json");
5188        let schema_manager =
5189            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5190        let storage = Arc::new(
5191            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5192        );
5193
5194        let config = UniConfig {
5195            fork_fragment_warn_threshold: 3,
5196            ..Default::default()
5197        };
5198        let mut writer =
5199            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5200
5201        // Primary path: never fires.
5202        for _ in 0..10 {
5203            writer.tick_fork_fragment_observability();
5204        }
5205        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5206        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5207
5208        // Fork path: tag and tick. Below threshold → no fire.
5209        writer.fork_id = Some(ForkId::new());
5210        writer.tick_fork_fragment_observability();
5211        writer.tick_fork_fragment_observability();
5212        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5213        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5214
5215        // Crossing threshold → fires once.
5216        writer.tick_fork_fragment_observability();
5217        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5218        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5219
5220        // Subsequent ticks bump the gauge but do not re-fire.
5221        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5222        for _ in 0..5 {
5223            writer.tick_fork_fragment_observability();
5224        }
5225        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5226        assert_eq!(
5227            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5228            fired_after
5229        );
5230
5231        Ok(())
5232    }
5233
5234    /// The hot-path mutators must not write to any `Writer` struct field.
5235    /// Phase 2 of the refactor
5236    /// gave them `&self` receivers, which the compiler enforces against
5237    /// direct `self.x = y` assignment — but interior-mutable writes
5238    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5239    /// every potentially-writable field, calls each hot-path mutator, and
5240    /// asserts no field changed.
5241    ///
5242    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5243    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5244    /// are intentionally out of scope here.
5245    #[tokio::test]
5246    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5247        use crate::storage::manager::StorageManager;
5248        use object_store::local::LocalFileSystem;
5249        use object_store::path::Path as ObjectStorePath;
5250        use uni_common::core::schema::SchemaManager;
5251
5252        let dir = tempdir()?;
5253        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5254        let schema_path = ObjectStorePath::from("schema.json");
5255        let schema_manager =
5256            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5257        schema_manager.add_label("Person")?;
5258        schema_manager.save().await?;
5259        let storage = Arc::new(
5260            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5261        );
5262
5263        let writer =
5264            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5265                .await?;
5266
5267        /// Captures every `Writer` field that *could* be written by a
5268        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5269        /// construction field). Arc'd substructures (`l0_manager`,
5270        /// `storage`, etc.) are intentionally not checked — they are
5271        /// re-pointed only at construction.
5272        #[derive(Debug, PartialEq)]
5273        struct Snapshot {
5274            last_flush_time: std::time::Instant,
5275            cached_manifest_some: bool,
5276            fork_flush_count: u64,
5277            fork_fragment_warn_fired: bool,
5278            xervo_runtime_some: bool,
5279            index_rebuild_manager_some: bool,
5280            fork_id: Option<ForkId>,
5281        }
5282
5283        fn snap(w: &Writer) -> Snapshot {
5284            Snapshot {
5285                last_flush_time: *w.last_flush_time.lock(),
5286                cached_manifest_some: w.cached_manifest.lock().is_some(),
5287                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5288                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5289                xervo_runtime_some: w.xervo_runtime.get().is_some(),
5290                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5291                fork_id: w.fork_id,
5292            }
5293        }
5294
5295        // 1. insert_vertex_with_labels
5296        let before = snap(&writer);
5297        let vid = writer.next_vid().await?;
5298        writer
5299            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5300            .await?;
5301        assert_eq!(
5302            snap(&writer),
5303            before,
5304            "insert_vertex_with_labels mutated a Writer field"
5305        );
5306
5307        // 2. insert_vertices_batch
5308        let before = snap(&writer);
5309        let vids = writer.allocate_vids(2).await?;
5310        writer
5311            .insert_vertices_batch(
5312                vids,
5313                vec![Properties::new(), Properties::new()],
5314                vec!["Person".into()],
5315                None,
5316            )
5317            .await?;
5318        assert_eq!(
5319            snap(&writer),
5320            before,
5321            "insert_vertices_batch mutated a Writer field"
5322        );
5323
5324        // 3. delete_vertex
5325        let before = snap(&writer);
5326        writer.delete_vertex(vid, None, None).await?;
5327        assert_eq!(
5328            snap(&writer),
5329            before,
5330            "delete_vertex mutated a Writer field"
5331        );
5332
5333        // (insert_edge / delete_edge are skipped here: their fixture cost is
5334        // disproportionate to the audit's marginal value, and the same
5335        // structural argument plus the compiler-enforced `&self` covers them.)
5336
5337        Ok(())
5338    }
5339}