Skip to main content

uni_store/runtime/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6    FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit; defaults to `10_000`.
    ///
    /// NOTE(review): presumably the number of buffered mutations that
    /// triggers a flush — confirm at the use site.
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column MergeInsert path used by
    /// SET-only flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched
    ///   property keys in `L0Buffer::vertex_partial_keys`, and the flush
    ///   sends those VIDs through Lance `MergeInsertBuilder` with a
    ///   subset-of-schema source. Unchanged columns (including wide ones
    ///   such as embeddings) are neither read nor rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the
    ///   read-modify-write `insert_vertex_with_labels` path, which keeps
    ///   bit-for-bit equivalence with prior releases.
    ///
    /// Defaults to `false` for the first release; intended to flip to
    /// `true` once telemetry on the issue #72 ingest workload confirms
    /// the win.
    ///
    /// See the soundness probe at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Conservative defaults: a 10 000-mutation limit and the partial-write
    /// path switched off.
    fn default() -> Self {
        WriterConfig {
            partial_lance_writes: false,
            max_mutations: 10_000,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The L0 buffer that was rotated out and is about to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured at rotation time.
    wal_lsn: u64,
    /// `current_version` captured at rotation time.
    current_version: u64,
    /// Held for the whole flush; the flush-in-progress flag is released
    /// when this guard is dropped.
    flush_in_progress_guard: FlushInProgressGuard,
}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93    let mut out = Properties::new();
94    for k in keys {
95        if let Some(v) = props.get(k) {
96            out.insert(k.clone(), v.clone());
97        }
98    }
99    out
100}
101
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// The snapshot manifest that was built but not yet published.
    manifest: SnapshotManifest,
    /// Identifier of that snapshot.
    snapshot_id: String,
}
110
/// Write-side entry point: owns the L0 buffers, ID allocation, flush
/// bookkeeping, and the optimistic-concurrency (SSI) state.
///
/// Most shared mutable state is `Arc`-wrapped so the same handles can be
/// given to the `SharedFlushCtx` used by the async-flush coordinator.
pub struct Writer {
    /// Manager of the current L0 buffer and the pending-flush chain.
    pub l0_manager: Arc<L0Manager>,
    /// Storage layer handle; also the source of the object store and the
    /// adjacency manager used at construction.
    pub storage: Arc<StorageManager>,
    /// Schema source (e.g. constraints consulted when rebuilding the
    /// UNIQUE constraint index after WAL replay).
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// VID/EID allocator.
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration this writer was built with.
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. Owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `config.async_flush_enabled` is `false`; the
    /// constructor only builds it when the feature is on. The
    /// coordinator's finalizer task captures `SharedFlushCtx` which
    /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
    /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
    /// the holder count never drops. Constructing it only when the
    /// feature is actually on avoids that side-effect for all
    /// existing sync-flush paths. When async-flush graduates from
    /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
    /// the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    commit_sequence: Arc<AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
206
/// Number of recent commits retained for OCC conflict detection. Large enough
/// that under-run — and the resulting conservative abort — is rare in practice;
/// each entry is a small set of touched ids.
///
/// Passed to `CommitRegistry::new` when the writer is constructed.
const OCC_REGISTRY_CAPACITY: usize = 4096;
211
212impl Writer {
213    pub async fn new(
214        storage: Arc<StorageManager>,
215        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
216        start_version: u64,
217    ) -> Result<Self> {
218        Self::new_with_config(
219            storage,
220            schema_manager,
221            start_version,
222            UniConfig::default(),
223            None,
224            None,
225        )
226        .await
227    }
228
229    pub async fn new_with_config(
230        storage: Arc<StorageManager>,
231        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232        start_version: u64,
233        config: UniConfig,
234        wal: Option<Arc<WriteAheadLog>>,
235        allocator: Option<Arc<IdAllocator>>,
236    ) -> Result<Self> {
237        let allocator = if let Some(a) = allocator {
238            a
239        } else {
240            let store = storage.store();
241            let path = object_store::path::Path::from("id_allocator.json");
242            Arc::new(IdAllocator::new(store, path, 1000).await?)
243        };
244
245        let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247        let property_manager = Some(Arc::new(PropertyManager::new(
248            storage.clone(),
249            schema_manager.clone(),
250            1000,
251        )));
252
253        let adjacency_manager = storage.adjacency_manager();
254
255        // Hoist the Arc'd fields so we can both stash them on Writer and
256        // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
257        // captures. Single-source-of-truth for each piece of mutable
258        // shared state.
259        let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260        let cached_manifest = Arc::new(PlMutex::new(None));
261        let fork_flush_count = Arc::new(AtomicU64::new(0));
262        let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263        let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264        let compaction_handle = Arc::new(RwLock::new(None));
265        let index_rebuild_manager: Arc<
266            OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267        > = Arc::new(OnceLock::new());
268
269        let flush_coordinator = if config.async_flush_enabled {
270            let shared = SharedFlushCtx {
271                storage: storage.clone(),
272                l0_manager: l0_manager.clone(),
273                adjacency_manager: adjacency_manager.clone(),
274                property_manager: property_manager.clone(),
275                schema_manager: schema_manager.clone(),
276                cached_manifest: cached_manifest.clone(),
277                last_flush_time: last_flush_time.clone(),
278                fork_id: None,
279                fork_flush_count: fork_flush_count.clone(),
280                fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281                fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282                flush_lock: flush_lock.clone(),
283                index_rebuild_manager: index_rebuild_manager.clone(),
284                compaction_handle: compaction_handle.clone(),
285                compaction_config: config.compaction.clone(),
286                index_rebuild_config: config.index_rebuild.clone(),
287                auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288            };
289            let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290            Some(Arc::new(FlushCoordinator::new(
291                config.max_pending_flushes,
292                shared,
293                finalize_fn,
294            )))
295        } else {
296            None
297        };
298
299        let commit_sequence = Arc::new(AtomicU64::new(0));
300        let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301            OCC_REGISTRY_CAPACITY,
302        )));
303        let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305        Ok(Self {
306            l0_manager,
307            storage,
308            schema_manager,
309            allocator,
310            config,
311            xervo_runtime: OnceLock::new(),
312            property_manager,
313            adjacency_manager,
314            last_flush_time,
315            compaction_handle,
316            index_rebuild_manager,
317            cached_manifest,
318            fork_id: None,
319            fork_flush_count,
320            fork_fragment_warn_fired,
321            flush_lock,
322            flush_coordinator,
323            commit_sequence,
324            committed_writes,
325            for_update_locks,
326        })
327    }
328
329    /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
330    /// creating it on first use. The caller `.lock_owned().await`s the returned
331    /// mutex and holds the guard for the transaction's lifetime.
332    pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333        self.for_update_locks
334            .entry(key.to_vec())
335            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336            .clone()
337    }
338
339    /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
340    /// holds anymore, so the map does not grow without bound across the keyspace.
341    ///
342    /// Called when a transaction ends, **after** its guards have been dropped.
343    /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
344    /// the same lock `row_lock_handle` takes to clone an entry — so the check
345    /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
346    /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
347    /// removal) or has not yet taken the shard lock (it will mint a fresh entry
348    /// after we remove). Either way no two transactions ever lock different
349    /// `Mutex` instances for the same key.
350    pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351        for key in keys {
352            self.for_update_locks
353                .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354        }
355    }
356
    /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
    /// tests that the map does not leak entries across transactions (G5).
    ///
    /// Returns whatever `DashMap::len` reports at the time of the call.
    pub fn for_update_lock_count(&self) -> usize {
        self.for_update_locks.len()
    }
362
    /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
    /// fresh transaction's `occ_read_seq` to this so its conflict-detection
    /// baseline advances to lock-acquisition time (read-latest under the lock).
    ///
    /// The counter is read with `Ordering::Relaxed`.
    pub fn current_commit_sequence(&self) -> u64 {
        self.commit_sequence.load(Ordering::Relaxed)
    }
369
370    /// Build a fresh `SharedFlushCtx` from this Writer's current state.
371    /// Used by the async-flush stream/finalize paths to pass into spawned
372    /// tasks without smuggling `Arc<Writer>` (which would create a cycle
373    /// with `flush_coordinator -> FinalizeFn -> Writer`).
374    pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375        SharedFlushCtx {
376            storage: self.storage.clone(),
377            l0_manager: self.l0_manager.clone(),
378            adjacency_manager: self.adjacency_manager.clone(),
379            property_manager: self.property_manager.clone(),
380            schema_manager: self.schema_manager.clone(),
381            cached_manifest: self.cached_manifest.clone(),
382            last_flush_time: self.last_flush_time.clone(),
383            fork_id: self.fork_id,
384            fork_flush_count: self.fork_flush_count.clone(),
385            fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386            fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387            flush_lock: self.flush_lock.clone(),
388            index_rebuild_manager: self.index_rebuild_manager.clone(),
389            compaction_handle: self.compaction_handle.clone(),
390            compaction_config: self.config.compaction.clone(),
391            index_rebuild_config: self.config.index_rebuild.clone(),
392            auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393        }
394    }
395
    /// Borrow the flush coordinator if async flush is enabled.
    ///
    /// Returns `None` when `config.async_flush_enabled = false`, because the
    /// constructor only builds a coordinator when that flag is set.
    /// External callers (`drop_fork`) use this to drain pending streams.
    pub fn flush_coordinator(
        &self,
    ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
        self.flush_coordinator.as_ref()
    }
404
405    /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
406    ///
407    /// One-shot: returns `Err` if already set. The receiver is `&self` so this
408    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
409    pub fn set_index_rebuild_manager(
410        &self,
411        manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412    ) -> Result<()> {
413        self.index_rebuild_manager
414            .set(manager)
415            .map_err(|_| anyhow!("index_rebuild_manager already set"))
416    }
417
418    /// Replay WAL mutations into the current L0 buffer.
419    pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420        let l0 = self.l0_manager.get_current();
421        let wal = l0.read().wal.clone();
422
423        if let Some(wal) = wal {
424            wal.initialize().await?;
425            let mutations = wal.replay_since(wal_high_water_mark).await?;
426            let count = mutations.len();
427
428            if count > 0 {
429                log::info!(
430                    "Replaying {} mutations from WAL (LSN > {})",
431                    count,
432                    wal_high_water_mark
433                );
434                let mut l0_guard = l0.write();
435                l0_guard.replay_mutations(mutations)?;
436                // Rebuild the UNIQUE constraint index over the recovered rows
437                // (Bug #9 Mechanism B). `replay_mutations` restores
438                // vertices/properties/labels but never repopulates
439                // `constraint_index` (its only other caller is the live insert
440                // path). Without this, a unique key that lives only in the WAL
441                // (committed but not yet flushed to Lance) is invisible to
442                // `check_unique_constraint_multi` after recovery and a
443                // duplicate of it could be created.
444                self.rebuild_constraint_index(&mut l0_guard);
445            }
446
447            Ok(count)
448        } else {
449            Ok(0)
450        }
451    }
452
453    /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
454    ///
455    /// Scans every recovered vertex's properties and, for each enabled UNIQUE
456    /// constraint whose target label the vertex carries and whose member
457    /// properties are all present, inserts the same constraint key the live
458    /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
459    /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
460    /// write lock; the schema is already loaded on the `Writer`.
461    fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
462        let schema = self.schema_manager.schema();
463        // Collect entries first to avoid borrowing `vertex_properties`
464        // immutably while mutating `constraint_index` through the same guard.
465        let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
466        for (&vid, props) in &l0_guard.vertex_properties {
467            if l0_guard.vertex_tombstones.contains(&vid) {
468                continue;
469            }
470            let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
471                continue;
472            };
473            for label in labels {
474                for constraint in &schema.constraints {
475                    if !constraint.enabled {
476                        continue;
477                    }
478                    let ConstraintTarget::Label(l) = &constraint.target else {
479                        continue;
480                    };
481                    if l != label {
482                        continue;
483                    }
484                    let ConstraintType::Unique {
485                        properties: unique_props,
486                    } = &constraint.constraint_type
487                    else {
488                        continue;
489                    };
490                    let mut key_values = Vec::new();
491                    let mut all_present = true;
492                    for prop in unique_props {
493                        if let Some(val) = props.get(prop) {
494                            key_values.push((prop.clone(), val.clone()));
495                        } else {
496                            all_present = false;
497                            break;
498                        }
499                    }
500                    if all_present {
501                        keys.push((serialize_constraint_key(label, &key_values), vid));
502                    }
503                }
504            }
505        }
506        for (key, vid) in keys {
507            l0_guard.insert_constraint_key(key, vid);
508        }
509    }
510
    /// Allocates the next VID (pure auto-increment).
    ///
    /// Delegates to the writer's `IdAllocator`; any allocation error is
    /// returned unchanged.
    pub async fn next_vid(&self) -> Result<Vid> {
        self.allocator.allocate_vid().await
    }
515
    /// Allocates multiple VIDs at once for bulk operations.
    /// This is more efficient than calling next_vid() in a loop.
    ///
    /// `count` is forwarded to `IdAllocator::allocate_vids`; any allocation
    /// error is returned unchanged.
    pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
        self.allocator.allocate_vids(count).await
    }
521
    /// Allocates the next EID (pure auto-increment).
    ///
    /// `_type_id` is accepted but not used: the allocation does not depend
    /// on the edge type. Any allocation error is returned unchanged.
    pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
        self.allocator.allocate_eid().await
    }
526
    /// Allocates multiple EIDs at once for bulk operations.
    /// This is more efficient than calling next_eid() in a loop.
    ///
    /// `count` is forwarded to `IdAllocator::allocate_eids`; any allocation
    /// error is returned unchanged.
    pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
        self.allocator.allocate_eids(count).await
    }
532
533    /// Install the embedding runtime exactly once. Receiver is `&self` so it
534    /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
535    pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
536        self.xervo_runtime
537            .set(runtime)
538            .map_err(|_| anyhow!("xervo_runtime already set"))
539    }
540
541    pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
542        self.xervo_runtime.get().cloned()
543    }
544
545    /// Create a new empty L0 buffer for transaction-scoped mutations.
546    ///
547    /// Only reads the current version — no exclusive lock required on Writer.
548    /// The returned buffer has no WAL reference; mutations are logged at
549    /// commit time via [`Self::commit_transaction_l0`].
550    pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
551        let current_version = self.l0_manager.get_current().read().current_version;
552        // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
553        let buf = L0Buffer::new(current_version, None);
554        // SSI: stamp the OCC read sequence at begin so commit can detect any
555        // transaction that committed since. Gated on the runtime `ssi_enabled`
556        // toggle — when off, `occ_read_set` stays `None` and every downstream
557        // read-set recording / commit validation self-gates to a no-op.
558        let buf = if self.config.ssi_enabled {
559            let mut buf = buf;
560            buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
561            // The read path records observed ids here for SSI antidependency
562            // detection; commit consults it.
563            buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
564                crate::runtime::l0::OccReadSet::default(),
565            )));
566            buf
567        } else {
568            buf
569        };
570        Arc::new(RwLock::new(buf))
571    }
572
573    /// Resolve the target L0 buffer for a mutation.
574    ///
575    /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
576    /// When `None`, it targets the global L0 from the manager.
577    fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
578        tx_l0
579            .cloned()
580            .unwrap_or_else(|| self.l0_manager.get_current())
581    }
582
583    fn update_metrics(&self) {
584        let l0 = self.l0_manager.get_current();
585        let size = l0.read().estimated_size;
586        metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
587    }
588
589    /// Overlay-aware issue-#77 edge-endpoint validation.
590    ///
591    /// The current buffer alone does not hold all committed-but-unflushed
592    /// tombstones — a flush rotation moves them onto `pending_flush` until
593    /// the Lance write completes. A vertex is effectively deleted iff,
594    /// walking newest-first (tx → current → pending newest→oldest), the
595    /// first buffer that knows the vid says "tombstoned" (an insert clears
596    /// the tombstone within a buffer, so props/tombstone are mutually
597    /// exclusive per buffer).
598    ///
599    /// Must run under `flush_lock` so the overlay cannot change before the
600    /// merge, and BEFORE the durable WAL flush (see the call site).
601    fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
602        let chain = self.l0_manager.get_pending_flush();
603        let current = self.l0_manager.get_current();
604        let effectively_deleted = |vid: &Vid| -> bool {
605            if tx_l0.vertex_properties.contains_key(vid) {
606                return false;
607            }
608            if tx_l0.vertex_tombstones.contains(vid) {
609                return true;
610            }
611            {
612                let cur = current.read();
613                if cur.vertex_properties.contains_key(vid) {
614                    return false;
615                }
616                if cur.vertex_tombstones.contains(vid) {
617                    return true;
618                }
619            }
620            for frozen in chain.iter().rev() {
621                let g = frozen.read();
622                if g.vertex_properties.contains_key(vid) {
623                    return false;
624                }
625                if g.vertex_tombstones.contains(vid) {
626                    return true;
627                }
628            }
629            false
630        };
631        for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
632            if tx_l0.tombstones.contains_key(eid) {
633                continue; // a deletion, not an insertion — never resurrects a vertex
634            }
635            if effectively_deleted(src_vid) {
636                anyhow::bail!(
637                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
638                    eid,
639                    src_vid
640                );
641            }
642            if effectively_deleted(dst_vid) {
643                anyhow::bail!(
644                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
645                    eid,
646                    dst_vid
647                );
648            }
649        }
650        Ok(())
651    }
652
653    /// Seed `main_l0` (the current buffer) with the newest pending-overlay
654    /// value for each CRDT property the transaction writes, so the commit
655    /// merge MERGES against the committed CRDT state instead of shadowing it
656    /// (the carve-out lets concurrent CRDT writers commit on the assumption
657    /// that the merge sees the committed value — true only while that value
658    /// lives in current, not mid-flush on `pending_flush`). No-op when
659    /// nothing is pending or the property already exists in current. Vertex
660    /// properties only, mirroring the carve-out itself.
661    fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
662        let chain = self.l0_manager.get_pending_flush();
663        if chain.is_empty() {
664            return;
665        }
666        for (vid, props) in &tx_l0.vertex_properties {
667            Self::seed_crdt_props(&chain, *vid, props, main_l0);
668        }
669    }
670
671    /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
672    /// Also used by the non-transactional vertex write path, which CRDT-merges
673    /// into the current buffer directly and has the same shadowing hazard
674    /// during a flush window.
675    fn seed_crdt_props(
676        chain: &[Arc<RwLock<L0Buffer>>],
677        vid: Vid,
678        props: &Properties,
679        target: &mut L0Buffer,
680    ) {
681        for (key, value) in props {
682            if crate::runtime::l0::try_as_crdt(value).is_none() {
683                continue;
684            }
685            if target
686                .vertex_properties
687                .get(&vid)
688                .is_some_and(|p| p.contains_key(key))
689            {
690                continue;
691            }
692            // Newest generation first: the first hit is the live state.
693            for frozen in chain.iter().rev() {
694                let g = frozen.read();
695                if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
696                    if crate::runtime::l0::try_as_crdt(v).is_some() {
697                        target
698                            .vertex_properties
699                            .entry(vid)
700                            .or_default()
701                            .insert(key.clone(), v.clone());
702                    }
703                    break;
704                }
705            }
706        }
707    }
708
    /// Commit a transaction's private L0 buffer into main L0.
    ///
    /// Everything up to the post-commit flush trigger runs under `flush_lock`,
    /// in this order:
    ///
    /// 1. Validation — SSI conflict detection plus the overlay uniqueness and
    ///    CRDT carve-out checks (only when `config.ssi_enabled`), then the
    ///    edge-endpoint check (always). All of it runs before the WAL is
    ///    touched.
    /// 2. WAL append of the transaction's mutations, if the current L0 has a
    ///    WAL.
    /// 3. WAL flush — the durable commit point.
    /// 4. Merge of the transaction buffer into the current L0 and replay of
    ///    its edges and tombstones into the `AdjacencyManager`.
    /// 5. Registration of the write-set with the commit registry (SSI only).
    /// 6. Best-effort auto-flush when `should_flush()` fires.
    ///
    /// # Returns
    ///
    /// `(wal_lsn, flush_pending)`. `wal_lsn` is the value returned by
    /// `flush_wal` (0 when no WAL is configured). `flush_pending` is `true`
    /// only on the async-flush path, after this call has rotated the L0 and
    /// submitted the stream phase to the flush coordinator; on every other
    /// path it is `false`.
    ///
    /// NOTE(review): earlier documentation said the caller is expected to
    /// spawn a background `flush_to_l1` when `flush_pending` is `true`. In the
    /// code below the stream phase is already submitted before returning —
    /// confirm what callers do with the flag.
    ///
    /// # Errors
    ///
    /// - `UniError::SerializationConflict` for an SSI conflict or an unsound
    ///   CRDT carve-out.
    /// - `UniError::ConstraintConflict` when a unique key, MERGE key, or
    ///   `ext_id` was already committed by a concurrent transaction.
    /// - Any error from the edge-endpoint check, WAL append, WAL flush, or the
    ///   merge into main L0.
    ///
    /// Failures of the post-commit auto-flush are logged and not returned.
    pub async fn commit_transaction_l0(
        self: &Arc<Self>,
        tx_l0_arc: Arc<RwLock<L0Buffer>>,
    ) -> Result<(u64, bool)> {
        // Hold `flush_lock` across WAL append + flush + main-L0 merge.
        // Two concurrent commits serialize here; in Phase 3 the outer
        // `Arc<RwLock<Writer>>` already provides this exclusion, so the
        // acquisition is uncontended. Phase 4 drops the outer lock and
        // this becomes the load-bearing serialization point.
        let _flush_lock_guard = self.flush_lock.lock().await;

        // Crash-recovery seam: simulate process death immediately after winning
        // the commit serialization point but before any durable work. No-op
        // unless built with `--features failpoints`. (See ssi_resilience tests.)
        fail::fail_point!("commit::after-flush-lock");

        // SSI: optimistic conflict detection. This MUST run before any WAL
        // write — `flush_wal()` below is the durable commit point and the WAL
        // has no abort marker, so aborting after it would resurrect this
        // transaction on crash recovery. The write-set is reused for
        // registration after a successful merge.
        // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
        // and `occ_write_set` is `None`, so the post-merge registration below
        // is skipped — reproducing last-writer-wins exactly.
        let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
            let tx_l0 = tx_l0_arc.read();
            let read_seq = tx_l0.occ_read_seq;
            let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
            if !write_set.is_empty() {
                // Telemetry: one validation per non-empty (writing) commit. The
                // ratio of conflicts to validations is the headline abort rate.
                metrics::counter!("uni_ssi_commit_validations_total").increment(1);
                // Read-set is consulted only for writing transactions, so a
                // read-only commit (empty write-set) runs at snapshot isolation.
                let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
                if let Some(conflict) =
                    self.committed_writes
                        .lock()
                        .check(read_seq, &write_set, read_guard.as_deref())
                {
                    use crate::runtime::occ::Conflict;
                    match &conflict {
                        Conflict::WriteWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "write_write",
                        )
                        .increment(1),
                        Conflict::ReadWrite { .. } => metrics::counter!(
                            "uni_ssi_serialization_conflicts_total",
                            "kind" => "read_write",
                        )
                        .increment(1),
                        Conflict::HistoryTruncated { .. } => {
                            metrics::counter!("uni_ssi_history_truncated_total").increment(1)
                        }
                    }
                    return Err(anyhow::Error::new(
                        uni_common::UniError::SerializationConflict {
                            message: conflict.to_string(),
                        },
                    ));
                }
            }

            // Validate against the committed-but-unflushed overlay under
            // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
            // soundness. The current buffer alone does not hold all committed
            // state — a flush rotation moves it onto `pending_flush` until the
            // Lance write completes (the Bug #9A window, here at the
            // commit-time layer) — so every check walks [current, pending…].
            {
                let pending = self.l0_manager.get_pending_flush();
                let main_l0 = self.l0_manager.get_current();
                let overlay: Vec<Arc<RwLock<L0Buffer>>> =
                    std::iter::once(main_l0).chain(pending).collect();

                // SSI / serializable MERGE: abort if a concurrent transaction has
                // already committed a row with one of this transaction's unique
                // keys. Commits serialize here, so this closes the race window
                // left by the per-insert check. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.constraint_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_constraint_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "unique key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Implicit MERGE phantom guard: a `MERGE` that *created* a node
                // registered its (label, key-props) here even with no declared
                // UNIQUE constraint. If a concurrent transaction already committed
                // the same MERGE key, abort retriably so the two converge to one
                // node on retry (the loser's MATCH then finds the committed row).
                // Only MERGE-creates register keys, so a plain CREATE of the same
                // properties never lands here. (Empty index → no iterations.)
                for (key, vid) in &tx_l0.merge_guard_index {
                    if overlay
                        .iter()
                        .any(|b| b.read().has_merge_guard_key(key, *vid))
                    {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: "MERGE key already committed by a concurrent \
                                          transaction"
                                    .to_string(),
                            },
                        ));
                    }
                }

                // Same race window for global ext_id uniqueness: the per-insert
                // check ran against an older main L0; re-probe the committed
                // index here, where commits serialize.
                for (ext_id, vid) in &tx_l0.extid_index {
                    let taken = overlay.iter().any(|b| {
                        matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
                    });
                    if taken {
                        metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::ConstraintConflict {
                                message: format!(
                                    "ext_id '{ext_id}' already committed by a concurrent \
                                     transaction"
                                ),
                            },
                        ));
                    }
                }

                // CRDT carve-out soundness: a pure-CRDT write was dropped from the
                // write-set assuming its merge commutes. If the overlay holds a
                // *different* CRDT variant for the same property, the merge would
                // silently overwrite it — abort instead of losing the update.
                // (Checked against every overlay buffer: conservative if an old
                // generation held a different variant that a newer commit already
                // replaced, but an abort+retry is always sound.)
                for buf in &overlay {
                    if let Some(conflict) =
                        crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
                    {
                        metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
                        return Err(anyhow::Error::new(
                            uni_common::UniError::SerializationConflict {
                                message: conflict.to_string(),
                            },
                        ));
                    }
                }
            }
            Some(write_set)
        } else {
            None
        };

        // Issue #77: an edge whose endpoint is effectively deleted makes the
        // merge below bail. That bail MUST happen before the durable WAL flush —
        // after it the transaction is committed-but-unmerged (a ghost commit),
        // and WAL replay re-hits the same bail, making the database unopenable.
        // SSI validation was deliberately placed before the flush for exactly
        // this reason; the endpoint check belongs here too. Runs unconditionally
        // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
        // tombstone state cannot change between here and the merge. A
        // tombstone may live in a flush-rotated pending buffer rather than
        // the current buffer, so the check walks the overlay newest-first.
        {
            let tx_l0 = tx_l0_arc.read();
            self.validate_edge_endpoints_overlay(&tx_l0)?;
        }

        // Crash-recovery seam: SSI validation has passed; the transaction is
        // about to become durable. A crash here must leave NO trace (validation
        // happens before the WAL is touched). No-op unless `failpoints`.
        fail::fail_point!("commit::after-validate");

        // 1. Write transaction mutations to WAL BEFORE merging into main L0
        // This ensures durability before visibility.
        // Both buffers are only read-locked here, and the guards are released
        // at the end of this block, before the `flush_wal().await` below.
        {
            let tx_l0 = tx_l0_arc.read();
            let main_l0_arc = self.l0_manager.get_current();
            let main_l0 = main_l0_arc.read();

            // If WAL exists, write mutations to it for durability
            if let Some(wal) = main_l0.wal.as_ref() {
                // Order: vertices first, then edges (to ensure src/dst exist on replay)

                // Vertex insertions
                for (vid, properties) in &tx_l0.vertex_properties {
                    if !tx_l0.vertex_tombstones.contains(vid) {
                        let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                        wal.append(crate::runtime::wal::Mutation::InsertVertex {
                            vid: *vid,
                            properties: properties.clone(),
                            labels,
                        })?;
                    }
                }

                // Vertex deletions
                for vid in &tx_l0.vertex_tombstones {
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
                }

                // Label-only mutations (SET n:Label / REMOVE n:Label). After
                // vertex inserts (so the vertex exists on replay), before edges,
                // and skipping vertices deleted in this same commit.
                for vid in &tx_l0.vertex_label_overwrites {
                    if tx_l0.vertex_tombstones.contains(vid) {
                        continue;
                    }
                    let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
                    wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
                        vid: *vid,
                        labels,
                    })?;
                }

                // Crash-recovery seam: vertices appended, edges not yet. Tests
                // assert that a crash here (before `flush_wal`) recovers NOTHING
                // — the durable commit point is the flush below, not append.
                fail::fail_point!("commit::mid-wal");

                // Edge insertions and deletions from edge_endpoints
                for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
                    if tx_l0.tombstones.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            version,
                        })?;
                    } else {
                        let properties =
                            tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        let edge_type_name = tx_l0.edge_types.get(eid).cloned();
                        wal.append(crate::runtime::wal::Mutation::InsertEdge {
                            src_vid: *src_vid,
                            dst_vid: *dst_vid,
                            edge_type: *edge_type,
                            eid: *eid,
                            version,
                            properties,
                            edge_type_name,
                        })?;
                    }
                }

                // Tombstones for edges that only exist in the global L0 (not in
                // this transaction's edge_endpoints).  Without this, deletes of
                // pre-existing edges would be silently lost.
                for (eid, tombstone) in &tx_l0.tombstones {
                    if !tx_l0.edge_endpoints.contains_key(eid) {
                        let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
                        wal.append(crate::runtime::wal::Mutation::DeleteEdge {
                            eid: *eid,
                            src_vid: tombstone.src_vid,
                            dst_vid: tombstone.dst_vid,
                            edge_type: tombstone.edge_type,
                            version,
                        })?;
                    }
                }
            }
        }

        // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
        let wal_lsn = self.flush_wal().await?;

        // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
        // A crash here must RECOVER the transaction on replay (it is committed),
        // even though it was never made visible in-process. No-op unless `failpoints`.
        fail::fail_point!("commit::after-wal-flush");

        // Component C1: if an outstanding snapshot pins the current generation,
        // clone it aside (lazy copy-on-write) before merging, so the pinning
        // transaction's reads stay isolated from this commit. No-op — and zero
        // cost — when nothing is pinned (the common case). We hold `flush_lock`,
        // so this cannot race a flush rotate or another commit's merge; the merge
        // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
        // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
        // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
        // always false when SSI is off and this is a zero-cost no-op.
        if self.l0_manager.is_current_pinned() {
            self.l0_manager.freeze_current_for_snapshot();
            metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
        }

        // 3. Merge into main L0 and make visible
        {
            // Write-lock the tx buffer: `merge_take` moves its property maps
            // into main L0 instead of cloning them. The commit consumes the
            // transaction, so the drained maps are never observed afterwards;
            // everything read below (endpoints, versions, tombstones) is left
            // intact.
            let mut tx_l0 = tx_l0_arc.write();
            let main_l0_arc = self.l0_manager.get_current();
            let mut main_l0 = main_l0_arc.write();
            // A CRDT property's committed state may live only in a
            // flush-rotated pending buffer (the post-rotation current is
            // empty until the Lance write completes — the Bug #9A window).
            // `merge_crdt_properties` merges against the CURRENT buffer's
            // value — without seeding, the tx's CRDT state would SHADOW the
            // pending buffer's at read time (newest buffer wins per property)
            // and concurrent increments would be lost. Seed the newest
            // overlay value for each CRDT property the tx writes that current
            // lacks, so the merge below merges instead of replaces.
            self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
            main_l0.merge_take(&mut tx_l0)?;

            // Replay transaction edges into the AdjacencyManager overlay
            for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
                // Falls back to main L0's `current_version` when the tx did
                // not record a version for this edge.
                let edge_version = tx_l0
                    .edge_versions
                    .get(eid)
                    .copied()
                    .unwrap_or(main_l0.current_version);
                if tx_l0.tombstones.contains_key(eid) {
                    self.adjacency_manager
                        .add_tombstone(*eid, *src, *dst, *etype, edge_version);
                } else {
                    self.adjacency_manager
                        .insert_edge(*src, *dst, *eid, *etype, edge_version);
                }
            }

            // Replay tombstones for edges that only exist in the global L0
            // (not in this transaction's edge_endpoints).
            for (eid, tombstone) in &tx_l0.tombstones {
                if !tx_l0.edge_endpoints.contains_key(eid) {
                    let edge_version = tx_l0
                        .edge_versions
                        .get(eid)
                        .copied()
                        .unwrap_or(main_l0.current_version);
                    self.adjacency_manager.add_tombstone(
                        *eid,
                        tombstone.src_vid,
                        tombstone.dst_vid,
                        tombstone.edge_type,
                        edge_version,
                    );
                }
            }
        }

        // Crash-recovery seam: durable AND merged, but the in-memory commit
        // registry has not recorded this write-set yet. A crash here is
        // indistinguishable from one at `after-wal-flush` on reopen (the
        // registry is in-memory and rebuilt empty); the tx still recovers.
        fail::fail_point!("commit::after-merge");

        // SSI: register this commit's write-set under a fresh commit sequence so
        // later transactions detect conflicts against it. Still under
        // `flush_lock`, before the async-flush branch can drop the guard.
        // `occ_write_set` is `Some` only when `config.ssi_enabled`.
        if let Some(write_set) = occ_write_set
            && !write_set.is_empty()
        {
            let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
            self.committed_writes.lock().record(seq, write_set);
        }

        self.update_metrics();

        // 4. Best-effort post-commit auto-flush.
        //
        // Two paths:
        // - async_flush_enabled = false (default): inline under our
        //   existing flush_lock guard via flush_inline_under_lock.
        // - async_flush_enabled = true: rotate inline, drop flush_lock,
        //   then submit the stream phase to the coordinator. Gated on
        //   `pending_flush_count() < max_pending_flushes` so we don't
        //   stack up rotations beyond the configured pipeline depth.
        //   `try_acquire_permit` is non-blocking: if we lose the race
        //   for the last permit, we just skip this trigger (the next
        //   commit retries).
        //
        // The inline path is also the `else` of the whole async gate, so it
        // runs as well when async flush is enabled but no coordinator is
        // configured, or when the pending count has already reached
        // `max_pending_flushes`.
        // NOTE(review): confirm that an inline flush at full pipeline depth
        // is the intended backpressure rather than an accidental fall-through.
        let mut flush_pending = false;
        if self.should_flush() {
            if self.config.async_flush_enabled
                && let Some(coord) = self.flush_coordinator.as_ref()
                && coord.pending_flush_count() < self.config.max_pending_flushes
            {
                match coord.try_acquire_permit() {
                    Some(permit) => {
                        match self.flush_l0_rotate().await {
                            Ok(rotate_out) => {
                                // Allocate the rotate seq and bump pending ONLY
                                // after the rotate succeeds (Bug #3). A failed
                                // rotate must consume neither: the finalizer
                                // advances strictly in consecutive seq order and
                                // only decrements pending on finalize, so a
                                // leaked seq/pending from a failed rotate would
                                // wedge the finalizer forever and climb pending
                                // toward `max_pending_flushes`. The seq is still
                                // allocated under `flush_lock` (immediately after
                                // the rotate, before the guard drops below), so
                                // concurrent rotates keep seq order == rotation
                                // order, and the seq is not used until submit.
                                let seq = coord.next_rotate_seq();
                                coord.note_pending();
                                // Release flush_lock BEFORE the spawn so concurrent
                                // commits can proceed while the stream runs.
                                drop(_flush_lock_guard);
                                let parent_manifest = self.cached_manifest.lock().clone();
                                let rotated = crate::runtime::flush_coordinator::RotatedFlush {
                                    seq,
                                    old_l0_arc: rotate_out.old_l0_arc.clone(),
                                    wal_lsn: rotate_out.wal_lsn,
                                    current_version: rotate_out.current_version,
                                    name: None,
                                    parent_manifest,
                                    permit,
                                    flush_in_progress_guard: rotate_out.flush_in_progress_guard,
                                };
                                let writer = self.clone();
                                let _ticket = coord.submit_for_stream(
                                    rotated,
                                    move |old_l0, wal, ver, n| async move {
                                        let outcome =
                                            writer.flush_stream_l1(old_l0, wal, ver, n).await?;
                                        Ok(crate::runtime::flush_coordinator::FlushOutcome {
                                            new_manifest: outcome.manifest,
                                            snapshot_id: outcome.snapshot_id,
                                        })
                                    },
                                );
                                flush_pending = true;
                                // Early return — flush_lock already dropped.
                                return Ok((wal_lsn, flush_pending));
                            }
                            Err(e) => {
                                tracing::warn!("Async rotate failed (non-critical): {}", e);
                                // No seq was allocated and pending was not
                                // bumped (both moved into the Ok arm for Bug
                                // #3), so the finalizer is not wedged. The
                                // permit drops here, freeing the slot.
                            }
                        }
                    }
                    None => {
                        // Race: someone else grabbed the last permit. Skip;
                        // next commit will retry should_flush().
                        metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
                    }
                }
            } else if let Err(e) = self.flush_inline_under_lock(None).await {
                tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
            }
        }

        Ok((wal_lsn, flush_pending))
    }
1186
1187    /// Flush the WAL buffer to durable storage.
1188    ///
1189    /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1190    pub async fn flush_wal(&self) -> Result<u64> {
1191        let l0 = self.l0_manager.get_current();
1192        let wal = l0.read().wal.clone();
1193
1194        match wal {
1195            Some(wal) => Ok(wal.flush().await?),
1196            None => Ok(0),
1197        }
1198    }
1199
1200    /// Record property removals in the active L0 mutation stats.
1201    ///
1202    /// Routes to the transaction L0 if provided, otherwise to the main L0.
1203    pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1204        if count == 0 {
1205            return;
1206        }
1207        let l0 = self.resolve_l0(tx_l0);
1208        l0.write().mutation_stats.properties_removed += count;
1209    }
1210
1211    /// Validates vertex constraints for the given properties.
1212    /// In the new design, label is passed as a parameter since VID no longer embeds label.
1213    async fn validate_vertex_constraints_for_label(
1214        &self,
1215        vid: Vid,
1216        properties: &Properties,
1217        label: &str,
1218        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1219    ) -> Result<()> {
1220        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1221            .await
1222    }
1223
1224    /// Partial-update sibling: validates only constraints touching keys
1225    /// present in `properties` (the touched set). NOT NULL is checked
1226    /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1227    /// skipped when any referenced key is absent (the caller is
1228    /// expected to have routed to the full-row path in that case via
1229    /// `touched_needs_full_read`).
1230    async fn validate_vertex_constraints_for_label_partial(
1231        &self,
1232        vid: Vid,
1233        properties: &Properties,
1234        label: &str,
1235        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1236    ) -> Result<()> {
1237        self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1238            .await
1239    }
1240
1241    async fn validate_vertex_constraints_for_label_impl(
1242        &self,
1243        vid: Vid,
1244        properties: &Properties,
1245        label: &str,
1246        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1247        partial: bool,
1248    ) -> Result<()> {
1249        let schema = self.schema_manager.schema();
1250
1251        {
1252            // 1. Check NOT NULL constraints (from Property definitions).
1253            //    Under partial-update mode, skip properties NOT in
1254            //    `properties` — they retain their previous (already-
1255            //    validated) value.
1256            if let Some(props_meta) = schema.properties.get(label) {
1257                for (prop_name, meta) in props_meta {
1258                    if !meta.nullable {
1259                        let present = properties.get(prop_name);
1260                        if partial && present.is_none() {
1261                            continue;
1262                        }
1263                        if present.is_none_or(|v| v.is_null()) {
1264                            log::warn!(
1265                                "Constraint violation: Property '{}' cannot be null for label '{}'",
1266                                prop_name,
1267                                label
1268                            );
1269                            return Err(anyhow!(
1270                                "Constraint violation: Property '{}' cannot be null",
1271                                prop_name
1272                            ));
1273                        }
1274                    }
1275                }
1276            }
1277
1278            // 2. Check Explicit Constraints (Unique, Check, etc.)
1279            for constraint in &schema.constraints {
1280                if !constraint.enabled {
1281                    continue;
1282                }
1283                match &constraint.target {
1284                    ConstraintTarget::Label(l) if l == label => {}
1285                    _ => continue,
1286                }
1287
1288                match &constraint.constraint_type {
1289                    ConstraintType::Unique {
1290                        properties: unique_props,
1291                    } => {
1292                        // Support single and multi-property unique constraints
1293                        if !unique_props.is_empty() {
1294                            let mut key_values = Vec::new();
1295                            let mut missing = false;
1296                            for prop in unique_props {
1297                                if let Some(val) = properties.get(prop) {
1298                                    key_values.push((prop.clone(), val.clone()));
1299                                } else {
1300                                    missing = true; // Can't enforce if property missing (partial update?)
1301                                    // For INSERT, missing means null?
1302                                    // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1303                                    // For now, only check if ALL keys are present
1304                                }
1305                            }
1306
1307                            if !missing {
1308                                self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1309                                    .await?;
1310                            }
1311                        }
1312                    }
1313                    ConstraintType::Exists { property } => {
1314                        if properties.get(property).is_none_or(|v| v.is_null()) {
1315                            log::warn!(
1316                                "Constraint violation: Property '{}' must exist for label '{}'",
1317                                property,
1318                                label
1319                            );
1320                            return Err(anyhow!(
1321                                "Constraint violation: Property '{}' must exist",
1322                                property
1323                            ));
1324                        }
1325                    }
1326                    ConstraintType::Check { expression } => {
1327                        if !self.evaluate_check_constraint(expression, properties)? {
1328                            return Err(anyhow!(
1329                                "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1330                                constraint.name,
1331                                expression
1332                            ));
1333                        }
1334                    }
1335                    _ => {
1336                        return Err(anyhow!("Unsupported constraint type"));
1337                    }
1338                }
1339            }
1340        }
1341        Ok(())
1342    }
1343
1344    /// Validates vertex constraints for a vertex with the given labels.
1345    /// Labels must be passed explicitly since the vertex may not yet be in L0.
1346    /// Unknown labels (not in schema) are skipped.
1347    async fn validate_vertex_constraints(
1348        &self,
1349        vid: Vid,
1350        properties: &Properties,
1351        labels: &[String],
1352        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1353    ) -> Result<()> {
1354        let schema = self.schema_manager.schema();
1355
1356        // Validate constraints only for known labels
1357        for label in labels {
1358            // Skip unknown labels (schemaless support)
1359            if schema.get_label_case_insensitive(label).is_none() {
1360                continue;
1361            }
1362            self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1363                .await?;
1364        }
1365
1366        // Check global ext_id uniqueness if ext_id is provided
1367        if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1368            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1369        }
1370
1371        Ok(())
1372    }
1373
1374    /// Partial sibling of `validate_vertex_constraints` — validates only
1375    /// constraints touching keys present in `properties`. Used by
1376    /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1377    /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1378    async fn validate_vertex_constraints_partial(
1379        &self,
1380        vid: Vid,
1381        touched: &Properties,
1382        labels: &[String],
1383        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1384    ) -> Result<()> {
1385        let schema = self.schema_manager.schema();
1386        for label in labels {
1387            if schema.get_label_case_insensitive(label).is_none() {
1388                continue;
1389            }
1390            self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1391                .await?;
1392        }
1393        if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1394            self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1395        }
1396        Ok(())
1397    }
1398
1399    /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1400    ///
1401    /// Used to build a constraint key index from L0 buffers for batch validation.
1402    fn collect_constraint_keys_from_properties<'a>(
1403        properties_iter: impl Iterator<Item = &'a Properties>,
1404        label: &str,
1405        constraints: &[uni_common::core::schema::Constraint],
1406        existing_keys: &mut HashMap<String, HashSet<String>>,
1407        existing_extids: &mut HashSet<String>,
1408    ) {
1409        for props in properties_iter {
1410            if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1411                existing_extids.insert(ext_id.to_string());
1412            }
1413
1414            for constraint in constraints {
1415                if !constraint.enabled {
1416                    continue;
1417                }
1418                if let ConstraintTarget::Label(l) = &constraint.target {
1419                    if l != label {
1420                        continue;
1421                    }
1422                } else {
1423                    continue;
1424                }
1425
1426                if let ConstraintType::Unique {
1427                    properties: unique_props,
1428                } = &constraint.constraint_type
1429                {
1430                    let mut key_parts = Vec::new();
1431                    let mut all_present = true;
1432                    for prop in unique_props {
1433                        if let Some(val) = props.get(prop) {
1434                            key_parts.push(format!("{}:{}", prop, val));
1435                        } else {
1436                            all_present = false;
1437                            break;
1438                        }
1439                    }
1440                    if all_present {
1441                        let key = key_parts.join("|");
1442                        existing_keys
1443                            .entry(constraint.name.clone())
1444                            .or_default()
1445                            .insert(key);
1446                    }
1447                }
1448            }
1449        }
1450    }
1451
1452    /// Validates constraints for a batch of vertices efficiently.
1453    ///
1454    /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1455    /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1456    ///
1457    /// # Arguments
1458    /// * `vids` - VIDs of vertices being inserted
1459    /// * `properties_batch` - Properties for each vertex
1460    /// * `label` - Label for all vertices (assumes single label for now)
1461    ///
1462    /// # Performance
1463    /// For N vertices with unique constraints:
1464    /// - Old approach: O(N²) - scan L0 buffer N times
1465    /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1466    async fn validate_vertex_batch_constraints(
1467        &self,
1468        vids: &[Vid],
1469        properties_batch: &[Properties],
1470        label: &str,
1471        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1472    ) -> Result<()> {
1473        if vids.len() != properties_batch.len() {
1474            return Err(anyhow!("VID/properties length mismatch"));
1475        }
1476
1477        let schema = self.schema_manager.schema();
1478
1479        // 1. Validate NOT NULL constraints for each vertex
1480        if let Some(props_meta) = schema.properties.get(label) {
1481            for (idx, properties) in properties_batch.iter().enumerate() {
1482                for (prop_name, meta) in props_meta {
1483                    if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1484                        return Err(anyhow!(
1485                            "Constraint violation at index {}: Property '{}' cannot be null",
1486                            idx,
1487                            prop_name
1488                        ));
1489                    }
1490                }
1491            }
1492        }
1493
1494        // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1495        let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1496        let mut existing_extids: HashSet<String> = HashSet::new();
1497
1498        // Scan current L0 buffer
1499        {
1500            let l0 = self.l0_manager.get_current();
1501            let l0_guard = l0.read();
1502            Self::collect_constraint_keys_from_properties(
1503                l0_guard.vertex_properties.values(),
1504                label,
1505                &schema.constraints,
1506                &mut existing_keys,
1507                &mut existing_extids,
1508            );
1509        }
1510
1511        // Scan transaction L0 if present
1512        if let Some(tx_l0) = tx_l0 {
1513            let tx_l0_guard = tx_l0.read();
1514            Self::collect_constraint_keys_from_properties(
1515                tx_l0_guard.vertex_properties.values(),
1516                label,
1517                &schema.constraints,
1518                &mut existing_keys,
1519                &mut existing_extids,
1520            );
1521        }
1522
1523        // 3. Check batch vertices against index AND check for duplicates within batch
1524        let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1525        let mut batch_extids: HashMap<String, usize> = HashMap::new();
1526
1527        for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1528            // Check ext_id uniqueness
1529            if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1530                if existing_extids.contains(ext_id) {
1531                    return Err(anyhow!(
1532                        "Constraint violation at index {}: ext_id '{}' already exists",
1533                        idx,
1534                        ext_id
1535                    ));
1536                }
1537                if let Some(first_idx) = batch_extids.get(ext_id) {
1538                    return Err(anyhow!(
1539                        "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1540                        ext_id,
1541                        first_idx,
1542                        idx
1543                    ));
1544                }
1545                batch_extids.insert(ext_id.to_string(), idx);
1546            }
1547
1548            // Check unique constraints
1549            for constraint in &schema.constraints {
1550                if !constraint.enabled {
1551                    continue;
1552                }
1553                if let ConstraintTarget::Label(l) = &constraint.target {
1554                    if l != label {
1555                        continue;
1556                    }
1557                } else {
1558                    continue;
1559                }
1560
1561                match &constraint.constraint_type {
1562                    ConstraintType::Unique {
1563                        properties: unique_props,
1564                    } => {
1565                        let mut key_parts = Vec::new();
1566                        let mut all_present = true;
1567                        for prop in unique_props {
1568                            if let Some(val) = properties.get(prop) {
1569                                key_parts.push(format!("{}:{}", prop, val));
1570                            } else {
1571                                all_present = false;
1572                                break;
1573                            }
1574                        }
1575
1576                        if all_present {
1577                            let key = key_parts.join("|");
1578
1579                            // Check against existing L0 keys
1580                            if let Some(keys) = existing_keys.get(&constraint.name)
1581                                && keys.contains(&key)
1582                            {
1583                                return Err(anyhow!(
1584                                    "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1585                                    idx,
1586                                    label,
1587                                    constraint.name
1588                                ));
1589                            }
1590
1591                            // Check for duplicates within batch
1592                            let batch_constraint_keys =
1593                                batch_keys.entry(constraint.name.clone()).or_default();
1594                            if let Some(first_idx) = batch_constraint_keys.get(&key) {
1595                                return Err(anyhow!(
1596                                    "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1597                                    key,
1598                                    first_idx,
1599                                    idx
1600                                ));
1601                            }
1602                            batch_constraint_keys.insert(key, idx);
1603                        }
1604                    }
1605                    ConstraintType::Exists { property }
1606                        if properties.get(property).is_none_or(|v| v.is_null()) =>
1607                    {
1608                        return Err(anyhow!(
1609                            "Constraint violation at index {}: Property '{}' must exist",
1610                            idx,
1611                            property
1612                        ));
1613                    }
1614                    ConstraintType::Check { expression }
1615                        if !self.evaluate_check_constraint(expression, properties)? =>
1616                    {
1617                        return Err(anyhow!(
1618                            "Constraint violation at index {}: CHECK constraint '{}' violated",
1619                            idx,
1620                            constraint.name
1621                        ));
1622                    }
1623                    _ => {}
1624                }
1625            }
1626        }
1627
1628        // 4. Check storage for unique constraints (can batch this into a single query)
1629        for constraint in &schema.constraints {
1630            if !constraint.enabled {
1631                continue;
1632            }
1633            if let ConstraintTarget::Label(l) = &constraint.target {
1634                if l != label {
1635                    continue;
1636                }
1637            } else {
1638                continue;
1639            }
1640
1641            if let ConstraintType::Unique {
1642                properties: unique_props,
1643            } = &constraint.constraint_type
1644            {
1645                // Build compound OR filter for all batch vertices
1646                let mut or_filters = Vec::new();
1647                for properties in properties_batch.iter() {
1648                    let mut and_parts = Vec::new();
1649                    let mut all_present = true;
1650                    for prop in unique_props {
1651                        if let Some(val) = properties.get(prop) {
1652                            let val_str = match val {
1653                                Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1654                                Value::Int(n) => n.to_string(),
1655                                Value::Float(f) => f.to_string(),
1656                                Value::Bool(b) => b.to_string(),
1657                                _ => {
1658                                    all_present = false;
1659                                    break;
1660                                }
1661                            };
1662                            and_parts.push(format!("{} = {}", prop, val_str));
1663                        } else {
1664                            all_present = false;
1665                            break;
1666                        }
1667                    }
1668                    if all_present {
1669                        or_filters.push(format!("({})", and_parts.join(" AND ")));
1670                    }
1671                }
1672
1673                #[cfg(feature = "lance-backend")]
1674                if !or_filters.is_empty() {
1675                    let vid_list: Vec<String> =
1676                        vids.iter().map(|v| v.as_u64().to_string()).collect();
1677                    let filter = format!(
1678                        "({}) AND _deleted = false AND _vid NOT IN ({})",
1679                        or_filters.join(" OR "),
1680                        vid_list.join(", ")
1681                    );
1682
1683                    if let Ok(ds) = self.storage.vertex_dataset(label)
1684                        && let Ok(lance_ds) = ds.open_raw().await
1685                    {
1686                        let count = lance_ds.count_rows(Some(filter.clone())).await?;
1687                        if count > 0 {
1688                            return Err(anyhow!(
1689                                "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1690                                label,
1691                                constraint.name
1692                            ));
1693                        }
1694                    }
1695                }
1696            }
1697        }
1698
1699        Ok(())
1700    }
1701
1702    /// Checks that ext_id is globally unique across all vertices.
1703    ///
1704    /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1705    /// to ensure no other vertex uses this ext_id.
1706    ///
1707    /// # Errors
1708    ///
1709    /// Returns error if another vertex with the same ext_id exists.
1710    async fn check_extid_globally_unique(
1711        &self,
1712        ext_id: &str,
1713        current_vid: Vid,
1714        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1715    ) -> Result<()> {
1716        // Check L0 buffers: current, transaction, and pending flush
1717        let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1718            let mut buffers = vec![self.l0_manager.get_current()];
1719            if let Some(tx_l0) = tx_l0 {
1720                buffers.push(tx_l0.clone());
1721            }
1722            buffers.extend(self.l0_manager.get_pending_flush());
1723            buffers
1724        };
1725
1726        for l0 in &l0_buffers_to_check {
1727            // O(1) per buffer via the maintained `extid_index` (the previous
1728            // full `vertex_properties` scan made constrained ingest O(n²)).
1729            if let Some(&vid) = l0.read().extid_index.get(ext_id)
1730                && vid != current_vid
1731            {
1732                return Err(anyhow!(
1733                    "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1734                    ext_id,
1735                    vid
1736                ));
1737            }
1738        }
1739
1740        // Check main vertices table (if it exists)
1741        // Pass None for global uniqueness check (not snapshot-isolated)
1742        let backend = self.storage.backend();
1743        if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1744            && found_vid != current_vid
1745        {
1746            return Err(anyhow!(
1747                "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1748                ext_id,
1749                found_vid
1750            ));
1751        }
1752
1753        Ok(())
1754    }
1755
1756    /// Helper to get vertex labels from L0 buffer.
1757    fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1758        let l0 = self.l0_manager.get_current();
1759        let l0_guard = l0.read();
1760        // Check if vertex is tombstoned (deleted) - if so, return None
1761        if l0_guard.vertex_tombstones.contains(&vid) {
1762            return None;
1763        }
1764        l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1765    }
1766
1767    /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1768    /// This is the proper way to read vertex labels after a flush, as it checks both
1769    /// in-memory buffers and persisted storage.
1770    pub async fn get_vertex_labels(
1771        &self,
1772        vid: Vid,
1773        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1774    ) -> Option<Vec<String>> {
1775        // 1. Check current L0
1776        if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1777            return Some(labels);
1778        }
1779
1780        // 2. Check transaction L0 if present
1781        if let Some(tx_l0) = tx_l0 {
1782            let guard = tx_l0.read();
1783            if guard.vertex_tombstones.contains(&vid) {
1784                return None;
1785            }
1786            if let Some(labels) = guard.get_vertex_labels(vid) {
1787                return Some(labels.to_vec());
1788            }
1789        }
1790
1791        // 3. Check pending flush L0s
1792        for pending_l0 in self.l0_manager.get_pending_flush() {
1793            let guard = pending_l0.read();
1794            if guard.vertex_tombstones.contains(&vid) {
1795                return None;
1796            }
1797            if let Some(labels) = guard.get_vertex_labels(vid) {
1798                return Some(labels.to_vec());
1799            }
1800        }
1801
1802        // 4. Check storage
1803        self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1804    }
1805
1806    /// Helper to get edge type from L0 buffer.
1807    fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1808        let l0 = self.l0_manager.get_current();
1809        let l0_guard = l0.read();
1810        l0_guard.get_edge_type(eid).map(|s| s.to_string())
1811    }
1812
1813    /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1814    /// Falls back to the transaction L0 if available.
1815    pub fn get_edge_type_id_from_l0(
1816        &self,
1817        eid: Eid,
1818        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1819    ) -> Option<u32> {
1820        // Check transaction L0 first
1821        if let Some(tx_l0) = tx_l0 {
1822            let guard = tx_l0.read();
1823            if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1824                return Some(etype);
1825            }
1826        }
1827        // Fall back to main L0
1828        let l0 = self.l0_manager.get_current();
1829        let l0_guard = l0.read();
1830        l0_guard
1831            .get_edge_endpoint_full(eid)
1832            .map(|(_, _, etype)| etype)
1833    }
1834
1835    /// Set the type name for an edge (used for schemaless edge types).
1836    /// This is called during CREATE for edge types not found in the schema.
1837    pub fn set_edge_type(
1838        &self,
1839        eid: Eid,
1840        type_name: String,
1841        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1842    ) {
1843        self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1844    }
1845
1846    /// Evaluate a simple CHECK constraint expression.
1847    /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1848    fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1849        let parts: Vec<&str> = expression.split_whitespace().collect();
1850        if parts.len() != 3 {
1851            // For now, only support "prop op val"
1852            // Fallback to true if too complex to avoid breaking, but warn
1853            log::warn!(
1854                "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1855                expression
1856            );
1857            return Ok(true);
1858        }
1859
1860        let prop_part = parts[0].trim_start_matches('(');
1861        // Handle "variable.property" format - take the part after the dot
1862        let prop_name = if let Some(idx) = prop_part.find('.') {
1863            &prop_part[idx + 1..]
1864        } else {
1865            prop_part
1866        };
1867
1868        let op = parts[1];
1869        let val_str = parts[2].trim_end_matches(')');
1870
1871        let prop_val = match properties.get(prop_name) {
1872            Some(v) => v,
1873            None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1874        };
1875
1876        // Parse value string (handle quotes for strings)
1877        let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1878            || (val_str.starts_with('"') && val_str.ends_with('"'))
1879        {
1880            Value::String(val_str[1..val_str.len() - 1].to_string())
1881        } else if let Ok(n) = val_str.parse::<i64>() {
1882            Value::Int(n)
1883        } else if let Ok(n) = val_str.parse::<f64>() {
1884            Value::Float(n)
1885        } else if let Ok(b) = val_str.parse::<bool>() {
1886            Value::Bool(b)
1887        } else {
1888            // Check for internal format wrappers if they somehow leaked through
1889            if val_str.starts_with("Number(") && val_str.ends_with(')') {
1890                let n_str = &val_str[7..val_str.len() - 1];
1891                if let Ok(n) = n_str.parse::<i64>() {
1892                    Value::Int(n)
1893                } else if let Ok(n) = n_str.parse::<f64>() {
1894                    Value::Float(n)
1895                } else {
1896                    Value::String(val_str.to_string())
1897                }
1898            } else {
1899                Value::String(val_str.to_string())
1900            }
1901        };
1902
1903        match op {
1904            "=" | "==" => Ok(prop_val == &target_val),
1905            "!=" | "<>" => Ok(prop_val != &target_val),
1906            ">" => self
1907                .compare_values(prop_val, &target_val)
1908                .map(|o| o.is_gt()),
1909            "<" => self
1910                .compare_values(prop_val, &target_val)
1911                .map(|o| o.is_lt()),
1912            ">=" => self
1913                .compare_values(prop_val, &target_val)
1914                .map(|o| o.is_ge()),
1915            "<=" => self
1916                .compare_values(prop_val, &target_val)
1917                .map(|o| o.is_le()),
1918            _ => {
1919                log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1920                Ok(true)
1921            }
1922        }
1923    }
1924
1925    fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1926        use std::cmp::Ordering;
1927
1928        fn cmp_f64(x: f64, y: f64) -> Ordering {
1929            x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1930        }
1931
1932        match (a, b) {
1933            (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1934            (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1935            (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1936            (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1937            (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1938            _ => Err(anyhow!(
1939                "Cannot compare incompatible types: {:?} vs {:?}",
1940                a,
1941                b
1942            )),
1943        }
1944    }
1945
1946    async fn check_unique_constraint_multi(
1947        &self,
1948        label: &str,
1949        key_values: &[(String, Value)],
1950        current_vid: Vid,
1951        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1952    ) -> Result<()> {
1953        // Serialize constraint key once for O(1) lookups
1954        let key = serialize_constraint_key(label, key_values);
1955
1956        // 1. Check L0 (in-memory) using O(1) constraint index
1957        {
1958            let l0 = self.l0_manager.get_current();
1959            let l0_guard = l0.read();
1960            if l0_guard.has_constraint_key(&key, current_vid) {
1961                return Err(anyhow!(
1962                    "Constraint violation: Duplicate composite key for label '{}'",
1963                    label
1964                ));
1965            }
1966        }
1967
1968        // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1969        // buffer onto `pending_flush` and installs a fresh empty current
1970        // buffer; until the rotated rows reach Lance the key is invisible to
1971        // both the current-buffer check above and the storage check below, so
1972        // a duplicate could slip through that flush window. Mirror the read
1973        // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1974        // already consult `pending_flush`.
1975        for pending_l0 in self.l0_manager.get_pending_flush() {
1976            if pending_l0.read().has_constraint_key(&key, current_vid) {
1977                return Err(anyhow!(
1978                    "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1979                    label
1980                ));
1981            }
1982        }
1983
1984        // Check Transaction L0
1985        if let Some(tx_l0) = tx_l0 {
1986            let tx_l0_guard = tx_l0.read();
1987            if tx_l0_guard.has_constraint_key(&key, current_vid) {
1988                return Err(anyhow!(
1989                    "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1990                    label
1991                ));
1992            }
1993        }
1994
1995        // 2. Check Storage (L1/L2)
1996        let filters: Vec<String> = key_values
1997            .iter()
1998            .map(|(prop, val)| {
1999                let val_str = match val {
2000                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2001                    Value::Int(n) => n.to_string(),
2002                    Value::Float(f) => f.to_string(),
2003                    Value::Bool(b) => b.to_string(),
2004                    _ => "NULL".to_string(),
2005                };
2006                format!("{} = {}", prop, val_str)
2007            })
2008            .collect();
2009
2010        let mut filter = filters.join(" AND ");
2011        filter.push_str(&format!(
2012            " AND _deleted = false AND _vid != {}",
2013            current_vid.as_u64()
2014        ));
2015
2016        #[cfg(feature = "lance-backend")]
2017        if let Ok(ds) = self.storage.vertex_dataset(label)
2018            && let Ok(lance_ds) = ds.open_raw().await
2019        {
2020            let count = lance_ds.count_rows(Some(filter.clone())).await?;
2021            if count > 0 {
2022                return Err(anyhow!(
2023                    "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2024                    label,
2025                    filter
2026                ));
2027            }
2028        }
2029
2030        Ok(())
2031    }
2032
2033    async fn check_write_pressure(&self) -> Result<()> {
2034        let status = self
2035            .storage
2036            .compaction_status()
2037            .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2038        let l1_runs = status.l1_runs;
2039        let throttle = &self.config.throttle;
2040
2041        if l1_runs >= throttle.hard_limit {
2042            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2043            // Simple polling for now
2044            while self
2045                .storage
2046                .compaction_status()
2047                .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2048                .l1_runs
2049                >= throttle.hard_limit
2050            {
2051                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2052            }
2053        } else if l1_runs >= throttle.soft_limit {
2054            let excess = l1_runs - throttle.soft_limit;
2055            // Cap multiplier to avoid overflow
2056            let excess = std::cmp::min(excess, 31);
2057            let multiplier = 2_u32.pow(excess as u32);
2058            let delay = throttle.base_delay * multiplier;
2059            tokio::time::sleep(delay).await;
2060        }
2061        Ok(())
2062    }
2063
2064    /// Check transaction memory limit to prevent OOM.
2065    /// No-op when no transaction is active.
2066    fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2067        if let Some(tx_l0) = tx_l0 {
2068            let size = tx_l0.read().estimated_size;
2069            if size > self.config.max_transaction_memory {
2070                return Err(anyhow!(
2071                    "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2072                     Roll back or commit the current transaction.",
2073                    size,
2074                    self.config.max_transaction_memory
2075                ));
2076            }
2077        }
2078        Ok(())
2079    }
2080
2081    async fn get_query_context(
2082        &self,
2083        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2084    ) -> Option<QueryContext> {
2085        Some(QueryContext::new_with_pending(
2086            self.l0_manager.get_current(),
2087            tx_l0.cloned(),
2088            self.l0_manager.get_pending_flush(),
2089        ))
2090    }
2091
2092    /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2093    /// write paths.
2094    ///
2095    /// Rejects a declared CRDT property written as a parsed CRDT value
2096    /// (`Value::Map`) whose variant differs from the schema's declared variant.
2097    /// A mismatch would make the commit-time merge silently overwrite instead of
2098    /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2099    /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2100    /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2101    /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2102    /// are never carved out and stay conflictable.
2103    fn enforce_crdt_variants(
2104        props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2105        properties: &Properties,
2106    ) -> Result<()> {
2107        for (key, value) in properties {
2108            let Some(meta) = props_meta.get(key) else {
2109                continue;
2110            };
2111            let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2112                continue;
2113            };
2114            if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2115                && crdt.type_name() != expected.type_name()
2116            {
2117                return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2118                    message: format!(
2119                        "CRDT property '{key}' must be written as a {} value",
2120                        expected.type_name()
2121                    ),
2122                }));
2123            }
2124        }
2125        Ok(())
2126    }
2127
2128    /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2129    ///
2130    /// When `label` is provided, uses it directly to look up property metadata.
2131    /// Otherwise falls back to discovering the label from L0 buffers and storage.
2132    ///
2133    /// # Errors
2134    ///
2135    /// Returns an error if CRDT property merging fails.
2136    async fn prepare_vertex_upsert(
2137        &self,
2138        vid: Vid,
2139        properties: &mut Properties,
2140        label: Option<&str>,
2141        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2142    ) -> Result<()> {
2143        let Some(pm) = &self.property_manager else {
2144            return Ok(());
2145        };
2146
2147        let schema = self.schema_manager.schema();
2148
2149        // Resolve label: use provided label or discover from L0/storage
2150        let discovered_labels;
2151        let label_name = if let Some(l) = label {
2152            Some(l)
2153        } else {
2154            discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2155            discovered_labels
2156                .as_ref()
2157                .and_then(|l| l.first().map(|s| s.as_str()))
2158        };
2159
2160        let Some(label_str) = label_name else {
2161            return Ok(());
2162        };
2163        let Some(props_meta) = schema.properties.get(label_str) else {
2164            return Ok(());
2165        };
2166
2167        // Identify CRDT properties in the insert data
2168        let crdt_keys: Vec<String> = properties
2169            .keys()
2170            .filter(|key| {
2171                props_meta.get(*key).is_some_and(|meta| {
2172                    matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2173                })
2174            })
2175            .cloned()
2176            .collect();
2177
2178        if crdt_keys.is_empty() {
2179            return Ok(());
2180        }
2181
2182        // Enforce that each declared CRDT property written as a parsed CRDT value
2183        // (`Value::Map`) carries its declared variant. A mismatched variant makes
2184        // `merge_crdt_properties` overwrite rather than merge at commit, and the
2185        // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2186        // would hide that as a silent lost update — reject it at the source.
2187        //
2188        // Only the `Map` form is checked: it is exactly the form the carve-out
2189        // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2190        // string (the Cypher form) or a non-CRDT value is never carved out — it
2191        // stays conflictable — so it poses no carve-out soundness risk and is left
2192        // to the existing merge/parse path. This is the declared-property half of
2193        // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2194        Self::enforce_crdt_variants(props_meta, properties)?;
2195
2196        let ctx = self.get_query_context(tx_l0).await;
2197        for key in crdt_keys {
2198            let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2199            if !existing.is_null()
2200                && let Some(val) = properties.get_mut(&key)
2201            {
2202                *val = pm.merge_crdt_values(&existing, val)?;
2203            }
2204        }
2205
2206        Ok(())
2207    }
2208
2209    async fn prepare_edge_upsert(
2210        &self,
2211        eid: Eid,
2212        properties: &mut Properties,
2213        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2214    ) -> Result<()> {
2215        if let Some(pm) = &self.property_manager {
2216            let schema = self.schema_manager.schema();
2217            // Get edge type from L0 buffer instead of from EID
2218            let type_name = self.get_edge_type_from_l0(eid);
2219
2220            if let Some(ref t_name) = type_name
2221                && let Some(props_meta) = schema.properties.get(t_name)
2222            {
2223                let mut crdt_keys = Vec::new();
2224                for (key, _) in properties.iter() {
2225                    if let Some(meta) = props_meta.get(key)
2226                        && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2227                    {
2228                        crdt_keys.push(key.clone());
2229                    }
2230                }
2231
2232                if !crdt_keys.is_empty() {
2233                    let ctx = self.get_query_context(tx_l0).await;
2234                    for key in crdt_keys {
2235                        let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2236
2237                        if !existing.is_null()
2238                            && let Some(val) = properties.get_mut(&key)
2239                        {
2240                            *val = pm.merge_crdt_values(&existing, val)?;
2241                        }
2242                    }
2243                }
2244            }
2245        }
2246        Ok(())
2247    }
2248
2249    #[instrument(skip(self, properties), level = "trace")]
2250    pub async fn insert_vertex(
2251        &self,
2252        vid: Vid,
2253        properties: Properties,
2254        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2255    ) -> Result<()> {
2256        self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2257            .await?;
2258        Ok(())
2259    }
2260
2261    /// Component C1 (G4): before a non-transactional mutation merges into main
2262    /// L0, if an outstanding snapshot pins the current generation, freeze it
2263    /// aside so snapshots taken *before* this write stay isolated from it.
2264    ///
2265    /// `flush_lock` (acquired and released here) serializes the freeze against
2266    /// concurrent commit-time freezes/merges, matching the atomicity the tx
2267    /// commit path gets. No-op for transactional writes (their freeze happens at
2268    /// commit) and — the common case — when nothing is pinned, where it costs one
2269    /// atomic load. Freezes at most once per pinned generation: the freeze
2270    /// installs a fresh unpinned `current`, so later writes in the same bulk
2271    /// import see no pin and merge in place, and the snapshot keeps reading the
2272    /// frozen pre-import buffer.
2273    async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2274        // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2275        // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2276        // always false (one atomic load) when SSI is off.
2277        if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2278            let _flush_lock_guard = self.flush_lock.lock().await;
2279            // Re-check under the lock: a concurrent commit may have frozen first.
2280            if self.l0_manager.is_current_pinned() {
2281                self.l0_manager.freeze_current_for_snapshot();
2282                metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2283            }
2284        }
2285    }
2286
2287    #[instrument(skip(self, properties, labels), level = "trace")]
2288    pub async fn insert_vertex_with_labels(
2289        &self,
2290        vid: Vid,
2291        mut properties: Properties,
2292        labels: &[String],
2293        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2294    ) -> Result<Properties> {
2295        let start = std::time::Instant::now();
2296        self.check_write_pressure().await?;
2297        self.check_transaction_memory(tx_l0)?;
2298
2299        // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2300        // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2301        // snapshot freeze. Freeze the pinned generation aside first so snapshots
2302        // taken before this write stay isolated from it.
2303        self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2304
2305        if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2306            self.process_embeddings_for_labels(labels, &mut properties)
2307                .await?;
2308        }
2309        self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2310            .await?;
2311        self.prepare_vertex_upsert(
2312            vid,
2313            &mut properties,
2314            labels.first().map(|s| s.as_str()),
2315            tx_l0,
2316        )
2317        .await?;
2318
2319        // Clone properties and labels before moving into L0 to return them and populate constraint index
2320        let properties_copy = properties.clone();
2321        let labels_copy = labels.to_vec();
2322
2323        {
2324            // For a non-tx write, re-resolve the live `current` buffer and hold
2325            // `flush_lock` across the (synchronous) write so a concurrent flush
2326            // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2327            // a fresh `current` between resolve and write, dropping our write
2328            // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2329            // buffer, never the rotating `current`, so no `flush_lock` is needed
2330            // (and taking it would risk re-entrancy with the commit path).
2331            let _flush_lock_guard = if tx_l0.is_none() {
2332                Some(self.flush_lock.lock().await)
2333            } else {
2334                None
2335            };
2336            let l0 = self.resolve_l0(tx_l0);
2337            let mut l0_guard = l0.write();
2338            // Generation chaining: a non-tx CRDT write into the (post-freeze,
2339            // possibly empty) current buffer must merge against the chained
2340            // committed state, not shadow it. No-op when the chain is empty
2341            // or this is a tx-private write.
2342            if tx_l0.is_none() {
2343                let pending = self.l0_manager.get_pending_flush();
2344                if !pending.is_empty() {
2345                    Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2346                }
2347            }
2348            l0_guard.insert_vertex_with_labels(vid, properties, labels);
2349
2350            // Populate constraint index for O(1) duplicate detection
2351            let schema = self.schema_manager.schema();
2352            for label in &labels_copy {
2353                if schema.get_label_case_insensitive(label).is_none() {
2354                    if self.config.strict_schema {
2355                        return Err(anyhow::anyhow!(
2356                            "Label '{}' is not defined in the schema \
2357                             (strict_schema is enabled).",
2358                            label
2359                        ));
2360                    }
2361                    continue; // Schemaless: skip unknown labels.
2362                }
2363
2364                // For each unique constraint on this label, insert into constraint index
2365                for constraint in &schema.constraints {
2366                    if !constraint.enabled {
2367                        continue;
2368                    }
2369                    if let ConstraintTarget::Label(l) = &constraint.target {
2370                        if l != label {
2371                            continue;
2372                        }
2373                    } else {
2374                        continue;
2375                    }
2376
2377                    if let ConstraintType::Unique {
2378                        properties: unique_props,
2379                    } = &constraint.constraint_type
2380                    {
2381                        let mut key_values = Vec::new();
2382                        let mut all_present = true;
2383                        for prop in unique_props {
2384                            if let Some(val) = properties_copy.get(prop) {
2385                                key_values.push((prop.clone(), val.clone()));
2386                            } else {
2387                                all_present = false;
2388                                break;
2389                            }
2390                        }
2391
2392                        if all_present {
2393                            let key = serialize_constraint_key(label, &key_values);
2394                            l0_guard.insert_constraint_key(key, vid);
2395                        }
2396                    }
2397                }
2398            }
2399        }
2400
2401        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2402        self.update_metrics();
2403
2404        if tx_l0.is_none() {
2405            self.check_flush().await?;
2406        }
2407        if start.elapsed().as_millis() > 100 {
2408            log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2409        }
2410        Ok(properties_copy)
2411    }
2412
2413    /// True iff routing this partial write through MergeInsert would
2414    /// miss a constraint check. Specifically: a multi-key UNIQUE
2415    /// constraint where the touched-set doesn't cover all member keys
2416    /// requires the unchanged keys from the existing row to compute
2417    /// the composite. Conservative: also returns true if any touched
2418    /// key is `ext_id` (uniqueness checked globally — handled in the
2419    /// full-row path).
2420    fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2421        if touched.contains_key("ext_id") {
2422            return true;
2423        }
2424        let schema = self.schema_manager.schema();
2425        for label in labels {
2426            if schema.get_label_case_insensitive(label).is_none() {
2427                continue;
2428            }
2429            for constraint in &schema.constraints {
2430                if !constraint.enabled {
2431                    continue;
2432                }
2433                if let ConstraintTarget::Label(l) = &constraint.target {
2434                    if !l.eq_ignore_ascii_case(label) {
2435                        continue;
2436                    }
2437                } else {
2438                    continue;
2439                }
2440                if let ConstraintType::Unique {
2441                    properties: unique_props,
2442                } = &constraint.constraint_type
2443                {
2444                    if unique_props.len() < 2 {
2445                        continue; // single-key UNIQUE — partial path sees the key
2446                    }
2447                    if unique_props.iter().any(|p| touched.contains_key(p)) {
2448                        return true;
2449                    }
2450                }
2451            }
2452        }
2453        false
2454    }
2455
2456    /// Insert a vertex's FULL property row plus a touched-keys hint so
2457    /// the flush emits ONLY those columns via Lance MergeInsert.
2458    ///
2459    /// Caller must have read the full row (via PropertyManager) and
2460    /// applied SET-touched values on top before calling — same input
2461    /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2462    /// is the set of property keys this SET statement actually
2463    /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2464    /// flush filters the MergeInsert source schema down to those keys.
2465    /// When `UniConfig::partial_lance_writes == false`, falls through
2466    /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2467    /// equivalence with prior releases.
2468    #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2469    pub async fn insert_vertex_partial_full(
2470        &self,
2471        vid: Vid,
2472        mut props: Properties,
2473        touched_keys: HashSet<String>,
2474        labels: &[String],
2475        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2476    ) -> Result<()> {
2477        if !self.config.partial_lance_writes
2478            || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2479        {
2480            self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2481                .await?;
2482            return Ok(());
2483        }
2484
2485        self.check_write_pressure().await?;
2486        self.check_transaction_memory(tx_l0)?;
2487        if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2488            self.process_embeddings_for_labels(labels, &mut props)
2489                .await?;
2490        }
2491        // Full-row validation runs because we have the complete map;
2492        // no need for the partial-only validator.
2493        self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2494            .await?;
2495        {
2496            let l0 = self.resolve_l0(tx_l0);
2497            let mut l0_guard = l0.write();
2498            l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2499        }
2500        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2501        metrics::counter!("uni_partial_writes_total").increment(1);
2502        self.update_metrics();
2503        if tx_l0.is_none() {
2504            self.check_flush().await?;
2505        }
2506        Ok(())
2507    }
2508
2509    /// Insert a vertex's *partial* property set without first reading the
2510    /// full row.
2511    ///
2512    /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2513    /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2514    /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2515    /// schema source — preserving untouched columns (e.g., embeddings)
2516    /// byte-equal in Lance with no read at the caller and no write of
2517    /// those columns.
2518    ///
2519    /// When the flag is `false`, this falls back to the existing
2520    /// `insert_vertex_with_labels` path after merging `touched` with
2521    /// the current properties from L0/storage. The caller can therefore
2522    /// use this entry point unconditionally; the optimization activates
2523    /// only when the flag is on.
2524    #[instrument(skip(self, touched, labels), level = "trace")]
2525    pub async fn insert_vertex_partial(
2526        &self,
2527        vid: Vid,
2528        touched: Properties,
2529        labels: &[String],
2530        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2531    ) -> Result<()> {
2532        let needs_full_read =
2533            !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2534        if needs_full_read {
2535            // Flag-off fallback (or constraint-driven fallback): merge
2536            // `touched` with the current full property snapshot from
2537            // L0/storage and route through the existing path. Preserves
2538            // bit-for-bit equivalence with the pre-Round-11 release.
2539            let existing = if let Some(pm) = &self.property_manager {
2540                pm.get_all_vertex_props_with_ctx(vid, None)
2541                    .await
2542                    .unwrap_or_default()
2543                    .unwrap_or_default()
2544            } else {
2545                Properties::new()
2546            };
2547            let mut merged = existing;
2548            for (k, v) in touched {
2549                merged.insert(k, v);
2550            }
2551            self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2552                .await?;
2553            return Ok(());
2554        }
2555
2556        // Flag-on fast path: stage the partial update directly. Pressure
2557        // checks, embedding generation, constraint validation all still
2558        // run — but the validator is the partial-aware variant that
2559        // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2560        // properties not present in `touched`. Multi-key UNIQUE that
2561        // overlaps the touched set forces a fallback above via
2562        // `touched_needs_full_read`.
2563        let mut touched = touched;
2564        self.check_write_pressure().await?;
2565        self.check_transaction_memory(tx_l0)?;
2566        if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2567            self.process_embeddings_for_labels(labels, &mut touched)
2568                .await?;
2569        }
2570        self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2571            .await?;
2572
2573        {
2574            let l0 = self.resolve_l0(tx_l0);
2575            let mut l0_guard = l0.write();
2576            l0_guard.insert_vertex_partial(vid, touched, labels);
2577        }
2578
2579        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2580        metrics::counter!("uni_partial_writes_total").increment(1);
2581        self.update_metrics();
2582        if tx_l0.is_none() {
2583            self.check_flush().await?;
2584        }
2585        Ok(())
2586    }
2587
2588    /// Insert multiple vertices with batched operations.
2589    ///
2590    /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2591    /// for bulk inserts with unique constraints.
2592    ///
2593    /// # Performance Improvements
2594    /// - Batch VID allocation: 1 call instead of N calls
2595    /// - Batch constraint validation: O(N) instead of O(N²)
2596    /// - Batch embedding generation: 1 API call per config instead of N calls
2597    /// - Transaction wrapping: Automatic flush deferral, atomicity
2598    ///
2599    /// # Arguments
2600    /// * `vids` - Pre-allocated VIDs for the vertices
2601    /// * `properties_batch` - Properties for each vertex
2602    /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2603    ///
2604    /// # Errors
2605    /// Returns error if:
2606    /// - VID/properties length mismatch
2607    /// - Constraint violation detected
2608    /// - Embedding generation fails
2609    /// - Transaction commit fails
2610    ///
2611    /// # Atomicity
2612    /// If this method fails, all changes are rolled back (if transaction was started here).
2613    pub async fn insert_vertices_batch(
2614        &self,
2615        vids: Vec<Vid>,
2616        mut properties_batch: Vec<Properties>,
2617        labels: Vec<String>,
2618        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2619    ) -> Result<Vec<Properties>> {
2620        let start = std::time::Instant::now();
2621
2622        // Validate inputs
2623        if vids.len() != properties_batch.len() {
2624            return Err(anyhow!(
2625                "VID/properties size mismatch: {} vids, {} properties",
2626                vids.len(),
2627                properties_batch.len()
2628            ));
2629        }
2630
2631        if vids.is_empty() {
2632            return Ok(Vec::new());
2633        }
2634
2635        // Batch operations — writes go directly to the resolved L0.
2636        // Atomicity is guaranteed by the caller holding the writer lock.
2637        let result = async {
2638            self.check_write_pressure().await?;
2639            self.check_transaction_memory(tx_l0)?;
2640
2641            // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2642            // freeze the pinned generation aside before merging so snapshot
2643            // readers stay isolated. No-op when unpinned or transactional.
2644            self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2645
2646            // Batch embedding generation (1 API call per config)
2647            self.process_embeddings_for_batch(&labels, &mut properties_batch)
2648                .await?;
2649
2650            // Batch constraint validation (O(N) instead of O(N²))
2651            let label = labels
2652                .first()
2653                .ok_or_else(|| anyhow!("No labels provided"))?;
2654            self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2655                .await?;
2656
2657            // Batch prepare (CRDT merging if needed)
2658            // Check schema once: skip entirely if no CRDT properties for this label.
2659            // For new vertices (freshly allocated VIDs), there are no existing CRDT
2660            // values to merge, so the per-vertex lookup is unnecessary in that case.
2661            let has_crdt_fields = {
2662                let schema = self.schema_manager.schema();
2663                schema
2664                    .properties
2665                    .get(label.as_str())
2666                    .is_some_and(|props_meta| {
2667                        props_meta.values().any(|meta| {
2668                            matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2669                        })
2670                    })
2671            };
2672
2673            if has_crdt_fields {
2674                // Layer-1 variant enforcement (G3): the batch path must reject a
2675                // declared-CRDT variant mismatch exactly as the single-vertex
2676                // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2677                // written via batch import slips past write-time validation and
2678                // the OCC carve-out then masks the overwrite as a lost update.
2679                {
2680                    let schema = self.schema_manager.schema();
2681                    if let Some(props_meta) = schema.properties.get(label.as_str()) {
2682                        for props in &properties_batch {
2683                            Self::enforce_crdt_variants(props_meta, props)?;
2684                        }
2685                    }
2686                }
2687
2688                // Batch fetch existing CRDT values: collect VIDs that need merging,
2689                // then query once via PropertyManager instead of per-vertex lookups.
2690                let schema = self.schema_manager.schema();
2691                let crdt_keys: Vec<String> = schema
2692                    .properties
2693                    .get(label.as_str())
2694                    .map(|props_meta| {
2695                        props_meta
2696                            .iter()
2697                            .filter(|(_, meta)| {
2698                                matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2699                            })
2700                            .map(|(key, _)| key.clone())
2701                            .collect()
2702                    })
2703                    .unwrap_or_default();
2704
2705                if let Some(pm) = &self.property_manager {
2706                    let ctx = self.get_query_context(tx_l0).await;
2707                    for (vid, props) in vids.iter().zip(&mut properties_batch) {
2708                        for key in &crdt_keys {
2709                            if props.contains_key(key) {
2710                                let existing =
2711                                    pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2712                                if !existing.is_null()
2713                                    && let Some(val) = props.get_mut(key)
2714                                {
2715                                    *val = pm.merge_crdt_values(&existing, val)?;
2716                                }
2717                            }
2718                        }
2719                    }
2720                }
2721            }
2722
2723            // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2724            let target_l0 = self.resolve_l0(tx_l0);
2725
2726            let properties_result = properties_batch.clone();
2727            {
2728                let mut l0_guard = target_l0.write();
2729                for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2730                    l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2731                }
2732            }
2733
2734            // Update metrics (batch increment)
2735            metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2736            self.update_metrics();
2737
2738            Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2739        }
2740        .await;
2741
2742        let props = result?;
2743
2744        if start.elapsed().as_millis() > 100 {
2745            log::warn!(
2746                "Slow insert_vertices_batch ({} vertices): {}ms",
2747                vids.len(),
2748                start.elapsed().as_millis()
2749            );
2750        }
2751
2752        Ok(props)
2753    }
2754
2755    /// Delete a vertex by VID.
2756    ///
2757    /// When `labels` is provided, uses them directly to populate L0 for
2758    /// correct tombstone flushing. Otherwise discovers labels from L0
2759    /// buffers and storage (which can be slow for many vertices).
2760    ///
2761    /// # Errors
2762    ///
2763    /// Returns an error if write pressure stalls, label lookup fails, or
2764    /// the L0 delete operation fails.
2765    #[instrument(skip(self, labels), level = "trace")]
2766    pub async fn delete_vertex(
2767        &self,
2768        vid: Vid,
2769        labels: Option<Vec<String>>,
2770        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2771    ) -> Result<()> {
2772        let start = std::time::Instant::now();
2773        self.check_write_pressure().await?;
2774        self.check_transaction_memory(tx_l0)?;
2775        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2776
2777        // Before deleting, ensure we have the vertex's labels stored in L0 so
2778        // the tombstone can be flushed to the correct label datasets. Discover
2779        // them up front (this may await storage) WITHOUT pinning the buffer we
2780        // will eventually mutate — for non-tx writes the live `current` buffer
2781        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2782        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2783        // reads that tolerate a racing rotate.
2784        let has_labels = {
2785            let l0_guard = self.resolve_l0(tx_l0);
2786            let guard = l0_guard.read();
2787            guard.vertex_labels.contains_key(&vid)
2788        };
2789
2790        let backfill_labels = if has_labels {
2791            None
2792        } else if let Some(provided) = labels {
2793            // Caller provided labels — skip the lookup entirely
2794            Some(provided)
2795        } else {
2796            // Discover labels from pending flush L0s, then storage
2797            let mut found = None;
2798            for pending_l0 in self.l0_manager.get_pending_flush() {
2799                let pending_guard = pending_l0.read();
2800                if let Some(l) = pending_guard.get_vertex_labels(vid) {
2801                    found = Some(l.to_vec());
2802                    break;
2803                }
2804            }
2805            if found.is_none() {
2806                found = self.find_vertex_labels_in_storage(vid).await?;
2807            }
2808            found
2809        };
2810
2811        // Test-only seam (no-op without the `failpoints` feature): pause a
2812        // non-transactional delete AFTER the awaited label discovery but BEFORE
2813        // it re-resolves the live buffer and writes the tombstone. A concurrent
2814        // flush can rotate+complete a buffer in this window; the fix re-resolves
2815        // `get_current()` and mutates it under `flush_lock`, so the tombstone
2816        // always lands in the live buffer (Bug #4 — silent lost delete across
2817        // L0 rotation).
2818        fail::fail_point!("nontx::after-capture");
2819
2820        // Apply the label backfill and the tombstone together. For a non-tx
2821        // write, hold `flush_lock` across the (synchronous) re-resolve + write
2822        // so a concurrent flush rotate (which takes `flush_lock` via
2823        // `begin_flush`) cannot install a fresh `current` between our resolve
2824        // and our write. For a tx write `resolve_l0` returns the tx-private
2825        // buffer (never the rotating `current`), so no `flush_lock` is needed
2826        // — and taking it there would risk re-entrancy with the commit path.
2827        if tx_l0.is_none() {
2828            let _flush_lock_guard = self.flush_lock.lock().await;
2829            let l0 = self.l0_manager.get_current();
2830            let mut guard = l0.write();
2831            if let Some(found_labels) = backfill_labels {
2832                guard.vertex_labels.insert(vid, found_labels);
2833            }
2834            guard.delete_vertex(vid)?;
2835        } else {
2836            let l0 = self.resolve_l0(tx_l0);
2837            let mut guard = l0.write();
2838            if let Some(found_labels) = backfill_labels {
2839                guard.vertex_labels.insert(vid, found_labels);
2840            }
2841            guard.delete_vertex(vid)?;
2842        }
2843        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2844        self.update_metrics();
2845
2846        if tx_l0.is_none() {
2847            self.check_flush().await?;
2848        }
2849        if start.elapsed().as_millis() > 100 {
2850            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2851        }
2852        Ok(())
2853    }
2854
2855    /// Find vertex labels from storage by querying the main vertices table.
2856    /// Returns the labels from the latest non-deleted version of the vertex.
2857    async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2858        use crate::backend::types::ScanRequest;
2859        use arrow_array::Array;
2860        use arrow_array::cast::AsArray;
2861
2862        let backend = self.storage.backend();
2863        let table_name = MainVertexDataset::table_name();
2864
2865        // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2866        if !backend.table_exists(table_name).await? {
2867            return Ok(None);
2868        }
2869
2870        // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2871        let filter = format!("_vid = {}", vid.as_u64());
2872        let batches = backend
2873            .scan(
2874                ScanRequest::all(table_name)
2875                    .with_filter(filter)
2876                    .with_columns(vec![
2877                        "_vid".to_string(),
2878                        "labels".to_string(),
2879                        "_version".to_string(),
2880                        "_deleted".to_string(),
2881                    ]),
2882            )
2883            .await
2884            .unwrap_or_default();
2885
2886        // Find the row with the highest version number
2887        let mut max_version: Option<u64> = None;
2888        let mut labels: Option<Vec<String>> = None;
2889        let mut is_deleted = false;
2890
2891        for batch in batches {
2892            if batch.num_rows() == 0 {
2893                continue;
2894            }
2895
2896            let version_array = batch
2897                .column_by_name("_version")
2898                .unwrap()
2899                .as_primitive::<arrow_array::types::UInt64Type>();
2900
2901            let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2902
2903            let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2904
2905            for row_idx in 0..batch.num_rows() {
2906                let version = version_array.value(row_idx);
2907
2908                if max_version.is_none_or(|mv| version > mv) {
2909                    is_deleted = deleted_array.value(row_idx);
2910
2911                    let labels_list = labels_array.value(row_idx);
2912                    let string_array = labels_list.as_string::<i32>();
2913                    let vertex_labels: Vec<String> = (0..string_array.len())
2914                        .filter(|&i| !string_array.is_null(i))
2915                        .map(|i| string_array.value(i).to_string())
2916                        .collect();
2917
2918                    max_version = Some(version);
2919                    labels = Some(vertex_labels);
2920                }
2921            }
2922        }
2923
2924        // If the latest version is deleted, return None
2925        if is_deleted { Ok(None) } else { Ok(labels) }
2926    }
2927
2928    #[expect(clippy::too_many_arguments)]
2929    #[instrument(skip(self, props, touched_keys), level = "trace")]
2930    pub async fn insert_edge_partial_full(
2931        &self,
2932        src_vid: Vid,
2933        dst_vid: Vid,
2934        edge_type: u32,
2935        eid: Eid,
2936        props: Properties,
2937        edge_type_name: Option<String>,
2938        touched_keys: HashSet<String>,
2939        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2940    ) -> Result<()> {
2941        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2942        if !self.config.partial_lance_writes {
2943            return self
2944                .insert_edge(
2945                    src_vid,
2946                    dst_vid,
2947                    edge_type,
2948                    eid,
2949                    props,
2950                    edge_type_name,
2951                    tx_l0,
2952                )
2953                .await;
2954        }
2955
2956        let start = std::time::Instant::now();
2957        self.check_write_pressure().await?;
2958        self.check_transaction_memory(tx_l0)?;
2959        let mut props = props;
2960        self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2961
2962        let l0 = self.resolve_l0(tx_l0);
2963        l0.write().insert_edge_partial_full(
2964            src_vid,
2965            dst_vid,
2966            edge_type,
2967            eid,
2968            props,
2969            edge_type_name,
2970            touched_keys,
2971        )?;
2972
2973        if tx_l0.is_none() {
2974            let version = l0.read().current_version;
2975            self.adjacency_manager
2976                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2977        }
2978
2979        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2980        metrics::counter!("uni_partial_writes_total").increment(1);
2981        self.update_metrics();
2982        if tx_l0.is_none() {
2983            self.check_flush().await?;
2984        }
2985        if start.elapsed().as_millis() > 100 {
2986            log::warn!(
2987                "Slow insert_edge_partial_full: {}ms",
2988                start.elapsed().as_millis()
2989            );
2990        }
2991        Ok(())
2992    }
2993
2994    #[expect(clippy::too_many_arguments)]
2995    pub async fn insert_edge(
2996        &self,
2997        src_vid: Vid,
2998        dst_vid: Vid,
2999        edge_type: u32,
3000        eid: Eid,
3001        mut properties: Properties,
3002        edge_type_name: Option<String>,
3003        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3004    ) -> Result<()> {
3005        let start = std::time::Instant::now();
3006        self.check_write_pressure().await?;
3007        self.check_transaction_memory(tx_l0)?;
3008        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3009        self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3010            .await?;
3011
3012        let l0 = self.resolve_l0(tx_l0);
3013        l0.write()
3014            .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3015
3016        // Dual-write to AdjacencyManager overlay (survives flush).
3017        // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3018        if tx_l0.is_none() {
3019            let version = l0.read().current_version;
3020            self.adjacency_manager
3021                .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3022        }
3023
3024        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3025        self.update_metrics();
3026
3027        if tx_l0.is_none() {
3028            self.check_flush().await?;
3029        }
3030        if start.elapsed().as_millis() > 100 {
3031            log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3032        }
3033        Ok(())
3034    }
3035
3036    #[instrument(skip(self), level = "trace")]
3037    pub async fn delete_edge(
3038        &self,
3039        eid: Eid,
3040        src_vid: Vid,
3041        dst_vid: Vid,
3042        edge_type: u32,
3043        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3044    ) -> Result<()> {
3045        let start = std::time::Instant::now();
3046        self.check_write_pressure().await?;
3047        self.check_transaction_memory(tx_l0)?;
3048        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3049        let l0 = self.resolve_l0(tx_l0);
3050
3051        l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3052
3053        // Dual-write tombstone to AdjacencyManager overlay.
3054        if tx_l0.is_none() {
3055            let version = l0.read().current_version;
3056            self.adjacency_manager
3057                .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3058        }
3059        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3060        self.update_metrics();
3061
3062        if tx_l0.is_none() {
3063            self.check_flush().await?;
3064        }
3065        if start.elapsed().as_millis() > 100 {
3066            log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3067        }
3068        Ok(())
3069    }
3070
3071    /// Decide whether a flush should be triggered based on mutation count
3072    /// or elapsed time since the last flush.
3073    ///
3074    /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3075    /// reuse the decision while bypassing the lock-acquiring entry point
3076    /// (it already holds `flush_lock`).
3077    fn should_flush(&self) -> bool {
3078        let count = self.l0_manager.get_current().read().mutation_count;
3079        if count == 0 {
3080            return false;
3081        }
3082        if count >= self.config.auto_flush_threshold {
3083            return true;
3084        }
3085        if let Some(interval) = self.config.auto_flush_interval
3086            && self.last_flush_time.lock().elapsed() >= interval
3087            && count >= self.config.auto_flush_min_mutations
3088        {
3089            return true;
3090        }
3091        false
3092    }
3093
3094    /// Check if flush should be triggered based on mutation count or time elapsed.
3095    /// This method is called after each write operation and can also be called
3096    /// by a background task for time-based flushing.
3097    pub async fn check_flush(&self) -> Result<()> {
3098        if self.should_flush() {
3099            self.flush_to_l1(None).await?;
3100        }
3101        Ok(())
3102    }
3103
3104    /// Process embeddings for a vertex using labels passed directly.
3105    /// Use this when labels haven't been stored to L0 yet.
3106    async fn process_embeddings_for_labels(
3107        &self,
3108        labels: &[String],
3109        properties: &mut Properties,
3110    ) -> Result<()> {
3111        let label_name = labels.first().map(|s| s.as_str());
3112        self.process_embeddings_impl(label_name, properties).await
3113    }
3114
3115    /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3116    /// vertex has an embedding config that hasn't been satisfied by the
3117    /// caller-provided properties, enqueue the VID in
3118    /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3119    /// skips `process_embeddings_for_labels` and the embedding is computed
3120    /// in a single batched call at flush time via
3121    /// `drain_pending_embeddings`.
3122    ///
3123    /// Returns `false` (caller falls back to today's per-row eager embed)
3124    /// if any of:
3125    ///  - the flag is off,
3126    ///  - no label has an embedding config,
3127    ///  - the user already provided the target property (matches the
3128    ///    existing skip-if-present semantics at writer.rs:2727).
3129    ///
3130    /// Trade-off: when deferral is active, in-tx reads of the embedding
3131    /// column return only what was already in storage (or nothing for
3132    /// brand-new vertices). Existing tests that RETURN n.embedding in
3133    /// the same tx as a SET on the source column must run with the flag
3134    /// off; opt in only when no such reads happen between write and
3135    /// commit.
3136    fn try_defer_embedding(
3137        &self,
3138        labels: &[String],
3139        properties: &Properties,
3140        vid: Vid,
3141        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3142    ) -> bool {
3143        if !self.config.defer_embeddings {
3144            return false;
3145        }
3146        let Some(label) = labels.first() else {
3147            return false;
3148        };
3149
3150        let schema = self.schema_manager.schema();
3151        let mut has_unsatisfied_cfg = false;
3152        for idx in &schema.indexes {
3153            if let IndexDefinition::Vector(v_cfg) = idx
3154                && v_cfg.label == *label
3155                && v_cfg.embedding_config.is_some()
3156                && !properties.contains_key(&v_cfg.property)
3157            {
3158                has_unsatisfied_cfg = true;
3159                break;
3160            }
3161        }
3162        if !has_unsatisfied_cfg {
3163            return false;
3164        }
3165
3166        let l0 = self.resolve_l0(tx_l0);
3167        let mut guard = l0.write();
3168        guard.pending_embeddings.insert(vid, label.clone());
3169        true
3170    }
3171
3172    /// Drain `pending_embeddings` from the rotated old-L0 right before
3173    /// `flush_stream_l1` reads it. Groups by label, issues one batched
3174    /// `process_embeddings_for_batch` call per label, and writes the
3175    /// resulting embedding vectors into each VID's `vertex_properties`
3176    /// map. After this returns, the flush proceeds against an L0 that
3177    /// looks no different from one whose embeddings were generated
3178    /// per-row at insert.
3179    ///
3180    /// Idempotent: a VID whose embedding was already materialized
3181    /// (e.g., by on-demand read paths in a future Phase B revision) is
3182    /// detected via `properties.contains_key(target_prop)` inside
3183    /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3184    /// the drain is safe.
3185    async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3186        let by_label: HashMap<String, Vec<Vid>> = {
3187            let guard = old_l0_arc.read();
3188            if guard.pending_embeddings.is_empty() {
3189                return Ok(());
3190            }
3191            let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3192            for (vid, label) in &guard.pending_embeddings {
3193                m.entry(label.clone()).or_default().push(*vid);
3194            }
3195            m
3196        };
3197
3198        for (label, vids) in by_label {
3199            let mut properties_batch: Vec<Properties> = {
3200                let guard = old_l0_arc.read();
3201                vids.iter()
3202                    .map(|vid| {
3203                        guard
3204                            .vertex_properties
3205                            .get(vid)
3206                            .cloned()
3207                            .unwrap_or_default()
3208                    })
3209                    .collect()
3210            };
3211
3212            self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3213                .await?;
3214
3215            let mut guard = old_l0_arc.write();
3216            for (vid, props) in vids.iter().zip(properties_batch) {
3217                let target = guard.vertex_properties.entry(*vid).or_default();
3218                for (k, v) in props {
3219                    target.insert(k, v);
3220                }
3221                guard.pending_embeddings.remove(vid);
3222            }
3223        }
3224        Ok(())
3225    }
3226
3227    /// Process embeddings for a batch of vertices efficiently.
3228    ///
3229    /// Groups vertices by embedding config and makes batched API calls to the
3230    /// embedding service instead of calling once per vertex.
3231    ///
3232    /// # Performance
3233    /// For N vertices with embedding config:
3234    /// - Old approach: N API calls to embedding service
3235    /// - New approach: 1 API call per embedding config (usually 1 total)
3236    async fn process_embeddings_for_batch(
3237        &self,
3238        labels: &[String],
3239        properties_batch: &mut [Properties],
3240    ) -> Result<()> {
3241        let label_name = labels.first().map(|s| s.as_str());
3242        let schema = self.schema_manager.schema();
3243
3244        if let Some(label) = label_name {
3245            // Find vector indexes with embedding config for this label
3246            let mut configs = Vec::new();
3247            for idx in &schema.indexes {
3248                if let IndexDefinition::Vector(v_config) = idx
3249                    && v_config.label == label
3250                    && let Some(emb_config) = &v_config.embedding_config
3251                {
3252                    configs.push((v_config.property.clone(), emb_config.clone()));
3253                }
3254            }
3255
3256            if configs.is_empty() {
3257                return Ok(());
3258            }
3259
3260            for (target_prop, emb_config) in configs {
3261                // Collect input texts from all vertices that need embeddings
3262                let mut input_texts: Vec<String> = Vec::new();
3263                let mut needs_embedding: Vec<usize> = Vec::new();
3264
3265                for (idx, properties) in properties_batch.iter().enumerate() {
3266                    // Skip if target property already exists
3267                    if properties.contains_key(&target_prop) {
3268                        continue;
3269                    }
3270
3271                    // Check if source properties exist
3272                    let mut inputs = Vec::new();
3273                    for src_prop in &emb_config.source_properties {
3274                        if let Some(val) = properties.get(src_prop)
3275                            && let Some(s) = val.as_str()
3276                        {
3277                            inputs.push(s.to_string());
3278                        }
3279                    }
3280
3281                    if !inputs.is_empty() {
3282                        let input_text = inputs.join(" ");
3283                        let input_text = match &emb_config.document_prefix {
3284                            Some(prefix) => format!("{prefix}{input_text}"),
3285                            None => input_text,
3286                        };
3287                        input_texts.push(input_text);
3288                        needs_embedding.push(idx);
3289                    }
3290                }
3291
3292                if input_texts.is_empty() {
3293                    continue;
3294                }
3295
3296                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3297                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3298                })?;
3299                let embedder = runtime.embedding(&emb_config.alias).await?;
3300
3301                // Batch generate embeddings (single API call)
3302                let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3303                let embeddings = embedder.embed(input_refs).await?;
3304
3305                // Distribute results back to properties
3306                for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3307                    if let Some(vec) = embeddings.get(embedding_idx) {
3308                        let vals: Vec<Value> =
3309                            vec.iter().map(|f| Value::Float(*f as f64)).collect();
3310                        properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3311                    }
3312                }
3313            }
3314        }
3315
3316        Ok(())
3317    }
3318
3319    async fn process_embeddings_impl(
3320        &self,
3321        label_name: Option<&str>,
3322        properties: &mut Properties,
3323    ) -> Result<()> {
3324        let schema = self.schema_manager.schema();
3325
3326        if let Some(label) = label_name {
3327            // Find vector indexes with embedding config for this label
3328            let mut configs = Vec::new();
3329            for idx in &schema.indexes {
3330                if let IndexDefinition::Vector(v_config) = idx
3331                    && v_config.label == label
3332                    && let Some(emb_config) = &v_config.embedding_config
3333                {
3334                    configs.push((v_config.property.clone(), emb_config.clone()));
3335                }
3336            }
3337
3338            if configs.is_empty() {
3339                log::info!("No embedding config found for label {}", label);
3340            }
3341
3342            for (target_prop, emb_config) in configs {
3343                // If target property already exists, skip (assume user provided it)
3344                if properties.contains_key(&target_prop) {
3345                    continue;
3346                }
3347
3348                // Check if source properties exist
3349                let mut inputs = Vec::new();
3350                for src_prop in &emb_config.source_properties {
3351                    if let Some(val) = properties.get(src_prop)
3352                        && let Some(s) = val.as_str()
3353                    {
3354                        inputs.push(s.to_string());
3355                    }
3356                }
3357
3358                if inputs.is_empty() {
3359                    continue;
3360                }
3361
3362                let input_text = inputs.join(" ");
3363                let input_text = match &emb_config.document_prefix {
3364                    Some(prefix) => format!("{prefix}{input_text}"),
3365                    None => input_text,
3366                };
3367
3368                let runtime = self.xervo_runtime.get().ok_or_else(|| {
3369                    anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3370                })?;
3371                let embedder = runtime.embedding(&emb_config.alias).await?;
3372
3373                // Generate
3374                let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3375                if let Some(vec) = embeddings.first() {
3376                    // Store as array of floats
3377                    let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3378                    properties.insert(target_prop.clone(), Value::List(vals));
3379                }
3380            }
3381        }
3382        Ok(())
3383    }
3384
3385    /// Flushes the current in-memory L0 buffer to L1 storage.
3386    ///
3387    /// # Lock Ordering
3388    ///
3389    /// To prevent deadlocks, locks must be acquired in the following order:
3390    /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3391    /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3392    /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3393    /// 4. `L0Buffer` lock (individual buffer RWLocks)
3394    /// 5. `Index` / `Storage` locks (during actual flush)
3395    ///
3396    /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3397    /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3398    /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3399    pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3400        // Drain any in-flight async flushes first. `flush_to_l1` is a
3401        // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3402        // setup, shutdown paths) rely on it as "all writes are now
3403        // durably in Lance". Without the drain, an async stream from
3404        // a recent commit might still be writing to Lance when
3405        // `flush_to_l1` returns, leaving a window where forks branch
3406        // off pre-write Lance state and lose data.
3407        if let Some(coord) = self.flush_coordinator.as_ref() {
3408            let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3409        }
3410        let _flush_lock_guard = self.flush_lock.lock().await;
3411        self.flush_inline_under_lock(name).await
3412    }
3413
3414    /// Async-flush entry point: rotate under `flush_lock`, release the
3415    /// lock, then submit the stream phase to the [`FlushCoordinator`].
3416    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3417    /// that resolves when finalize completes.
3418    ///
3419    /// Errors if `config.async_flush_enabled = false` (the coordinator
3420    /// is `None` in that case — see `flush_coordinator` field doc).
3421    pub async fn flush_to_l1_async(
3422        self: &Arc<Self>,
3423        name: Option<String>,
3424    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3425        let coord = self
3426            .flush_coordinator
3427            .as_ref()
3428            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3429            .clone();
3430        // 1. Acquire permit FIRST (outside flush_lock) so we don't
3431        //    introduce a permit-while-holding-flush-lock convoy.
3432        let permit = coord.acquire_permit().await?;
3433        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3434        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3435        //    rotate (the `?` below) must consume neither: the finalizer
3436        //    advances in strictly consecutive seq order and only decrements
3437        //    pending on finalize, so a leaked seq/pending would wedge it
3438        //    forever. The seq is allocated under `flush_lock`, immediately
3439        //    after the rotate and before the guard drops, so concurrent rotates
3440        //    keep seq order == rotation order, and the seq is unused until
3441        //    submit. On the `?` error path the permit drops, freeing the slot.
3442        let (
3443            RotateOutput {
3444                old_l0_arc,
3445                wal_lsn,
3446                current_version,
3447                flush_in_progress_guard,
3448            },
3449            seq,
3450        ) = {
3451            let _flush_lock_guard = self.flush_lock.lock().await;
3452            let rotate_out = self.flush_l0_rotate().await?;
3453            let seq = coord.next_rotate_seq();
3454            coord.note_pending();
3455            (rotate_out, seq)
3456        };
3457        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3458        //    cached_manifest snapshot at this moment.
3459        let parent_manifest = self.cached_manifest.lock().clone();
3460        let rotated = RotatedFlush {
3461            seq,
3462            old_l0_arc: old_l0_arc.clone(),
3463            wal_lsn,
3464            current_version,
3465            name: name.clone(),
3466            parent_manifest,
3467            permit,
3468            flush_in_progress_guard,
3469        };
3470        // 4. Spawn the stream phase via the coordinator. The closure
3471        //    captures Arc<Writer> transiently — drops when stream
3472        //    completes (bounded, ~50-500 ms).
3473        let writer = self.clone();
3474        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3475            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3476            Ok(crate::runtime::flush_coordinator::FlushOutcome {
3477                new_manifest: outcome.manifest,
3478                snapshot_id: outcome.snapshot_id,
3479            })
3480        });
3481        Ok(ticket)
3482    }
3483
3484    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3485    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3486    /// its place), and hand off the WAL to the new L0.
3487    ///
3488    /// Runs in microseconds. Must be called under `flush_lock` (the caller
3489    /// is responsible). The returned [`RotateOutput`] carries everything
3490    /// the subsequent stream + finalize phases need; in particular the
3491    /// [`FlushInProgressGuard`] is bound to the return value so it stays
3492    /// alive for the full flush lifetime — including any future async
3493    /// path where stream runs on a spawned task.
3494    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3495        // Acquire the in-progress counter BEFORE any heavy work. The
3496        // guard lives on RotateOutput; dropping RotateOutput drops the
3497        // guard, so the counter goes back to zero exactly when the flush
3498        // is fully done.
3499        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3500
3501        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3502        // current L0 is still active and mutations are retained in
3503        // memory until restart/retry.
3504        let wal_for_truncate = {
3505            let current_l0 = self.l0_manager.get_current();
3506            let l0_guard = current_l0.read();
3507            l0_guard.wal.clone()
3508        };
3509        // Test-only seam (no-op without the `failpoints` feature): inject a
3510        // WAL-flush failure here to drive the "failed async rotate wedges the
3511        // finalizer" regression (Bug #3). When configured to "return" it makes
3512        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3513        fail::fail_point!("flush::rotate-fail", |_| {
3514            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3515        });
3516        let wal_lsn = if let Some(ref w) = wal_for_truncate {
3517            w.flush().await?
3518        } else {
3519            0
3520        };
3521
3522        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3523        // pending_flush until complete_flush is called by finalize.
3524        let old_l0_arc = self.l0_manager.begin_flush(0, None);
3525        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3526
3527        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3528        // and current_version to the new L0.
3529        let current_version;
3530        {
3531            let mut old_l0_guard = old_l0_arc.write();
3532            current_version = old_l0_guard.current_version;
3533            old_l0_guard.wal_lsn_at_flush = wal_lsn;
3534            let wal = old_l0_guard.wal.take();
3535            let new_l0_arc = self.l0_manager.get_current();
3536            let mut new_l0_guard = new_l0_arc.write();
3537            new_l0_guard.wal = wal;
3538            new_l0_guard.current_version = current_version;
3539        }
3540
3541        Ok(RotateOutput {
3542            old_l0_arc,
3543            wal_lsn,
3544            current_version,
3545            flush_in_progress_guard,
3546        })
3547    }
3548
3549    /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3550    /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3551    /// pending_flush by Phase B); writes append-only Lance datasets; does
3552    /// NOT call save_snapshot / set_latest_snapshot — those are
3553    /// finalize's job, so the manifest doesn't get published until the
3554    /// next phase.
3555    ///
3556    /// Today takes `&self`; in a follow-up commit this becomes a
3557    /// static `Send + 'static` function over `SharedFlushCtx` so it can
3558    /// run on a spawned task while concurrent commits proceed.
3559    async fn flush_stream_l1(
3560        &self,
3561        old_l0_arc: Arc<RwLock<L0Buffer>>,
3562        wal_lsn: u64,
3563        current_version: u64,
3564        name: Option<String>,
3565    ) -> Result<FlushOutcome> {
3566        // Test-only seam (no-op without the `failpoints` feature): the rotate
3567        // (begin_flush) already moved the to-be-flushed buffer onto
3568        // pending_flush and installed a fresh empty current buffer, but the
3569        // rotated rows are NOT yet durable in Lance. Pausing here holds that
3570        // window open to drive the unique-constraint-hole regression
3571        // (Bug #9 Mechanism A).
3572        fail::fail_point!("flush::after-rotate-before-lance");
3573
3574        // Phase B: materialize any deferred embeddings before column
3575        // extraction. No-op when `defer_embeddings` is off (the set will
3576        // be empty). On-demand reads of the embedding column are a TODO
3577        // for a future revision (see UniConfig::defer_embeddings docs).
3578        self.drain_pending_embeddings(&old_l0_arc).await?;
3579
3580        let schema = self.schema_manager.schema();
3581        // 2. Acquire Read lock on Old L0 for flushing
3582        let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3583        // (Vid, labels, properties, deleted, version)
3584        type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3585        let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3586        // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3587        // (vid, full L0 properties map, version, set of keys to update).
3588        // Only the keys in the HashSet are emitted to the partial source;
3589        // the full props map is retained so the per-row column extractor
3590        // can read each touched key's value.
3591        type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3592        let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3593        // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3594        // partial source with just `_vid`, `_deleted=true`, `_version`,
3595        // `_updated_at`. Skips the wide-row Append payload that adds
3596        // nothing on a soft-delete.
3597        let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3598        let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3599        // Collect vertex timestamps from L0 for flushing to storage
3600        let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3601        let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3602        // Track tombstones missing labels for storage query fallback
3603        let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3604
3605        {
3606            let old_l0 = old_l0_arc.read();
3607
3608            // 1. Collect all edges and tombstones from L0
3609            for edge in old_l0.graph.edges() {
3610                let properties = old_l0
3611                    .edge_properties
3612                    .get(&edge.eid)
3613                    .cloned()
3614                    .unwrap_or_default();
3615                let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3616
3617                // Get timestamps from L0 buffer (populated during insert)
3618                let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3619                let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3620
3621                entries_by_type
3622                    .entry(edge.edge_type)
3623                    .or_default()
3624                    .push(L1Entry {
3625                        src_vid: edge.src_vid,
3626                        dst_vid: edge.dst_vid,
3627                        eid: edge.eid,
3628                        op: Op::Insert,
3629                        version,
3630                        properties,
3631                        created_at,
3632                        updated_at,
3633                    });
3634            }
3635
3636            // From tombstones
3637            for tombstone in old_l0.tombstones.values() {
3638                let version = old_l0
3639                    .edge_versions
3640                    .get(&tombstone.eid)
3641                    .copied()
3642                    .unwrap_or(0);
3643                // Get timestamps - for deletes, updated_at reflects deletion time
3644                let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3645                let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3646
3647                entries_by_type
3648                    .entry(tombstone.edge_type)
3649                    .or_default()
3650                    .push(L1Entry {
3651                        src_vid: tombstone.src_vid,
3652                        dst_vid: tombstone.dst_vid,
3653                        eid: tombstone.eid,
3654                        op: Op::Delete,
3655                        version,
3656                        properties: HashMap::new(),
3657                        created_at,
3658                        updated_at,
3659                    });
3660            }
3661
3662            // 1b. Collect vertices by label (using vertex_labels from L0)
3663            //
3664            // Helper: fan-out a single vertex entry into per-label buckets.
3665            // Each per-label table row carries the full label set so multi-label
3666            // info is preserved after flush.
3667            let push_vertex_to_labels =
3668                |vid: Vid,
3669                 all_labels: &[String],
3670                 props: Properties,
3671                 deleted: bool,
3672                 version: u64,
3673                 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3674                    for label in all_labels {
3675                        if let Some(label_id) = schema.label_id_by_name(label) {
3676                            out.entry(label_id).or_default().push((
3677                                vid,
3678                                all_labels.to_vec(),
3679                                props.clone(),
3680                                deleted,
3681                                version,
3682                            ));
3683                        }
3684                    }
3685                };
3686
3687            for (vid, props) in &old_l0.vertex_properties {
3688                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3689                // Collect timestamps for this vertex
3690                if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3691                    vertex_created_at.insert(*vid, ts);
3692                }
3693                if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3694                    vertex_updated_at.insert(*vid, ts);
3695                }
3696                if let Some(labels) = old_l0.vertex_labels.get(vid) {
3697                    // Partial-write routing: when this VID was last
3698                    // touched via `insert_vertex_partial` AND the
3699                    // partial_lance_writes flag is on, send only the
3700                    // touched columns to a MergeInsert batch. Otherwise
3701                    // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3702                    // — or flag off) use the existing full-row Append.
3703                    let is_partial = self.config.partial_lance_writes
3704                        && old_l0.vertex_partial_keys.contains_key(vid);
3705                    if is_partial {
3706                        if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3707                            for label in labels {
3708                                if let Some(label_id) = schema.label_id_by_name(label) {
3709                                    partial_by_label.entry(label_id).or_default().push((
3710                                        *vid,
3711                                        props.clone(),
3712                                        version,
3713                                        touched.clone(),
3714                                    ));
3715                                }
3716                            }
3717                        }
3718                    } else {
3719                        push_vertex_to_labels(
3720                            *vid,
3721                            labels,
3722                            props.clone(),
3723                            false,
3724                            version,
3725                            &mut vertices_by_label,
3726                        );
3727                    }
3728                }
3729            }
3730            for &vid in &old_l0.vertex_tombstones {
3731                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3732                if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3733                    vertex_updated_at.insert(vid, ts);
3734                }
3735                if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3736                    // Round-12 §B: tombstones flush via Lance MergeInsert
3737                    // (just `_vid`, `_deleted=true`, `_version`,
3738                    // `_updated_at`) — skipping the wide-row Append.
3739                    // Unconditional (no `partial_lance_writes` gating);
3740                    // tombstone Append carries no useful payload.
3741                    for label in labels {
3742                        if let Some(label_id) = schema.label_id_by_name(label) {
3743                            tombstones_by_label
3744                                .entry(label_id)
3745                                .or_default()
3746                                .push((vid, version));
3747                        }
3748                    }
3749                } else {
3750                    // Tombstone missing labels (old WAL format) - collect for storage query fallback
3751                    orphaned_tombstones.push((vid, version));
3752                }
3753            }
3754        } // Drop read lock
3755
3756        // Resolve orphaned tombstones (missing labels) from storage
3757        if !orphaned_tombstones.is_empty() {
3758            tracing::warn!(
3759                count = orphaned_tombstones.len(),
3760                "Tombstones missing labels in L0, querying storage as fallback"
3761            );
3762            for (vid, version) in orphaned_tombstones {
3763                if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3764                    && !labels.is_empty()
3765                {
3766                    for label in &labels {
3767                        if let Some(label_id) = schema.label_id_by_name(label) {
3768                            // Round-12 §B: route through partial tombstone too.
3769                            tombstones_by_label
3770                                .entry(label_id)
3771                                .or_default()
3772                                .push((vid, version));
3773                        }
3774                    }
3775                }
3776            }
3777        }
3778
3779        // 1. Load previous snapshot from cache, or fall back to storage.
3780        //
3781        // Use clone() not take(): for the async path, multiple
3782        // concurrent streams may run; if we take() here, a sibling
3783        // stream sees cached_manifest = None and seeds from
3784        // load_latest_snapshot (stale), losing the chain. clone()
3785        // preserves the parent. Finalize writes back the new manifest
3786        // unconditionally.
3787        let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3788            cached
3789        } else {
3790            self.storage
3791                .snapshot_manager()
3792                .load_latest_snapshot()
3793                .await?
3794                .unwrap_or_else(|| {
3795                    SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3796                })
3797        };
3798
3799        // Update snapshot metadata
3800        // Save parent snapshot ID before generating new one (for lineage tracking)
3801        let parent_id = manifest.snapshot_id.clone();
3802        manifest.parent_snapshot = Some(parent_id);
3803        manifest.snapshot_id = Uuid::new_v4().to_string();
3804        manifest.name = name;
3805        manifest.created_at = Utc::now();
3806        manifest.version_high_water_mark = current_version;
3807        manifest.wal_high_water_mark = wal_lsn;
3808        let snapshot_id = manifest.snapshot_id.clone();
3809
3810        tracing::Span::current().record("snapshot_id", &snapshot_id);
3811
3812        // 2. Write main unified tables FIRST (before deltas).
3813        //    Ensures the dual-write invariant: by the time an EID appears in a
3814        //    delta table, it already exists in main_edges. This prevents the
3815        //    compaction debug_assert from firing when compaction interleaves
3816        //    with flush at async yield points.
3817        //
3818        // 2.1 Main edges table
3819        let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3820            let _old_l0 = old_l0_arc.read();
3821            let mut main_edges: Vec<(
3822                uni_common::core::id::Eid,
3823                Vid,
3824                Vid,
3825                String,
3826                Properties,
3827                bool,
3828                u64,
3829            )> = Vec::new();
3830            let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3831            let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3832
3833            for (&edge_type_id, entries) in entries_by_type.iter() {
3834                for entry in entries {
3835                    let edge_type_name = self
3836                        .storage
3837                        .schema_manager()
3838                        .edge_type_name_by_id_unified(edge_type_id)
3839                        .unwrap_or_else(|| "unknown".to_string());
3840
3841                    let deleted = matches!(entry.op, Op::Delete);
3842                    main_edges.push((
3843                        entry.eid,
3844                        entry.src_vid,
3845                        entry.dst_vid,
3846                        edge_type_name,
3847                        entry.properties.clone(),
3848                        deleted,
3849                        entry.version,
3850                    ));
3851
3852                    if let Some(ts) = entry.created_at {
3853                        edge_created_at_map.insert(entry.eid, ts);
3854                    }
3855                    if let Some(ts) = entry.updated_at {
3856                        edge_updated_at_map.insert(entry.eid, ts);
3857                    }
3858                }
3859            }
3860
3861            (main_edges, edge_created_at_map, edge_updated_at_map)
3862        };
3863
3864        if !main_edges.is_empty() {
3865            let main_edge_batch = MainEdgeDataset::build_record_batch(
3866                &main_edges,
3867                Some(&edge_created_at_map),
3868                Some(&edge_updated_at_map),
3869            )?;
3870            MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3871            MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3872        }
3873
3874        // 2.2 Main vertices table
3875        let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3876            let old_l0 = old_l0_arc.read();
3877            let mut vertices = Vec::new();
3878
3879            // Live vertices: full-row Append on the main table (the
3880            // props_json blob is required for global ID lookups). For
3881            // partial-row VIDs (vertex_partial_keys non-empty), the
3882            // main table still needs the full props for the
3883            // ext_id-uniqueness path; we keep the Append here. The
3884            // per-label Lance write IS partial via MergeInsert.
3885            for (vid, props) in &old_l0.vertex_properties {
3886                let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3887                let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3888                vertices.push((*vid, labels, props.clone(), false, version));
3889            }
3890
3891            // Tombstones: collected into `main_vertex_tombstones` for
3892            // the MergeInsert path below; skipping the wide-row Append.
3893            for &vid in &old_l0.vertex_tombstones {
3894                let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3895                main_vertex_tombstones.push((vid, version));
3896            }
3897
3898            vertices
3899        };
3900
3901        if !main_vertices.is_empty() {
3902            let main_vertex_batch = MainVertexDataset::build_record_batch(
3903                &main_vertices,
3904                Some(&vertex_created_at),
3905                Some(&vertex_updated_at),
3906            )?;
3907            MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3908        }
3909        // Round-12 §B: tombstones via MergeInsert on the main vertices
3910        // table. Independent of `vertex_properties` length.
3911        if !main_vertex_tombstones.is_empty() {
3912            let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3913                &main_vertex_tombstones,
3914                Some(&vertex_updated_at),
3915            )?;
3916            MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3917                .await?;
3918        }
3919        if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3920            MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3921        }
3922
3923        // 3. For each edge type, write FWD and BWD delta runs
3924        for (&edge_type_id, entries) in entries_by_type.iter() {
3925            // Get edge type name from unified lookup (handles both schema'd and schemaless)
3926            let edge_type_name = self
3927                .storage
3928                .schema_manager()
3929                .edge_type_name_by_id_unified(edge_type_id)
3930                .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3931
3932            // FWD Run (sorted by src_vid)
3933            // Round-12 §A: split entries into full-row Append and
3934            // partial MergeInsert routes based on `edge_partial_keys`.
3935            // Edges in `edge_partial_keys` were last written via
3936            // `insert_edge_partial_full`; the per-edge-type delta
3937            // tables receive only the touched schema columns plus
3938            // (when any overflow key was touched) the regenerated
3939            // `overflow_json` blob. Untouched columns retain their
3940            // previous-version value via Lance MergeInsert.
3941            let partial_eids: std::collections::HashSet<Eid> = {
3942                let old_l0 = old_l0_arc.read();
3943                entries
3944                    .iter()
3945                    .filter(|e| {
3946                        self.config.partial_lance_writes
3947                            && old_l0.edge_partial_keys.contains_key(&e.eid)
3948                    })
3949                    .map(|e| e.eid)
3950                    .collect()
3951            };
3952            let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3953                let old_l0 = old_l0_arc.read();
3954                partial_eids
3955                    .iter()
3956                    .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3957                    .collect()
3958            };
3959            let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3960                .clone()
3961                .into_iter()
3962                .partition(|e| !partial_eids.contains(&e.eid));
3963
3964            let backend = self.storage.backend();
3965
3966            // FWD run (sorted by src_vid)
3967            let mut fwd_full = full_entries.clone();
3968            fwd_full.sort_by_key(|e| e.src_vid);
3969            let mut fwd_partial = partial_entries.clone();
3970            fwd_partial.sort_by_key(|e| e.src_vid);
3971            let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3972            if !fwd_full.is_empty() {
3973                let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3974                fwd_ds.write_run(backend, fwd_batch).await?;
3975            }
3976            if !fwd_partial.is_empty() {
3977                let touched_union: std::collections::HashSet<String> = fwd_partial
3978                    .iter()
3979                    .flat_map(|e| {
3980                        touched_union_by_eid
3981                            .get(&e.eid)
3982                            .cloned()
3983                            .unwrap_or_default()
3984                            .into_iter()
3985                    })
3986                    .collect();
3987                let fwd_partial_batch =
3988                    fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3989                fwd_ds
3990                    .merge_insert_partial_run(backend, fwd_partial_batch)
3991                    .await?;
3992            }
3993            fwd_ds.ensure_eid_index(backend).await?;
3994
3995            // BWD Run (sorted by dst_vid)
3996            let mut bwd_full = full_entries.clone();
3997            bwd_full.sort_by_key(|e| e.dst_vid);
3998            let mut bwd_partial = partial_entries.clone();
3999            bwd_partial.sort_by_key(|e| e.dst_vid);
4000            let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4001            if !bwd_full.is_empty() {
4002                let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4003                bwd_ds.write_run(backend, bwd_batch).await?;
4004            }
4005            if !bwd_partial.is_empty() {
4006                let touched_union: std::collections::HashSet<String> = bwd_partial
4007                    .iter()
4008                    .flat_map(|e| {
4009                        touched_union_by_eid
4010                            .get(&e.eid)
4011                            .cloned()
4012                            .unwrap_or_default()
4013                            .into_iter()
4014                    })
4015                    .collect();
4016                let bwd_partial_batch =
4017                    bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4018                bwd_ds
4019                    .merge_insert_partial_run(backend, bwd_partial_batch)
4020                    .await?;
4021            }
4022            bwd_ds.ensure_eid_index(backend).await?;
4023
4024            // Update Manifest
4025            let current_snap =
4026                manifest
4027                    .edges
4028                    .entry(edge_type_name.to_string())
4029                    .or_insert(EdgeSnapshot {
4030                        version: 0,
4031                        count: 0,
4032                        lance_version: 0,
4033                    });
4034            current_snap.version += 1;
4035            current_snap.count += entries.len() as u64;
4036            // LanceDB tables don't expose Lance version directly
4037            current_snap.lance_version = 0;
4038
4039            // Note: No CSR invalidation needed. AdjacencyManager's overlay
4040            // already has these edges via dual-write in insert_edge/delete_edge.
4041        }
4042
4043        // 4. Per-label vertex table writes
4044        // Iterate all labels that have either full-row OR partial-write
4045        // data pending. A label may appear in only one of the two maps
4046        // (e.g., all updates on this label were partial-only).
4047        let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4048            .keys()
4049            .chain(partial_by_label.keys())
4050            .chain(tombstones_by_label.keys())
4051            .copied()
4052            .collect();
4053        for label_id in all_label_ids {
4054            let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4055            let label_name = schema
4056                .label_name_by_id(label_id)
4057                .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4058
4059            let ds = self.storage.vertex_dataset(label_name)?;
4060
4061            // Collect inverted index updates before consuming vertices
4062            // Maps: cfg.property -> (added, removed)
4063            type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4064            let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4065
4066            for idx in &schema.indexes {
4067                if let IndexDefinition::Inverted(cfg) = idx
4068                    && cfg.label == label_name
4069                {
4070                    let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4071                    let mut removed: HashSet<Vid> = HashSet::new();
4072
4073                    for (vid, _labels, props, deleted, _version) in &vertices {
4074                        if *deleted {
4075                            removed.insert(*vid);
4076                        } else if let Some(prop_value) = props.get(&cfg.property) {
4077                            // Extract terms from the property value (List<String>)
4078                            if let Some(arr) = prop_value.as_array() {
4079                                let terms: Vec<String> = arr
4080                                    .iter()
4081                                    .filter_map(|v| v.as_str().map(ToString::to_string))
4082                                    .collect();
4083                                if !terms.is_empty() {
4084                                    added.insert(*vid, terms);
4085                                }
4086                            }
4087                        }
4088                    }
4089                    // Round-12 §B: tombstones no longer in `vertices`;
4090                    // pull them from `tombstones_by_label` for inverted
4091                    // index removal.
4092                    if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4093                        for (vid, _) in tomb_rows {
4094                            removed.insert(*vid);
4095                        }
4096                    }
4097
4098                    if !added.is_empty() || !removed.is_empty() {
4099                        inverted_updates.insert(cfg.property.clone(), (added, removed));
4100                    }
4101                }
4102            }
4103
4104            let mut v_data = Vec::new();
4105            let mut d_data = Vec::new();
4106            let mut ver_data = Vec::new();
4107            for (vid, labels, props, deleted, version) in vertices {
4108                v_data.push((vid, labels, props));
4109                d_data.push(deleted);
4110                ver_data.push(version);
4111            }
4112
4113            let backend = self.storage.backend();
4114
4115            // Skip the full-row Append entirely if this label only has
4116            // partial-write rows pending.
4117            if !v_data.is_empty() {
4118                let batch = ds.build_record_batch_with_timestamps(
4119                    &v_data,
4120                    &d_data,
4121                    &ver_data,
4122                    &schema,
4123                    Some(&vertex_created_at),
4124                    Some(&vertex_updated_at),
4125                )?;
4126                ds.write_batch(backend, batch, &schema).await?;
4127            }
4128
4129            // Partial-column batch (Lance MergeInsert path). The flag
4130            // gates whether the routing classified any VIDs as partial;
4131            // outside the flag this collection is always empty so the
4132            // call below is a cheap no-op.
4133            if let Some(partial_rows) = partial_by_label.remove(&label_id)
4134                && !partial_rows.is_empty()
4135            {
4136                let touched_union: std::collections::HashSet<String> = partial_rows
4137                    .iter()
4138                    .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4139                    .collect();
4140                let pairs: Vec<(Vid, Properties)> = partial_rows
4141                    .iter()
4142                    .map(|(vid, props, _, _)| (*vid, props.clone()))
4143                    .collect();
4144                let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4145                let partial_batch = ds.build_partial_record_batch(
4146                    &pairs,
4147                    &versions,
4148                    &touched_union,
4149                    &schema,
4150                    Some(&vertex_updated_at),
4151                )?;
4152                if partial_batch.num_rows() > 0 {
4153                    ds.merge_insert_batch(backend, partial_batch).await?;
4154                }
4155            }
4156
4157            // Tombstone batch (Round-12 §B): always MergeInsert with
4158            // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4159            // No partial_lance_writes gating — tombstones never carry
4160            // useful property payload to write. Captured tombstone vids
4161            // also drive `remove_from_vid_labels_index` below.
4162            let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4163            if !tombstone_rows.is_empty() {
4164                let tomb_batch =
4165                    ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4166                if tomb_batch.num_rows() > 0 {
4167                    ds.merge_insert_batch(backend, tomb_batch).await?;
4168                }
4169            }
4170
4171            ds.ensure_default_indexes(backend).await?;
4172
4173            // Update VidLabelsIndex (if enabled). v_data carries live
4174            // vertices; tombstone removals come from the
4175            // `tombstone_rows` captured above the MergeInsert call.
4176            for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4177                if deleted {
4178                    self.storage.remove_from_vid_labels_index(*vid);
4179                } else {
4180                    self.storage.update_vid_labels_index(*vid, labels.clone());
4181                }
4182            }
4183            for (vid, _) in &tombstone_rows {
4184                self.storage.remove_from_vid_labels_index(*vid);
4185            }
4186
4187            // Update Manifest
4188            let current_snap =
4189                manifest
4190                    .vertices
4191                    .entry(label_name.to_string())
4192                    .or_insert(LabelSnapshot {
4193                        version: 0,
4194                        count: 0,
4195                        lance_version: 0,
4196                    });
4197            current_snap.version += 1;
4198            current_snap.count += v_data.len() as u64;
4199            // LanceDB tables don't expose Lance version directly
4200            current_snap.lance_version = 0;
4201
4202            // Invalidate table cache to ensure next read picks up new version
4203            self.storage.invalidate_table_cache(label_name);
4204
4205            // Apply inverted index updates incrementally
4206            #[cfg(feature = "lance-backend")]
4207            for idx in &schema.indexes {
4208                if let IndexDefinition::Inverted(cfg) = idx
4209                    && cfg.label == label_name
4210                    && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4211                {
4212                    self.storage
4213                        .index_manager()
4214                        .update_inverted_index_incremental(cfg, added, removed)
4215                        .await?;
4216                }
4217            }
4218
4219            // Update UID index with new vertex mappings
4220            // Collect (UniId, Vid) mappings from non-deleted vertices
4221            #[cfg(feature = "lance-backend")]
4222            {
4223                let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4224                for (vid, _labels, props) in &v_data {
4225                    let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4226                    let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4227                        label_name, ext_id, props,
4228                    );
4229                    uid_mappings.push((uid, *vid));
4230                }
4231
4232                if !uid_mappings.is_empty()
4233                    && let Ok(uid_index) = self.storage.uid_index(label_name)
4234                {
4235                    // Stamp mappings with this flush's MVCC version so a later
4236                    // re-create of the same UID deterministically outranks the
4237                    // stale mapping (review C3).
4238                    uid_index
4239                        .write_mapping_versioned(&uid_mappings, current_version)
4240                        .await?;
4241                }
4242            }
4243        }
4244        Ok(FlushOutcome {
4245            manifest,
4246            snapshot_id,
4247        })
4248    }
4249
4250    /// Composition entry that assumes the caller already holds `flush_lock`.
4251    /// Runs rotate + stream + finalize_locked in sequence. Used by
4252    /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4253    /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4254    /// holds the lock from the commit critical section).
4255    #[instrument(
4256        skip(self),
4257        fields(snapshot_id, mutations_count, size_bytes),
4258        level = "info"
4259    )]
4260    async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4261        let start = std::time::Instant::now();
4262
4263        let (initial_size, initial_count) = {
4264            let l0_arc = self.l0_manager.get_current();
4265            let l0 = l0_arc.read();
4266            (l0.estimated_size, l0.mutation_count)
4267        };
4268        tracing::Span::current().record("size_bytes", initial_size);
4269        tracing::Span::current().record("mutations_count", initial_count);
4270
4271        debug!("Starting L0 flush to L1");
4272
4273        // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4274        // FlushInProgressGuard lives on RotateOutput and stays alive for
4275        // the full flush — including the finalize_locked call below.
4276        let RotateOutput {
4277            old_l0_arc,
4278            wal_lsn,
4279            current_version,
4280            flush_in_progress_guard: _flush_guard,
4281        } = self.flush_l0_rotate().await?;
4282
4283        // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4284        // G (Lance writes). Builds the manifest but does NOT publish it.
4285        let FlushOutcome {
4286            manifest,
4287            snapshot_id,
4288        } = self
4289            .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4290            .await?;
4291
4292        // Phases H..S: publish manifest, complete_flush, WAL truncate,
4293        // property cache clear, last_flush_time, metrics, l1_runs++,
4294        // compaction trigger, index-rebuild scheduling, fork tick.
4295        self.flush_finalize_locked(
4296            old_l0_arc,
4297            wal_lsn,
4298            manifest,
4299            snapshot_id,
4300            initial_size,
4301            initial_count,
4302            start,
4303        )
4304        .await
4305    }
4306
4307    /// Phases H..S of the flush: publish the manifest and run all
4308    /// post-publish bookkeeping. Assumes the caller already holds
4309    /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4310    /// lock-acquiring variant used by the async finalize path.
4311    #[allow(clippy::too_many_arguments)]
4312    async fn flush_finalize_locked(
4313        &self,
4314        old_l0_arc: Arc<RwLock<L0Buffer>>,
4315        wal_lsn: u64,
4316        manifest: SnapshotManifest,
4317        snapshot_id: String,
4318        initial_size: usize,
4319        initial_count: usize,
4320        start: std::time::Instant,
4321    ) -> Result<String> {
4322        Self::flush_finalize_body(
4323            &self.shared_ctx(),
4324            old_l0_arc,
4325            wal_lsn,
4326            manifest,
4327            snapshot_id,
4328            initial_size,
4329            initial_count,
4330            start,
4331        )
4332        .await
4333    }
4334
4335    /// Phases H..S of the flush, lock-acquiring variant. Used by the
4336    /// async-flush finalizer task (running on a spawned tokio task),
4337    /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4338    /// `flush_lock` to serialize the publish boundary, then runs the
4339    /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4340    #[allow(clippy::too_many_arguments)]
4341    pub(crate) async fn flush_finalize_now(
4342        shared: SharedFlushCtx,
4343        old_l0_arc: Arc<RwLock<L0Buffer>>,
4344        wal_lsn: u64,
4345        manifest: SnapshotManifest,
4346        snapshot_id: String,
4347        initial_size: usize,
4348        initial_count: usize,
4349        start: std::time::Instant,
4350    ) -> Result<String> {
4351        let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4352        Self::flush_finalize_body(
4353            &shared,
4354            old_l0_arc,
4355            wal_lsn,
4356            manifest,
4357            snapshot_id,
4358            initial_size,
4359            initial_count,
4360            start,
4361        )
4362        .await
4363    }
4364
4365    /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4366    /// Static over `SharedFlushCtx`; the caller is responsible for
4367    /// holding `flush_lock`.
4368    #[allow(clippy::too_many_arguments)]
4369    async fn flush_finalize_body(
4370        shared: &SharedFlushCtx,
4371        old_l0_arc: Arc<RwLock<L0Buffer>>,
4372        wal_lsn: u64,
4373        mut manifest: SnapshotManifest,
4374        snapshot_id: String,
4375        initial_size: usize,
4376        initial_count: usize,
4377        start: std::time::Instant,
4378    ) -> Result<String> {
4379        // Parent-snapshot fixup. The stream phase built `manifest` with
4380        // parent_snapshot set from cached_manifest at stream time. If
4381        // OTHER flushes (sync or async) have finalized since then,
4382        // cached_manifest has advanced. Re-link this manifest to the
4383        // current cached chain so we don't orphan their data when we
4384        // overwrite cached_manifest below.
4385        let current_parent_id = shared
4386            .cached_manifest
4387            .lock()
4388            .as_ref()
4389            .map(|m| m.snapshot_id.clone());
4390        if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4391            manifest.parent_snapshot = current_parent_id;
4392            metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4393        }
4394
4395        // H. Publish manifest (body first, then pointer — recovery is
4396        // idempotent if we crash between the two).
4397        shared
4398            .storage
4399            .snapshot_manager()
4400            .save_snapshot(&manifest)
4401            .await?;
4402        shared
4403            .storage
4404            .snapshot_manager()
4405            .set_latest_snapshot(&manifest.snapshot_id)
4406            .await?;
4407
4408        // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
4409        // wrote the manifest body and the `catalog/latest` pointer through the
4410        // object store, which does NOT fsync. WAL truncation (K, below) removes
4411        // the only other durable copy of this flush's data, so a crash after K
4412        // but before the OS flushed those writes would lose the snapshot —
4413        // recovery could not resolve `latest`. Make them durable now (local-fs
4414        // only; remote stores provide their own durability on `put`).
4415        crate::snapshot::manager::fsync_snapshot_pointer(
4416            shared.storage.local_fs_root().as_deref(),
4417            &manifest.snapshot_id,
4418        )
4419        .map_err(|e| {
4420            anyhow!(
4421                "fsync snapshot {} before WAL truncate: {}",
4422                manifest.snapshot_id,
4423                e
4424            )
4425        })?;
4426
4427        // I. Cache manifest for next flush to avoid re-reading from object store.
4428        *shared.cached_manifest.lock() = Some(manifest.clone());
4429
4430        // L. Invalidate the property cache BEFORE removing the flushed buffer
4431        // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4432        // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4433        // first closes the non-monotonic-read window: once the buffer leaves
4434        // the L0 chain at J a freshly-written value would otherwise miss the
4435        // chain and fall through to a stale cache entry. By the time finalize
4436        // runs the streamed rows are already durable in L1, so a post-clear
4437        // read falls through to fresh storage instead. The finalizer holds
4438        // `flush_lock` throughout, so reordering L ahead of J is safe.
4439        if let Some(ref pm) = shared.property_manager {
4440            pm.clear_cache().await;
4441        }
4442
4443        // J. Complete flush: remove old L0 from pending_flush. MUST happen
4444        // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4445        shared.l0_manager.complete_flush(&old_l0_arc);
4446
4447        // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4448        // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4449        // WAL truncation (K). The property cache is already cleared (L moved
4450        // ahead of J above), so a read in this window falls through to fresh
4451        // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4452        // read after flush finalize).
4453        fail::fail_point!("flush::after-complete-before-cache-clear");
4454
4455        // K. Truncate WAL up to the safe LSN.
4456        let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4457        if let Some(w) = wal_handle {
4458            let safe_lsn = shared
4459                .l0_manager
4460                .min_pending_wal_lsn()
4461                .map(|min_pending| min_pending.min(wal_lsn))
4462                .unwrap_or(wal_lsn);
4463            w.truncate_before(safe_lsn).await?;
4464        }
4465
4466        // M. Reset last flush time for time-based auto-flush.
4467        *shared.last_flush_time.lock() = std::time::Instant::now();
4468
4469        info!(
4470            snapshot_id,
4471            mutations_count = initial_count,
4472            size_bytes = initial_size,
4473            "L0 flush to L1 completed successfully"
4474        );
4475        metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4476        metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4477        metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4478
4479        // P. Increment flush generation counter for write throttling.
4480        {
4481            let mut status = uni_common::sync::acquire_mutex(
4482                &shared.storage.compaction_status,
4483                "compaction_status",
4484            )?;
4485            status.l1_runs += 1;
4486        }
4487
4488        // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4489        let am = shared.adjacency_manager.clone();
4490        if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4491            let previous_still_running = {
4492                let guard = shared.compaction_handle.read();
4493                guard.as_ref().is_some_and(|h| !h.is_finished())
4494            };
4495            if previous_still_running {
4496                info!("Skipping compaction: previous compaction still in progress");
4497            } else {
4498                let handle = tokio::spawn(async move {
4499                    am.compact();
4500                });
4501                *shared.compaction_handle.write() = Some(handle);
4502            }
4503        }
4504
4505        // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4506        if shared.auto_rebuild_enabled
4507            && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4508        {
4509            Self::schedule_index_rebuilds_if_needed_static(
4510                &manifest,
4511                rebuild_mgr.clone(),
4512                shared.schema_manager.clone(),
4513                shared.index_rebuild_config.clone(),
4514            );
4515        }
4516
4517        // S. Emit fork-fragment observability after a successful forked flush.
4518        Self::tick_fork_fragment_observability_static(
4519            shared.fork_id,
4520            shared.fork_flush_count.clone(),
4521            shared.fork_fragment_warn_fired.clone(),
4522            shared.fork_fragment_warn_threshold,
4523        );
4524
4525        Ok(snapshot_id)
4526    }
4527
4528    /// Increment fork-flush bookkeeping and fire the fragment warn
4529    /// once if the threshold is crossed.
4530    ///
4531    /// Each flush typically appends ~1 fragment per touched dataset on
4532    /// the fork's branches; without compaction (deferred to Phase 5)
4533    /// long-lived heavy-write forks degrade. The flush count is a
4534    /// proxy for actual fragment growth — reading
4535    /// `Dataset::manifest().fragments.len()` per dataset would add a
4536    /// per-flush object-store roundtrip on the hot commit path, which
4537    /// is too costly for a purely observational guard rail.
4538    ///
4539    /// No-op for primary writers (`fork_id == None`).
4540    #[allow(dead_code)] // called by tests; production path uses _static
4541    pub(crate) fn tick_fork_fragment_observability(&self) {
4542        Self::tick_fork_fragment_observability_static(
4543            self.fork_id,
4544            self.fork_flush_count.clone(),
4545            self.fork_fragment_warn_fired.clone(),
4546            self.config.fork_fragment_warn_threshold,
4547        );
4548    }
4549
4550    /// Static variant of [`Writer::tick_fork_fragment_observability`].
4551    /// Used by the async-flush finalize path, where we hold a
4552    /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4553    pub(crate) fn tick_fork_fragment_observability_static(
4554        fork_id: Option<ForkId>,
4555        fork_flush_count: Arc<AtomicU64>,
4556        fork_fragment_warn_fired: Arc<AtomicBool>,
4557        warn_threshold: usize,
4558    ) {
4559        let Some(fork_id) = fork_id else { return };
4560        // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4561        let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4562        let fork_label = fork_id.to_string();
4563        metrics::gauge!(
4564            "uni_fork_l1_flushes",
4565            "fork" => fork_label.clone(),
4566        )
4567        .set(new_count as f64);
4568        let threshold = warn_threshold as u64;
4569        if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4570            && threshold > 0
4571            && new_count >= threshold
4572        {
4573            fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4574            tracing::warn!(
4575                fork = %fork_label,
4576                flush_count = new_count,
4577                threshold,
4578                "fork has exceeded the L1 flush-count threshold; \
4579                 fork compaction is deferred to Phase 5 — consider \
4580                 drop+recreate or promotion to bound fragment growth"
4581            );
4582        }
4583    }
4584
4585    /// Check rebuild thresholds and schedule background index rebuilds for
4586    /// labels that exceed growth or age limits. Marks affected indexes as
4587    /// `Stale` and spawns an async task to schedule the rebuild.
4588    #[allow(dead_code)] // production path uses _static; kept as the
4589    // documented instance entry point.
4590    fn schedule_index_rebuilds_if_needed(
4591        &self,
4592        manifest: &SnapshotManifest,
4593        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4594    ) {
4595        Self::schedule_index_rebuilds_if_needed_static(
4596            manifest,
4597            rebuild_mgr,
4598            self.schema_manager.clone(),
4599            self.config.index_rebuild.clone(),
4600        );
4601    }
4602
4603    /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4604    /// Used by the async-flush finalize path, where we hold the
4605    /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4606    pub(crate) fn schedule_index_rebuilds_if_needed_static(
4607        manifest: &SnapshotManifest,
4608        rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4609        schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4610        index_rebuild_config: uni_common::config::IndexRebuildConfig,
4611    ) {
4612        let checker =
4613            crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4614        let schema = schema_manager.schema();
4615        let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4616
4617        if labels.is_empty() {
4618            return;
4619        }
4620
4621        // Mark affected indexes as Stale
4622        for label in &labels {
4623            for idx in &schema.indexes {
4624                if idx.label() == label {
4625                    let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4626                        m.status = uni_common::core::schema::IndexStatus::Stale;
4627                    });
4628                }
4629            }
4630        }
4631
4632        tokio::spawn(async move {
4633            if let Err(e) = rebuild_mgr.schedule(labels).await {
4634                tracing::warn!("Failed to schedule index rebuild: {e}");
4635            }
4636        });
4637    }
4638}
4639
4640/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
4641/// its single-task finalizer loop. Unit struct on purpose: it must NOT
4642/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
4643/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
4644/// for finalize travels in via `SharedFlushCtx`.
4645pub(crate) struct WriterFinalizer;
4646
4647impl FinalizeFn for WriterFinalizer {
4648    fn finalize<'a>(
4649        &'a self,
4650        rotated: RotatedFlush,
4651        outcome: AsyncFlushOutcome,
4652        shared: SharedFlushCtx,
4653    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4654        Box::pin(async move {
4655            // Read initial_size / initial_count from the rotated L0 so
4656            // we don't have to plumb them through the coordinator
4657            // submission. The buffer is still alive in pending_flush
4658            // until `complete_flush` (J) below pops it.
4659            let (initial_size, initial_count) = {
4660                let l0 = rotated.old_l0_arc.read();
4661                (l0.estimated_size, l0.mutation_count)
4662            };
4663            let result = Writer::flush_finalize_now(
4664                shared,
4665                rotated.old_l0_arc.clone(),
4666                rotated.wal_lsn,
4667                outcome.new_manifest,
4668                outcome.snapshot_id,
4669                initial_size,
4670                initial_count,
4671                std::time::Instant::now(),
4672            )
4673            .await;
4674            // `rotated` (permit + flush_in_progress_guard) drops here.
4675            drop(rotated.permit);
4676            result
4677        })
4678    }
4679
4680    fn finalize_failure<'a>(
4681        &'a self,
4682        rotated: RotatedFlush,
4683        err: anyhow::Error,
4684        _shared: SharedFlushCtx,
4685    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4686        Box::pin(async move {
4687            tracing::warn!(
4688                error = %err,
4689                seq = rotated.seq,
4690                "async flush stream failed; old L0 remains in pending_flush, \
4691                 WAL retains its data, recovery via WAL replay on restart"
4692            );
4693            metrics::counter!("uni_flush_failures_total").increment(1);
4694            // Permit + guard drop here so back-pressure releases even on
4695            // failure.
4696            drop(rotated.permit);
4697            err
4698        })
4699    }
4700}
4701
4702#[cfg(test)]
4703mod tests {
4704    use super::*;
4705    use tempfile::tempdir;
4706
4707    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
4708    /// This verifies fix for issue #137 (transaction commit atomicity).
4709    #[tokio::test]
4710    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4711        use crate::runtime::wal::WriteAheadLog;
4712        use crate::storage::manager::StorageManager;
4713        use object_store::local::LocalFileSystem;
4714        use object_store::path::Path as ObjectStorePath;
4715        use uni_common::core::schema::SchemaManager;
4716
4717        let dir = tempdir()?;
4718        let path = dir.path().to_str().unwrap();
4719        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4720        let schema_path = ObjectStorePath::from("schema.json");
4721
4722        let schema_manager =
4723            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4724        let _label_id = schema_manager.add_label("Test")?;
4725        schema_manager.save().await?;
4726
4727        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4728
4729        // Create WAL for main L0
4730        let wal_path = ObjectStorePath::from("wal");
4731        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4732
4733        let writer = Writer::new_with_config(
4734            storage.clone(),
4735            schema_manager.clone(),
4736            1,
4737            UniConfig::default(),
4738            Some(wal),
4739            None,
4740        )
4741        .await?;
4742
4743        // Begin transaction — create a transaction L0
4744        let tx_l0 = writer.create_transaction_l0();
4745
4746        // Insert data in transaction
4747        let vid_a = writer.next_vid().await?;
4748        let vid_b = writer.next_vid().await?;
4749
4750        let mut props = std::collections::HashMap::new();
4751        props.insert("test".to_string(), Value::String("data".to_string()));
4752
4753        writer
4754            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4755            .await?;
4756        writer
4757            .insert_vertex_with_labels(
4758                vid_b,
4759                std::collections::HashMap::new(),
4760                &["Test".to_string()],
4761                Some(&tx_l0),
4762            )
4763            .await?;
4764
4765        let eid = writer.next_eid(1).await?;
4766        writer
4767            .insert_edge(
4768                vid_a,
4769                vid_b,
4770                1,
4771                eid,
4772                std::collections::HashMap::new(),
4773                None,
4774                Some(&tx_l0),
4775            )
4776            .await?;
4777
4778        // Get WAL before commit
4779        let l0 = writer.l0_manager.get_current();
4780        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
4781        let mutations_before = wal.replay().await?;
4782        let count_before = mutations_before.len();
4783
4784        // Commit transaction - this should write to WAL first
4785        let writer = Arc::new(writer);
4786        writer.commit_transaction_l0(tx_l0).await?;
4787
4788        // Verify WAL has the new mutations
4789        let mutations_after = wal.replay().await?;
4790        assert!(
4791            mutations_after.len() > count_before,
4792            "WAL should contain transaction mutations after commit"
4793        );
4794
4795        // Verify mutations are in correct order: vertices first, then edges
4796        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
4797
4798        let mut saw_vertex_a = false;
4799        let mut saw_vertex_b = false;
4800        let mut saw_edge = false;
4801
4802        for mutation in &new_mutations {
4803            match mutation {
4804                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
4805                    if *vid == vid_a {
4806                        saw_vertex_a = true;
4807                    }
4808                    if *vid == vid_b {
4809                        saw_vertex_b = true;
4810                    }
4811                    // Vertices should come before edges
4812                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
4813                }
4814                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
4815                    if *e == eid {
4816                        saw_edge = true;
4817                    }
4818                    // Edges should come after vertices
4819                    assert!(
4820                        saw_vertex_a && saw_vertex_b,
4821                        "Edge should be logged after both vertices"
4822                    );
4823                }
4824                _ => {}
4825            }
4826        }
4827
4828        assert!(saw_vertex_a, "Vertex A should be in WAL");
4829        assert!(saw_vertex_b, "Vertex B should be in WAL");
4830        assert!(saw_edge, "Edge should be in WAL");
4831
4832        // Verify data is also in main L0
4833        let l0_read = l0.read();
4834        assert!(
4835            l0_read.vertex_properties.contains_key(&vid_a),
4836            "Vertex A should be in main L0"
4837        );
4838        assert!(
4839            l0_read.vertex_properties.contains_key(&vid_b),
4840            "Vertex B should be in main L0"
4841        );
4842        assert!(
4843            l0_read.edge_endpoints.contains_key(&eid),
4844            "Edge should be in main L0"
4845        );
4846
4847        Ok(())
4848    }
4849
4850    /// Test that failed WAL flush leaves transaction intact for retry or rollback.
4851    #[tokio::test]
4852    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4853        use crate::runtime::wal::WriteAheadLog;
4854        use crate::storage::manager::StorageManager;
4855        use object_store::local::LocalFileSystem;
4856        use object_store::path::Path as ObjectStorePath;
4857        use uni_common::core::schema::SchemaManager;
4858
4859        let dir = tempdir()?;
4860        let path = dir.path().to_str().unwrap();
4861        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4862        let schema_path = ObjectStorePath::from("schema.json");
4863
4864        let schema_manager =
4865            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4866        let _label_id = schema_manager.add_label("Test")?;
4867        let _baseline_label_id = schema_manager.add_label("Baseline")?;
4868        let _txdata_label_id = schema_manager.add_label("TxData")?;
4869        schema_manager.save().await?;
4870
4871        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4872
4873        // Create WAL for main L0
4874        let wal_path = ObjectStorePath::from("wal");
4875        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4876
4877        let writer = Writer::new_with_config(
4878            storage.clone(),
4879            schema_manager.clone(),
4880            1,
4881            UniConfig::default(),
4882            Some(wal),
4883            None,
4884        )
4885        .await?;
4886
4887        // Insert baseline data (outside transaction)
4888        let baseline_vid = writer.next_vid().await?;
4889        writer
4890            .insert_vertex_with_labels(
4891                baseline_vid,
4892                [("baseline".to_string(), Value::Bool(true))]
4893                    .into_iter()
4894                    .collect(),
4895                &["Baseline".to_string()],
4896                None,
4897            )
4898            .await?;
4899
4900        // Begin transaction — create a transaction L0
4901        let tx_l0 = writer.create_transaction_l0();
4902
4903        // Insert data in transaction
4904        let tx_vid = writer.next_vid().await?;
4905        writer
4906            .insert_vertex_with_labels(
4907                tx_vid,
4908                [("tx_data".to_string(), Value::Bool(true))]
4909                    .into_iter()
4910                    .collect(),
4911                &["TxData".to_string()],
4912                Some(&tx_l0),
4913            )
4914            .await?;
4915
4916        // Capture main L0 state before rollback
4917        let l0 = writer.l0_manager.get_current();
4918        let vertex_count_before = l0.read().vertex_properties.len();
4919
4920        // Rollback transaction (simulating what would happen after WAL flush failure)
4921        drop(tx_l0);
4922
4923        // Verify main L0 is unchanged
4924        let vertex_count_after = l0.read().vertex_properties.len();
4925        assert_eq!(
4926            vertex_count_before, vertex_count_after,
4927            "Main L0 should not change after rollback"
4928        );
4929
4930        // Baseline should still be present
4931        assert!(
4932            l0.read().vertex_properties.contains_key(&baseline_vid),
4933            "Baseline data should remain"
4934        );
4935
4936        // Transaction data should NOT be in main L0
4937        assert!(
4938            !l0.read().vertex_properties.contains_key(&tx_vid),
4939            "Transaction data should not be in main L0 after rollback"
4940        );
4941
4942        Ok(())
4943    }
4944
4945    /// Test that batch insert with shared labels does not clone labels per vertex.
4946    /// This verifies fix for issue #161 (redundant label cloning).
4947    #[tokio::test]
4948    async fn test_batch_insert_shared_labels() -> Result<()> {
4949        use crate::storage::manager::StorageManager;
4950        use object_store::local::LocalFileSystem;
4951        use object_store::path::Path as ObjectStorePath;
4952        use uni_common::core::schema::SchemaManager;
4953
4954        let dir = tempdir()?;
4955        let path = dir.path().to_str().unwrap();
4956        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4957        let schema_path = ObjectStorePath::from("schema.json");
4958
4959        let schema_manager =
4960            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4961        let _label_id = schema_manager.add_label("Person")?;
4962        schema_manager.save().await?;
4963
4964        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4965
4966        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4967
4968        // Shared labels - should not be cloned per vertex
4969        let labels = &["Person".to_string()];
4970
4971        // Insert batch of vertices with same labels
4972        let mut vids = Vec::new();
4973        for i in 0..100 {
4974            let vid = writer.next_vid().await?;
4975            let mut props = std::collections::HashMap::new();
4976            props.insert("id".to_string(), Value::Int(i));
4977            writer
4978                .insert_vertex_with_labels(vid, props, labels, None)
4979                .await?;
4980            vids.push(vid);
4981        }
4982
4983        // Verify all vertices have the correct labels
4984        let l0 = writer.l0_manager.get_current();
4985        for vid in vids {
4986            let l0_guard = l0.read();
4987            let vertex_labels = l0_guard.vertex_labels.get(&vid);
4988            assert!(vertex_labels.is_some(), "Vertex should have labels");
4989            assert_eq!(
4990                vertex_labels.unwrap(),
4991                &vec!["Person".to_string()],
4992                "Labels should match"
4993            );
4994        }
4995
4996        Ok(())
4997    }
4998
4999    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5000    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5001    #[tokio::test]
5002    async fn test_estimated_size_tracks_mutations() -> Result<()> {
5003        use crate::storage::manager::StorageManager;
5004        use object_store::local::LocalFileSystem;
5005        use object_store::path::Path as ObjectStorePath;
5006        use uni_common::core::schema::SchemaManager;
5007
5008        let dir = tempdir()?;
5009        let path = dir.path().to_str().unwrap();
5010        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5011        let schema_path = ObjectStorePath::from("schema.json");
5012
5013        let schema_manager =
5014            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5015        let _label_id = schema_manager.add_label("Test")?;
5016        schema_manager.save().await?;
5017
5018        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5019
5020        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5021
5022        let l0 = writer.l0_manager.get_current();
5023
5024        // Initial state should be empty
5025        let initial_estimated = l0.read().estimated_size;
5026        let initial_actual = l0.read().size_bytes();
5027        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5028        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5029
5030        // Insert vertices with properties
5031        let mut vids = Vec::new();
5032        for i in 0..10 {
5033            let vid = writer.next_vid().await?;
5034            let mut props = std::collections::HashMap::new();
5035            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5036            props.insert("index".to_string(), Value::Int(i));
5037            writer
5038                .insert_vertex_with_labels(vid, props, &[], None)
5039                .await?;
5040            vids.push(vid);
5041        }
5042
5043        // Verify estimated_size grew
5044        let after_vertices_estimated = l0.read().estimated_size;
5045        let after_vertices_actual = l0.read().size_bytes();
5046        assert!(
5047            after_vertices_estimated > 0,
5048            "estimated_size should grow after insertions"
5049        );
5050
5051        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5052        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5053        assert!(
5054            (0.5..=2.0).contains(&ratio),
5055            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5056            after_vertices_estimated,
5057            after_vertices_actual,
5058            ratio
5059        );
5060
5061        // Insert edges with a simple edge type
5062        let edge_type = 1u32;
5063        for i in 0..9 {
5064            let eid = writer.next_eid(edge_type).await?;
5065            writer
5066                .insert_edge(
5067                    vids[i],
5068                    vids[i + 1],
5069                    edge_type,
5070                    eid,
5071                    std::collections::HashMap::new(),
5072                    Some("NEXT".to_string()),
5073                    None,
5074                )
5075                .await?;
5076        }
5077
5078        // Verify estimated_size grew further
5079        let after_edges_estimated = l0.read().estimated_size;
5080        let after_edges_actual = l0.read().size_bytes();
5081        assert!(
5082            after_edges_estimated > after_vertices_estimated,
5083            "estimated_size should grow after edge insertions"
5084        );
5085
5086        // Verify still within reasonable bounds
5087        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5088        assert!(
5089            (0.5..=2.0).contains(&ratio),
5090            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5091            after_edges_estimated,
5092            after_edges_actual,
5093            ratio
5094        );
5095
5096        Ok(())
5097    }
5098
5099    /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5100    #[tokio::test]
5101    async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5102        use crate::runtime::wal::WriteAheadLog;
5103        use crate::storage::manager::StorageManager;
5104        use object_store::local::LocalFileSystem;
5105        use object_store::path::Path as ObjectStorePath;
5106        use uni_common::core::schema::SchemaManager;
5107
5108        let dir = tempdir()?;
5109        let path = dir.path().to_str().unwrap();
5110        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5111        let schema_path = ObjectStorePath::from("schema.json");
5112
5113        let schema_manager =
5114            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5115        schema_manager.save().await?;
5116
5117        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5118
5119        let wal_path = ObjectStorePath::from("wal");
5120        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5121
5122        let writer = Writer::new_with_config(
5123            storage.clone(),
5124            schema_manager.clone(),
5125            1,
5126            UniConfig::default(),
5127            Some(wal.clone()),
5128            None,
5129        )
5130        .await?;
5131
5132        // Flush with no mutations — should succeed cleanly
5133        let lsn = writer.flush_wal().await?;
5134        // LSN should be 0 or 1 (no real mutations flushed)
5135        assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5136
5137        Ok(())
5138    }
5139
5140    /// Test that transaction data does not leak into main L0 without commit.
5141    #[tokio::test]
5142    async fn test_transaction_isolation_without_commit() -> Result<()> {
5143        use crate::runtime::wal::WriteAheadLog;
5144        use crate::storage::manager::StorageManager;
5145        use object_store::local::LocalFileSystem;
5146        use object_store::path::Path as ObjectStorePath;
5147        use uni_common::core::schema::SchemaManager;
5148
5149        let dir = tempdir()?;
5150        let path = dir.path().to_str().unwrap();
5151        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5152        let schema_path = ObjectStorePath::from("schema.json");
5153
5154        let schema_manager =
5155            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5156        let _label_id = schema_manager.add_label("Person")?;
5157        schema_manager.save().await?;
5158
5159        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5160
5161        let wal_path = ObjectStorePath::from("wal");
5162        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5163
5164        let writer = Writer::new_with_config(
5165            storage.clone(),
5166            schema_manager.clone(),
5167            1,
5168            UniConfig::default(),
5169            Some(wal),
5170            None,
5171        )
5172        .await?;
5173
5174        // Create transaction L0
5175        let tx_l0 = writer.create_transaction_l0();
5176
5177        // Insert vertex into transaction L0
5178        let vid = writer.next_vid().await?;
5179        writer
5180            .insert_vertex_with_labels(
5181                vid,
5182                [("name".to_string(), Value::String("Ghost".to_string()))]
5183                    .into_iter()
5184                    .collect(),
5185                &["Person".to_string()],
5186                Some(&tx_l0),
5187            )
5188            .await?;
5189
5190        // Verify data is in transaction L0
5191        assert!(
5192            tx_l0.read().vertex_properties.contains_key(&vid),
5193            "Transaction L0 should contain the vertex"
5194        );
5195
5196        // Verify data is NOT in main L0
5197        let main_l0 = writer.l0_manager.get_current();
5198        assert!(
5199            !main_l0.read().vertex_properties.contains_key(&vid),
5200            "Main L0 should NOT contain uncommitted transaction data"
5201        );
5202
5203        // Drop transaction without committing — data should be lost
5204        drop(tx_l0);
5205
5206        // Main L0 still should not have it
5207        assert!(
5208            !main_l0.read().vertex_properties.contains_key(&vid),
5209            "Main L0 should remain clean after dropped transaction"
5210        );
5211
5212        Ok(())
5213    }
5214
5215    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5216    /// the flush count crosses the configured threshold and stays
5217    /// silent on subsequent flushes for the lifetime of the writer.
5218    /// Primary writers (`fork_id == None`) never fire it.
5219    ///
5220    /// Tested directly against `tick_fork_fragment_observability` so
5221    /// the contract is locked in independently of the broader
5222    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5223    /// on Day 10's on-the-fly schema overlay growth).
5224    #[tokio::test]
5225    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5226        use crate::storage::manager::StorageManager;
5227        use object_store::local::LocalFileSystem;
5228        use object_store::path::Path as ObjectStorePath;
5229        use uni_common::core::fork::ForkId;
5230        use uni_common::core::schema::SchemaManager;
5231
5232        let dir = tempdir()?;
5233        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5234        let schema_path = ObjectStorePath::from("schema.json");
5235        let schema_manager =
5236            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5237        let storage = Arc::new(
5238            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5239        );
5240
5241        let config = UniConfig {
5242            fork_fragment_warn_threshold: 3,
5243            ..Default::default()
5244        };
5245        let mut writer =
5246            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5247
5248        // Primary path: never fires.
5249        for _ in 0..10 {
5250            writer.tick_fork_fragment_observability();
5251        }
5252        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5253        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5254
5255        // Fork path: tag and tick. Below threshold → no fire.
5256        writer.fork_id = Some(ForkId::new());
5257        writer.tick_fork_fragment_observability();
5258        writer.tick_fork_fragment_observability();
5259        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5260        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5261
5262        // Crossing threshold → fires once.
5263        writer.tick_fork_fragment_observability();
5264        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5265        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5266
5267        // Subsequent ticks bump the gauge but do not re-fire.
5268        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5269        for _ in 0..5 {
5270            writer.tick_fork_fragment_observability();
5271        }
5272        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5273        assert_eq!(
5274            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5275            fired_after
5276        );
5277
5278        Ok(())
5279    }
5280
5281    /// The hot-path mutators must not write to any `Writer` struct field.
5282    /// Phase 2 of the refactor
5283    /// gave them `&self` receivers, which the compiler enforces against
5284    /// direct `self.x = y` assignment — but interior-mutable writes
5285    /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5286    /// every potentially-writable field, calls each hot-path mutator, and
5287    /// asserts no field changed.
5288    ///
5289    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5290    /// `tick_fork_fragment_observability`) DO mutate fields by design and
5291    /// are intentionally out of scope here.
5292    #[tokio::test]
5293    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5294        use crate::storage::manager::StorageManager;
5295        use object_store::local::LocalFileSystem;
5296        use object_store::path::Path as ObjectStorePath;
5297        use uni_common::core::schema::SchemaManager;
5298
5299        let dir = tempdir()?;
5300        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5301        let schema_path = ObjectStorePath::from("schema.json");
5302        let schema_manager =
5303            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5304        schema_manager.add_label("Person")?;
5305        schema_manager.save().await?;
5306        let storage = Arc::new(
5307            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5308        );
5309
5310        let writer =
5311            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5312                .await?;
5313
5314        /// Captures every `Writer` field that *could* be written by a
5315        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5316        /// construction field). Arc'd substructures (`l0_manager`,
5317        /// `storage`, etc.) are intentionally not checked — they are
5318        /// re-pointed only at construction.
5319        #[derive(Debug, PartialEq)]
5320        struct Snapshot {
5321            last_flush_time: std::time::Instant,
5322            cached_manifest_some: bool,
5323            fork_flush_count: u64,
5324            fork_fragment_warn_fired: bool,
5325            xervo_runtime_some: bool,
5326            index_rebuild_manager_some: bool,
5327            fork_id: Option<ForkId>,
5328        }
5329
5330        fn snap(w: &Writer) -> Snapshot {
5331            Snapshot {
5332                last_flush_time: *w.last_flush_time.lock(),
5333                cached_manifest_some: w.cached_manifest.lock().is_some(),
5334                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5335                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5336                xervo_runtime_some: w.xervo_runtime.get().is_some(),
5337                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5338                fork_id: w.fork_id,
5339            }
5340        }
5341
5342        // 1. insert_vertex_with_labels
5343        let before = snap(&writer);
5344        let vid = writer.next_vid().await?;
5345        writer
5346            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5347            .await?;
5348        assert_eq!(
5349            snap(&writer),
5350            before,
5351            "insert_vertex_with_labels mutated a Writer field"
5352        );
5353
5354        // 2. insert_vertices_batch
5355        let before = snap(&writer);
5356        let vids = writer.allocate_vids(2).await?;
5357        writer
5358            .insert_vertices_batch(
5359                vids,
5360                vec![Properties::new(), Properties::new()],
5361                vec!["Person".into()],
5362                None,
5363            )
5364            .await?;
5365        assert_eq!(
5366            snap(&writer),
5367            before,
5368            "insert_vertices_batch mutated a Writer field"
5369        );
5370
5371        // 3. delete_vertex
5372        let before = snap(&writer);
5373        writer.delete_vertex(vid, None, None).await?;
5374        assert_eq!(
5375            snap(&writer),
5376            before,
5377            "delete_vertex mutated a Writer field"
5378        );
5379
5380        // (insert_edge / delete_edge are skipped here: their fixture cost is
5381        // disproportionate to the audit's marginal value, and the same
5382        // structural argument plus the compiler-enforced `&self` covers them.)
5383
5384        Ok(())
5385    }
5386}