uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{BTreeMap, HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the write path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit.
    ///
    /// NOTE(review): the code that enforces this limit is outside the part of
    /// the file reviewed here — confirm exactly what it bounds.
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column MergeInsert path on SET-only
    /// flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched
    ///   property keys into `L0Buffer::vertex_partial_keys`, and the flush
    ///   routes those VIDs through Lance `MergeInsertBuilder` with a
    ///   subset-of-schema source. Unchanged columns — including wide ones
    ///   like embeddings — are neither read nor rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the
    ///   read-modify-write `insert_vertex_with_labels` path, preserving
    ///   bit-for-bit equivalence with prior releases.
    ///
    /// Ships as `false` for the first release; flip to `true` once telemetry
    /// on the issue #72 ingest workload confirms the win.
    ///
    /// The soundness probe lives at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// Conservative defaults: a 10 000-mutation limit and the partial-column
    /// write path switched off.
    fn default() -> Self {
        let max_mutations = 10_000;
        let partial_lance_writes = false;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
66
/// Snapshot of the parent's state at a fork point, taken atomically under
/// `flush_lock`.
///
/// Carries the allocator high-water marks plus the Lance main-branch version
/// of every existing dataset as of the moment the parent's L0 was flushed.
/// [`Writer::flush_and_capture_fork_point`] reads all of these while it still
/// holds `flush_lock`, so no concurrent commit or flush can move the
/// allocator or a dataset tip between the flush and the reads. A fork seeded
/// from these values therefore neither collides VIDs with its parent nor
/// inherits rows committed after the fork point.
#[derive(Clone, Debug, Default)]
pub struct ForkPoint {
    /// The vertex id the parent would hand out next at the fork point.
    pub vid_hwm: u64,
    /// The edge id the parent would hand out next at the fork point.
    pub eid_hwm: u64,
    /// Map of `dataset_name` to its Lance main-branch version at the fork
    /// point.
    ///
    /// Key naming matches the fork branch loop (`vertices`, `edges`,
    /// `vertices_{label}`, `deltas_{type}_{dir}`, `adjacency_{type}_{dir}`).
    /// Datasets without a `.lance` directory on disk at the fork point are
    /// simply absent from the map.
    pub dataset_versions: BTreeMap<String, u64>,
    /// MVCC version high-water mark of the parent at the fork point, i.e. the
    /// largest `_version` an inherited row can carry. The fork starts its own
    /// version counter from this floor: a fork transaction's
    /// `_version <= pin` read keeps seeing inherited (base_paths) rows, and
    /// the fork's own writes receive versions above it.
    pub version_hwm: u64,
}
96
97/// RAII latch on [`StorageManager::flush_in_progress`].
98///
99/// Sets the flag to `true` on construction (via CAS) and back to `false` on
100/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
101/// stuck. Returns `None` if a flush is already in progress, providing
102/// forward-compatible exclusion once the outer writer-RwLock is removed in
103/// Phase 4 of the concurrent-writer refactor.
104// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
105// can hold it on RotatedFlush without a writer.rs back-import cycle.
106pub use crate::storage::manager::FlushInProgressGuard;
107
108/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
109/// captured WAL LSN, current_version, and the in-progress guard whose
110/// lifetime spans the full flush (including the future async stream
111/// phase that runs on a spawned task).
112struct RotateOutput {
113 old_l0_arc: Arc<RwLock<L0Buffer>>,
114 wal_lsn: u64,
115 current_version: u64,
116 flush_in_progress_guard: FlushInProgressGuard,
117}
118
119/// Project a property map to a subset selected by `keys`. Used to
120/// run `touched_needs_full_read` against just the SET-touched keys
121/// when the caller passes a fully-merged `props` map.
122fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
123 let mut out = Properties::new();
124 for k in keys {
125 if let Some(v) = props.get(k) {
126 out.insert(k.clone(), v.clone());
127 }
128 }
129 out
130}
131
/// Build the `.lance` URI for `dataset` underneath the storage `base` URI.
///
/// Exactly one `/` separates the two parts: a separator is added only when
/// `base` does not already end with one. Mirrors the fork branch loop's
/// `join_uri`, so the versions captured by
/// [`Writer::flush_and_capture_fork_point`] are keyed by the exact same
/// datasets the fork later branches.
fn join_lance_uri(base: &str, dataset: &str) -> String {
    let separator = if base.ends_with('/') { "" } else { "/" };
    format!("{base}{separator}{dataset}.lance")
}
144
/// Inexpensive "is this dataset's `.lance` directory on disk?" probe.
///
/// Local-filesystem heuristic only: any URI containing a `://` scheme is
/// assumed to be remote and is reported as present, leaving the real check to
/// `current_version`. Everything else is tested with a plain path-existence
/// check. Mirrors the fork branch loop's `path_exists`.
fn lance_path_exists(uri: &str) -> bool {
    let looks_remote = uri.contains("://");
    looks_remote || std::path::Path::new(uri).exists()
}
156
157/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
158/// published) snapshot manifest and its id. Finalize is responsible
159/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
160/// update.
161struct FlushOutcome {
162 manifest: SnapshotManifest,
163 snapshot_id: String,
164}
165
166pub struct Writer {
167 pub l0_manager: Arc<L0Manager>,
168 pub storage: Arc<StorageManager>,
169 pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
170 pub allocator: Arc<IdAllocator>,
171 pub config: UniConfig,
172 /// Optional embedding runtime. `OnceLock` so the initializer can run
173 /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
174 /// (Phase 4 of concurrent_writer.md). Read through
175 /// [`Writer::xervo_runtime`] — the field itself is private to keep
176 /// callers oblivious to the OnceLock representation.
177 xervo_runtime: OnceLock<Arc<ModelRuntime>>,
178 /// Property manager for cache invalidation after flush
179 pub property_manager: Option<Arc<PropertyManager>>,
180 /// Adjacency manager for dual-write (edges survive flush).
181 adjacency_manager: Arc<AdjacencyManager>,
182 /// Timestamp of last flush or creation. Interior-mutable so that
183 /// `&self` callers can update it; uncontended in practice because all
184 /// writes happen inside the single-flusher critical section.
185 /// Arc-wrapped so it can travel into the SharedFlushCtx that the
186 /// async-flush coordinator passes to spawned stream/finalize tasks.
187 last_flush_time: Arc<PlMutex<std::time::Instant>>,
188 /// Background compaction task handle (prevents concurrent compaction races)
189 compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
190 /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
191 /// `OnceLock` for the same reason as `xervo_runtime`.
192 /// Wrapped in `Arc` so the async-flush finalize path can read it
193 /// from a spawned task via `SharedFlushCtx`.
194 index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
195 /// Cached snapshot manifest from the last flush. Avoids re-reading from
196 /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
197 /// `&self` access; uncontended because all access is inside the
198 /// single-flusher critical section.
199 cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
200 /// Identifier of the fork this writer serves, if any. `None` for
201 /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
202 /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
203 /// the fragment-count guard rail (Phase 2 Day 12).
204 pub fork_id: Option<ForkId>,
205 /// Number of `flush_to_l1` calls since this writer was constructed.
206 /// Used as a proxy for L1 fragment growth on the fork's branches:
207 /// each flush typically appends ~1 fragment per touched dataset, so
208 /// the count tracks the order of magnitude of fragment accumulation.
209 /// Reading the actual `Dataset::manifest().fragments.len()` per
210 /// flush would add a per-dataset object-store roundtrip on the hot
211 /// commit path; the proxy keeps the guard rail purely observational
212 /// (Phase 5 introduces fork compaction proper). Only meaningful when
213 /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
214 fork_flush_count: Arc<AtomicU64>,
215 /// Whether the fork-fragment warning has already fired at the
216 /// configured threshold. One-shot per writer lifetime. `Relaxed` is
217 /// sufficient — observational only.
218 fork_fragment_warn_fired: Arc<AtomicBool>,
219 /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
220 /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
221 /// across its WAL-append + L0-merge window. Replaces the outer
222 /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
223 /// Arc-wrapped so async-flush coordinator's finalize path can
224 /// re-acquire it from a spawned task via SharedFlushCtx.
225 flush_lock: Arc<tokio::sync::Mutex<()>>,
226 /// Coordinator for async-flush pipeline. Owns the back-pressure
227 /// semaphore, rotate-order sequence, single-finalizer task, and
228 /// pending-flush counter. Always present even when async flush is
229 /// disabled — the sync `flush_to_l1` path uses it for the future
230 /// `FlushInProgressGuard`/permit ownership model.
231 /// Coordinator is `None` when `async_flush_enabled = false`. The
232 /// coordinator's finalizer task captures `SharedFlushCtx` which
233 /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
234 /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
235 /// the holder count never drops. Constructing it only when the
236 /// feature is actually on avoids that side-effect for all
237 /// existing sync-flush paths. When async-flush graduates from
238 /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
239 /// the drain explicitly.
240 #[allow(dead_code)] // first production use lands in Commit 6/7
241 pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
242 /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
243 /// per successful commit under `flush_lock`; a transaction captures the
244 /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
245 ///
246 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
247 ///
248 /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
249 /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
250 commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
251 /// Bounded log of recently-committed write-sets for OCC conflict detection.
252 /// Read and updated only under `flush_lock`.
253 ///
254 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
255 committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
256 /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
257 /// canonical (label, key-props) bytes. A transaction holds the lock from
258 /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
259 /// on the same key (avoiding optimistic abort-retry on hot keys).
260 ///
261 /// Always allocated; populated only when `config.ssi_enabled` is `true`.
262 for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
263}
264
/// How many recent commits the OCC conflict-detection registry retains.
///
/// Sized so that running out of history — which forces a conservative abort —
/// is rare in practice. Each retained entry is only a small set of touched
/// ids.
const OCC_REGISTRY_CAPACITY: usize = 4_096;
269
270impl Writer {
271 pub async fn new(
272 storage: Arc<StorageManager>,
273 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
274 start_version: u64,
275 ) -> Result<Self> {
276 Self::new_with_config(
277 storage,
278 schema_manager,
279 start_version,
280 UniConfig::default(),
281 None,
282 None,
283 )
284 .await
285 }
286
287 pub async fn new_with_config(
288 storage: Arc<StorageManager>,
289 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
290 start_version: u64,
291 config: UniConfig,
292 wal: Option<Arc<WriteAheadLog>>,
293 allocator: Option<Arc<IdAllocator>>,
294 ) -> Result<Self> {
295 let allocator = if let Some(a) = allocator {
296 a
297 } else {
298 let store = storage.store();
299 let path = object_store::path::Path::from("id_allocator.json");
300 Arc::new(IdAllocator::new(store, path, 1000).await?)
301 };
302
303 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
304
305 let property_manager = Some(Arc::new(PropertyManager::new(
306 storage.clone(),
307 schema_manager.clone(),
308 1000,
309 )));
310
311 let adjacency_manager = storage.adjacency_manager();
312
313 // Hoist the Arc'd fields so we can both stash them on Writer and
314 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
315 // captures. Single-source-of-truth for each piece of mutable
316 // shared state.
317 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
318 let cached_manifest = Arc::new(PlMutex::new(None));
319 let fork_flush_count = Arc::new(AtomicU64::new(0));
320 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
321 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
322 let compaction_handle = Arc::new(RwLock::new(None));
323 let index_rebuild_manager: Arc<
324 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
325 > = Arc::new(OnceLock::new());
326
327 let flush_coordinator = if config.async_flush_enabled {
328 let shared = SharedFlushCtx {
329 storage: storage.clone(),
330 l0_manager: l0_manager.clone(),
331 adjacency_manager: adjacency_manager.clone(),
332 property_manager: property_manager.clone(),
333 schema_manager: schema_manager.clone(),
334 cached_manifest: cached_manifest.clone(),
335 last_flush_time: last_flush_time.clone(),
336 fork_id: None,
337 fork_flush_count: fork_flush_count.clone(),
338 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
339 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
340 flush_lock: flush_lock.clone(),
341 index_rebuild_manager: index_rebuild_manager.clone(),
342 compaction_handle: compaction_handle.clone(),
343 compaction_config: config.compaction.clone(),
344 index_rebuild_config: config.index_rebuild.clone(),
345 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
346 };
347 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
348 Some(Arc::new(FlushCoordinator::new(
349 config.max_pending_flushes,
350 shared,
351 finalize_fn,
352 )))
353 } else {
354 None
355 };
356
357 let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
358 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
359 OCC_REGISTRY_CAPACITY,
360 )));
361 let for_update_locks = Arc::new(dashmap::DashMap::new());
362
363 Ok(Self {
364 l0_manager,
365 storage,
366 schema_manager,
367 allocator,
368 config,
369 xervo_runtime: OnceLock::new(),
370 property_manager,
371 adjacency_manager,
372 last_flush_time,
373 compaction_handle,
374 index_rebuild_manager,
375 cached_manifest,
376 fork_id: None,
377 fork_flush_count,
378 fork_fragment_warn_fired,
379 flush_lock,
380 flush_coordinator,
381 commit_sequence,
382 committed_writes,
383 for_update_locks,
384 })
385 }
386
387 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
388 /// creating it on first use. The caller `.lock_owned().await`s the returned
389 /// mutex and holds the guard for the transaction's lifetime.
390 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
391 self.for_update_locks
392 .entry(key.to_vec())
393 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
394 .clone()
395 }
396
397 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
398 /// holds anymore, so the map does not grow without bound across the keyspace.
399 ///
400 /// Called when a transaction ends, **after** its guards have been dropped.
401 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
402 /// the same lock `row_lock_handle` takes to clone an entry — so the check
403 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
404 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
405 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
406 /// after we remove). Either way no two transactions ever lock different
407 /// `Mutex` instances for the same key.
408 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
409 for key in keys {
410 self.for_update_locks
411 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
412 }
413 }
414
415 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
416 /// tests that the map does not leak entries across transactions (G5).
417 pub fn for_update_lock_count(&self) -> usize {
418 self.for_update_locks.len()
419 }
420
421 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
422 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
423 /// baseline advances to lock-acquisition time (read-latest under the lock).
424 pub fn current_commit_sequence(&self) -> u64 {
425 self.commit_sequence
426 .load(crate::runtime::sync::Ordering::Relaxed)
427 }
428
429 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
430 /// Used by the async-flush stream/finalize paths to pass into spawned
431 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
432 /// with `flush_coordinator -> FinalizeFn -> Writer`).
433 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
434 SharedFlushCtx {
435 storage: self.storage.clone(),
436 l0_manager: self.l0_manager.clone(),
437 adjacency_manager: self.adjacency_manager.clone(),
438 property_manager: self.property_manager.clone(),
439 schema_manager: self.schema_manager.clone(),
440 cached_manifest: self.cached_manifest.clone(),
441 last_flush_time: self.last_flush_time.clone(),
442 fork_id: self.fork_id,
443 fork_flush_count: self.fork_flush_count.clone(),
444 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
445 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
446 flush_lock: self.flush_lock.clone(),
447 index_rebuild_manager: self.index_rebuild_manager.clone(),
448 compaction_handle: self.compaction_handle.clone(),
449 compaction_config: self.config.compaction.clone(),
450 index_rebuild_config: self.config.index_rebuild.clone(),
451 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
452 }
453 }
454
455 /// Borrow the flush coordinator if async flush is enabled.
456 /// Returns `None` when `config.async_flush_enabled = false`.
457 /// External callers (`drop_fork`) use this to drain pending streams.
458 pub fn flush_coordinator(
459 &self,
460 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
461 self.flush_coordinator.as_ref()
462 }
463
464 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
465 ///
466 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
467 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
468 pub fn set_index_rebuild_manager(
469 &self,
470 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
471 ) -> Result<()> {
472 self.index_rebuild_manager
473 .set(manager)
474 .map_err(|_| anyhow!("index_rebuild_manager already set"))
475 }
476
477 /// Replay WAL mutations into the current L0 buffer.
478 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
479 let l0 = self.l0_manager.get_current();
480 let wal = l0.read().wal.clone();
481
482 if let Some(wal) = wal {
483 wal.initialize().await?;
484 let mutations = wal.replay_since(wal_high_water_mark).await?;
485 let count = mutations.len();
486
487 if count > 0 {
488 log::info!(
489 "Replaying {} mutations from WAL (LSN > {})",
490 count,
491 wal_high_water_mark
492 );
493 let mut l0_guard = l0.write();
494 l0_guard.replay_mutations(mutations)?;
495 // Rebuild the UNIQUE constraint index over the recovered rows
496 // (Bug #9 Mechanism B). `replay_mutations` restores
497 // vertices/properties/labels but never repopulates
498 // `constraint_index` (its only other caller is the live insert
499 // path). Without this, a unique key that lives only in the WAL
500 // (committed but not yet flushed to Lance) is invisible to
501 // `check_unique_constraint_multi` after recovery and a
502 // duplicate of it could be created.
503 self.rebuild_constraint_index(&mut l0_guard);
504 }
505
506 Ok(count)
507 } else {
508 Ok(0)
509 }
510 }
511
512 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
513 ///
514 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
515 /// constraint whose target label the vertex carries and whose member
516 /// properties are all present, inserts the same constraint key the live
517 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
518 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
519 /// write lock; the schema is already loaded on the `Writer`.
520 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
521 let schema = self.schema_manager.schema();
522 // Collect entries first to avoid borrowing `vertex_properties`
523 // immutably while mutating `constraint_index` through the same guard.
524 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
525 for (&vid, props) in &l0_guard.vertex_properties {
526 if l0_guard.vertex_tombstones.contains(&vid) {
527 continue;
528 }
529 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
530 continue;
531 };
532 for label in labels {
533 for constraint in &schema.constraints {
534 if !constraint.enabled {
535 continue;
536 }
537 let ConstraintTarget::Label(l) = &constraint.target else {
538 continue;
539 };
540 if l != label {
541 continue;
542 }
543 let ConstraintType::Unique {
544 properties: unique_props,
545 } = &constraint.constraint_type
546 else {
547 continue;
548 };
549 let mut key_values = Vec::new();
550 let mut all_present = true;
551 for prop in unique_props {
552 if let Some(val) = props.get(prop) {
553 key_values.push((prop.clone(), val.clone()));
554 } else {
555 all_present = false;
556 break;
557 }
558 }
559 if all_present {
560 keys.push((serialize_constraint_key(label, &key_values), vid));
561 }
562 }
563 }
564 }
565 for (key, vid) in keys {
566 l0_guard.insert_constraint_key(key, vid);
567 }
568 }
569
570 /// Allocates the next VID (pure auto-increment).
571 pub async fn next_vid(&self) -> Result<Vid> {
572 self.allocator.allocate_vid().await
573 }
574
575 /// Allocates multiple VIDs at once for bulk operations.
576 /// This is more efficient than calling next_vid() in a loop.
577 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
578 self.allocator.allocate_vids(count).await
579 }
580
581 /// Allocates the next EID (pure auto-increment).
582 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
583 self.allocator.allocate_eid().await
584 }
585
586 /// Allocates multiple EIDs at once for bulk operations.
587 /// This is more efficient than calling next_eid() in a loop.
588 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
589 self.allocator.allocate_eids(count).await
590 }
591
592 /// Install the embedding runtime exactly once. Receiver is `&self` so it
593 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
594 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
595 self.xervo_runtime
596 .set(runtime)
597 .map_err(|_| anyhow!("xervo_runtime already set"))
598 }
599
600 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
601 self.xervo_runtime.get().cloned()
602 }
603
604 /// Create a new empty L0 buffer for transaction-scoped mutations.
605 ///
606 /// Only reads the current version — no exclusive lock required on Writer.
607 /// The returned buffer has no WAL reference; mutations are logged at
608 /// commit time via [`Self::commit_transaction_l0`].
609 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
610 let current_version = self.l0_manager.get_current().read().current_version;
611 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
612 let buf = L0Buffer::new(current_version, None);
613 // SSI: stamp the OCC read sequence at begin so commit can detect any
614 // transaction that committed since. Gated on the runtime `ssi_enabled`
615 // toggle — when off, `occ_read_set` stays `None` and every downstream
616 // read-set recording / commit validation self-gates to a no-op.
617 let buf = if self.config.ssi_enabled {
618 let mut buf = buf;
619 buf.occ_read_seq = self
620 .commit_sequence
621 .load(crate::runtime::sync::Ordering::Relaxed);
622 // The read path records observed ids here for SSI antidependency
623 // detection; commit consults it.
624 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
625 crate::runtime::l0::OccReadSet::default(),
626 )));
627 buf
628 } else {
629 buf
630 };
631 Arc::new(RwLock::new(buf))
632 }
633
634 /// Resolve the target L0 buffer for a mutation.
635 ///
636 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
637 /// When `None`, it targets the global L0 from the manager.
638 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
639 tx_l0
640 .cloned()
641 .unwrap_or_else(|| self.l0_manager.get_current())
642 }
643
644 fn update_metrics(&self) {
645 let l0 = self.l0_manager.get_current();
646 let size = l0.read().estimated_size;
647 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
648 }
649
650 /// Overlay-aware issue-#77 edge-endpoint validation.
651 ///
652 /// The current buffer alone does not hold all committed-but-unflushed
653 /// tombstones — a flush rotation moves them onto `pending_flush` until
654 /// the Lance write completes. A vertex is effectively deleted iff,
655 /// walking newest-first (tx → current → pending newest→oldest), the
656 /// first buffer that knows the vid says "tombstoned" (an insert clears
657 /// the tombstone within a buffer, so props/tombstone are mutually
658 /// exclusive per buffer).
659 ///
660 /// Must run under `flush_lock` so the overlay cannot change before the
661 /// merge, and BEFORE the durable WAL flush (see the call site).
662 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
663 let chain = self.l0_manager.get_pending_flush();
664 let current = self.l0_manager.get_current();
665 let effectively_deleted = |vid: &Vid| -> bool {
666 if tx_l0.vertex_properties.contains_key(vid) {
667 return false;
668 }
669 if tx_l0.vertex_tombstones.contains(vid) {
670 return true;
671 }
672 {
673 let cur = current.read();
674 if cur.vertex_properties.contains_key(vid) {
675 return false;
676 }
677 if cur.vertex_tombstones.contains(vid) {
678 return true;
679 }
680 }
681 for frozen in chain.iter().rev() {
682 let g = frozen.read();
683 if g.vertex_properties.contains_key(vid) {
684 return false;
685 }
686 if g.vertex_tombstones.contains(vid) {
687 return true;
688 }
689 }
690 false
691 };
692 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
693 if tx_l0.tombstones.contains_key(eid) {
694 continue; // a deletion, not an insertion — never resurrects a vertex
695 }
696 if effectively_deleted(src_vid) {
697 anyhow::bail!(
698 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
699 eid,
700 src_vid
701 );
702 }
703 if effectively_deleted(dst_vid) {
704 anyhow::bail!(
705 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
706 eid,
707 dst_vid
708 );
709 }
710 }
711 Ok(())
712 }
713
714 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
715 /// value for each CRDT property the transaction writes, so the commit
716 /// merge MERGES against the committed CRDT state instead of shadowing it
717 /// (the carve-out lets concurrent CRDT writers commit on the assumption
718 /// that the merge sees the committed value — true only while that value
719 /// lives in current, not mid-flush on `pending_flush`). No-op when
720 /// nothing is pending or the property already exists in current. Vertex
721 /// properties only, mirroring the carve-out itself.
722 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
723 let chain = self.l0_manager.get_pending_flush();
724 if chain.is_empty() {
725 return;
726 }
727 for (vid, props) in &tx_l0.vertex_properties {
728 Self::seed_crdt_props(&chain, *vid, props, main_l0);
729 }
730 }
731
732 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
733 /// Also used by the non-transactional vertex write path, which CRDT-merges
734 /// into the current buffer directly and has the same shadowing hazard
735 /// during a flush window.
736 fn seed_crdt_props(
737 chain: &[Arc<RwLock<L0Buffer>>],
738 vid: Vid,
739 props: &Properties,
740 target: &mut L0Buffer,
741 ) {
742 for (key, value) in props {
743 if crate::runtime::l0::try_as_crdt(value).is_none() {
744 continue;
745 }
746 if target
747 .vertex_properties
748 .get(&vid)
749 .is_some_and(|p| p.contains_key(key))
750 {
751 continue;
752 }
753 // Newest generation first: the first hit is the live state.
754 for frozen in chain.iter().rev() {
755 let g = frozen.read();
756 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
757 if crate::runtime::l0::try_as_crdt(v).is_some() {
758 target
759 .vertex_properties
760 .entry(vid)
761 .or_default()
762 .insert(key.clone(), v.clone());
763 }
764 break;
765 }
766 }
767 }
768 }
769
    /// Commit a transaction's private (externally-owned) L0 buffer into main L0.
    ///
    /// Writes mutations to WAL, flushes, merges into main L0, and replays
    /// edges into the AdjacencyManager.
    ///
    /// Returns `(wal_lsn, flush_pending)`. `wal_lsn` is the WAL LSN of the
    /// commit (0 when no WAL is configured). When `flush_pending == true`, the
    /// post-commit `should_flush()` predicate fired but no flush ran — the
    /// caller is expected to spawn a background `flush_to_l1`. This is the
    /// shape used when `UniConfig::async_flush_enabled` is set, so commits
    /// don't block on L1-streaming I/O.
782 pub async fn commit_transaction_l0(
783 self: &Arc<Self>,
784 tx_l0_arc: Arc<RwLock<L0Buffer>>,
785 ) -> Result<(u64, bool)> {
786 // Hold `flush_lock` across WAL append + flush + main-L0 merge.
787 // Two concurrent commits serialize here; in Phase 3 the outer
788 // `Arc<RwLock<Writer>>` already provides this exclusion, so the
789 // acquisition is uncontended. Phase 4 drops the outer lock and
790 // this becomes the load-bearing serialization point.
791 let _flush_lock_guard = self.flush_lock.lock().await;
792
793 // Crash-recovery seam: simulate process death immediately after winning
794 // the commit serialization point but before any durable work. No-op
795 // unless built with `--features failpoints`. (See ssi_resilience tests.)
796 fail::fail_point!("commit::after-flush-lock");
797
798 // SSI: optimistic conflict detection. This MUST run before any WAL
799 // write — `flush_wal()` below is the durable commit point and the WAL
800 // has no abort marker, so aborting after it would resurrect this
801 // transaction on crash recovery. The write-set is reused for
802 // registration after a successful merge.
803 // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
804 // and `occ_write_set` is `None`, so the post-merge registration below
805 // is skipped — reproducing last-writer-wins exactly.
806 let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
807 let tx_l0 = tx_l0_arc.read();
808 let read_seq = tx_l0.occ_read_seq;
809 let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
810 if !write_set.is_empty() {
811 // Telemetry: one validation per non-empty (writing) commit. The
812 // ratio of conflicts to validations is the headline abort rate.
813 metrics::counter!("uni_ssi_commit_validations_total").increment(1);
814 // Read-set is consulted only for writing transactions, so a
815 // read-only commit (empty write-set) runs at snapshot isolation.
816 let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
817 if let Some(conflict) =
818 self.committed_writes
819 .lock()
820 .check(read_seq, &write_set, read_guard.as_deref())
821 {
822 use crate::runtime::occ::Conflict;
823 match &conflict {
824 Conflict::WriteWrite { .. } => metrics::counter!(
825 "uni_ssi_serialization_conflicts_total",
826 "kind" => "write_write",
827 )
828 .increment(1),
829 Conflict::ReadWrite { .. } => metrics::counter!(
830 "uni_ssi_serialization_conflicts_total",
831 "kind" => "read_write",
832 )
833 .increment(1),
834 Conflict::HistoryTruncated { .. } => {
835 metrics::counter!("uni_ssi_history_truncated_total").increment(1)
836 }
837 }
838 return Err(anyhow::Error::new(
839 uni_common::UniError::SerializationConflict {
840 message: conflict.to_string(),
841 },
842 ));
843 }
844 }
845
846 // Validate against the committed-but-unflushed overlay under
847 // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
848 // soundness. The current buffer alone does not hold all committed
849 // state — a flush rotation moves it onto `pending_flush` until the
850 // Lance write completes (the Bug #9A window, here at the
851 // commit-time layer) — so every check walks [current, pending…].
852 {
853 let pending = self.l0_manager.get_pending_flush();
854 let main_l0 = self.l0_manager.get_current();
855 let overlay: Vec<Arc<RwLock<L0Buffer>>> =
856 std::iter::once(main_l0).chain(pending).collect();
857
858 // SSI / serializable MERGE: abort if a concurrent transaction has
859 // already committed a row with one of this transaction's unique
860 // keys. Commits serialize here, so this closes the race window
861 // left by the per-insert check. (Empty index → no iterations.)
862 for (key, vid) in &tx_l0.constraint_index {
863 if overlay
864 .iter()
865 .any(|b| b.read().has_constraint_key(key, *vid))
866 {
867 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
868 return Err(anyhow::Error::new(
869 uni_common::UniError::ConstraintConflict {
870 message: "unique key already committed by a concurrent \
871 transaction"
872 .to_string(),
873 },
874 ));
875 }
876 }
877
878 // Implicit MERGE phantom guard: a `MERGE` that *created* a node
879 // registered its (label, key-props) here even with no declared
880 // UNIQUE constraint. If a concurrent transaction already committed
881 // the same MERGE key, abort retriably so the two converge to one
882 // node on retry (the loser's MATCH then finds the committed row).
883 // Only MERGE-creates register keys, so a plain CREATE of the same
884 // properties never lands here. (Empty index → no iterations.)
885 for (key, vid) in &tx_l0.merge_guard_index {
886 if overlay
887 .iter()
888 .any(|b| b.read().has_merge_guard_key(key, *vid))
889 {
890 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
891 return Err(anyhow::Error::new(
892 uni_common::UniError::ConstraintConflict {
893 message: "MERGE key already committed by a concurrent \
894 transaction"
895 .to_string(),
896 },
897 ));
898 }
899 }
900
901 // Same race window for global ext_id uniqueness: the per-insert
902 // check ran against an older main L0; re-probe the committed
903 // index here, where commits serialize.
904 for (ext_id, vid) in &tx_l0.extid_index {
905 let taken = overlay.iter().any(|b| {
906 matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
907 });
908 if taken {
909 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
910 return Err(anyhow::Error::new(
911 uni_common::UniError::ConstraintConflict {
912 message: format!(
913 "ext_id '{ext_id}' already committed by a concurrent \
914 transaction"
915 ),
916 },
917 ));
918 }
919 }
920
921 // CRDT carve-out soundness: a pure-CRDT write was dropped from the
922 // write-set assuming its merge commutes. If the overlay holds a
923 // *different* CRDT variant for the same property, the merge would
924 // silently overwrite it — abort instead of losing the update.
925 // (Checked against every overlay buffer: conservative if an old
926 // generation held a different variant that a newer commit already
927 // replaced, but an abort+retry is always sound.)
928 for buf in &overlay {
929 if let Some(conflict) =
930 crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
931 {
932 metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
933 return Err(anyhow::Error::new(
934 uni_common::UniError::SerializationConflict {
935 message: conflict.to_string(),
936 },
937 ));
938 }
939 }
940 }
941 Some(write_set)
942 } else {
943 None
944 };
945
946 // Issue #77: an edge whose endpoint is effectively deleted makes the
947 // merge below bail. That bail MUST happen before the durable WAL flush —
948 // after it the transaction is committed-but-unmerged (a ghost commit),
949 // and WAL replay re-hits the same bail, making the database unopenable.
950 // SSI validation was deliberately placed before the flush for exactly
951 // this reason; the endpoint check belongs here too. Runs unconditionally
952 // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
953 // tombstone state cannot change between here and the merge. A
954 // tombstone may live in a flush-rotated pending buffer rather than
955 // the current buffer, so the check walks the overlay newest-first.
956 {
957 let tx_l0 = tx_l0_arc.read();
958 self.validate_edge_endpoints_overlay(&tx_l0)?;
959 }
960
961 // Crash-recovery seam: SSI validation has passed; the transaction is
962 // about to become durable. A crash here must leave NO trace (validation
963 // happens before the WAL is touched). No-op unless `failpoints`.
964 fail::fail_point!("commit::after-validate");
965
966 // 1. Write transaction mutations to WAL BEFORE merging into main L0
967 // This ensures durability before visibility.
968 {
969 let tx_l0 = tx_l0_arc.read();
970 let main_l0_arc = self.l0_manager.get_current();
971 let main_l0 = main_l0_arc.read();
972
973 // If WAL exists, write mutations to it for durability
974 if let Some(wal) = main_l0.wal.as_ref() {
975 // Order: vertices first, then edges (to ensure src/dst exist on replay)
976
977 // Vertex insertions
978 for (vid, properties) in &tx_l0.vertex_properties {
979 if !tx_l0.vertex_tombstones.contains(vid) {
980 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
981 wal.append(crate::runtime::wal::Mutation::InsertVertex {
982 vid: *vid,
983 properties: properties.clone(),
984 labels,
985 })?;
986 }
987 }
988
989 // Vertex deletions
990 for vid in &tx_l0.vertex_tombstones {
991 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
992 wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
993 }
994
995 // Label-only mutations (SET n:Label / REMOVE n:Label). After
996 // vertex inserts (so the vertex exists on replay), before edges,
997 // and skipping vertices deleted in this same commit.
998 for vid in &tx_l0.vertex_label_overwrites {
999 if tx_l0.vertex_tombstones.contains(vid) {
1000 continue;
1001 }
1002 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
1003 wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
1004 vid: *vid,
1005 labels,
1006 })?;
1007 }
1008
1009 // Crash-recovery seam: vertices appended, edges not yet. Tests
1010 // assert that a crash here (before `flush_wal`) recovers NOTHING
1011 // — the durable commit point is the flush below, not append.
1012 fail::fail_point!("commit::mid-wal");
1013
1014 // Edge insertions and deletions from edge_endpoints
1015 for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
1016 if tx_l0.tombstones.contains_key(eid) {
1017 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1018 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1019 eid: *eid,
1020 src_vid: *src_vid,
1021 dst_vid: *dst_vid,
1022 edge_type: *edge_type,
1023 version,
1024 })?;
1025 } else {
1026 let properties =
1027 tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
1028 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1029 let edge_type_name = tx_l0.edge_types.get(eid).cloned();
1030 wal.append(crate::runtime::wal::Mutation::InsertEdge {
1031 src_vid: *src_vid,
1032 dst_vid: *dst_vid,
1033 edge_type: *edge_type,
1034 eid: *eid,
1035 version,
1036 properties,
1037 edge_type_name,
1038 })?;
1039 }
1040 }
1041
1042 // Tombstones for edges that only exist in the global L0 (not in
1043 // this transaction's edge_endpoints). Without this, deletes of
1044 // pre-existing edges would be silently lost.
1045 for (eid, tombstone) in &tx_l0.tombstones {
1046 if !tx_l0.edge_endpoints.contains_key(eid) {
1047 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
1048 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
1049 eid: *eid,
1050 src_vid: tombstone.src_vid,
1051 dst_vid: tombstone.dst_vid,
1052 edge_type: tombstone.edge_type,
1053 version,
1054 })?;
1055 }
1056 }
1057 }
1058 }
1059
1060 // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1061 let wal_lsn = self.flush_wal().await?;
1062
1063 // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1064 // A crash here must RECOVER the transaction on replay (it is committed),
1065 // even though it was never made visible in-process. No-op unless `failpoints`.
1066 fail::fail_point!("commit::after-wal-flush");
1067
1068 // Component C1: if an outstanding snapshot pins the current generation,
1069 // clone it aside (lazy copy-on-write) before merging, so the pinning
1070 // transaction's reads stay isolated from this commit. No-op — and zero
1071 // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1072 // so this cannot race a flush rotate or another commit's merge; the merge
1073 // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1074 // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1075 // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1076 // always false when SSI is off and this is a zero-cost no-op.
1077 if self.l0_manager.is_current_pinned() {
1078 self.l0_manager.freeze_current_for_snapshot();
1079 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1080 }
1081
1082 // 3. Merge into main L0 and make visible
1083 {
1084 // Write-lock the tx buffer: `merge_take` moves its property maps
1085 // into main L0 instead of cloning them. The commit consumes the
1086 // transaction, so the drained maps are never observed afterwards;
1087 // everything read below (endpoints, versions, tombstones) is left
1088 // intact.
1089 let mut tx_l0 = tx_l0_arc.write();
1090 let main_l0_arc = self.l0_manager.get_current();
1091 let mut main_l0 = main_l0_arc.write();
1092 // A CRDT property's committed state may live only in a
1093 // flush-rotated pending buffer (the post-rotation current is
1094 // empty until the Lance write completes — the Bug #9A window).
1095 // `merge_crdt_properties` merges against the CURRENT buffer's
1096 // value — without seeding, the tx's CRDT state would SHADOW the
1097 // pending buffer's at read time (newest buffer wins per property)
1098 // and concurrent increments would be lost. Seed the newest
1099 // overlay value for each CRDT property the tx writes that current
1100 // lacks, so the merge below merges instead of replaces.
1101 self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1102 main_l0.merge_take(&mut tx_l0)?;
1103
1104 // Replay transaction edges into the AdjacencyManager overlay
1105 for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1106 let edge_version = tx_l0
1107 .edge_versions
1108 .get(eid)
1109 .copied()
1110 .unwrap_or(main_l0.current_version);
1111 if tx_l0.tombstones.contains_key(eid) {
1112 self.adjacency_manager
1113 .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1114 } else {
1115 self.adjacency_manager
1116 .insert_edge(*src, *dst, *eid, *etype, edge_version);
1117 }
1118 }
1119
1120 // Replay tombstones for edges that only exist in the global L0
1121 // (not in this transaction's edge_endpoints).
1122 for (eid, tombstone) in &tx_l0.tombstones {
1123 if !tx_l0.edge_endpoints.contains_key(eid) {
1124 let edge_version = tx_l0
1125 .edge_versions
1126 .get(eid)
1127 .copied()
1128 .unwrap_or(main_l0.current_version);
1129 self.adjacency_manager.add_tombstone(
1130 *eid,
1131 tombstone.src_vid,
1132 tombstone.dst_vid,
1133 tombstone.edge_type,
1134 edge_version,
1135 );
1136 }
1137 }
1138 }
1139
1140 // Crash-recovery seam: durable AND merged, but the in-memory commit
1141 // registry has not recorded this write-set yet. A crash here is
1142 // indistinguishable from one at `after-wal-flush` on reopen (the
1143 // registry is in-memory and rebuilt empty); the tx still recovers.
1144 fail::fail_point!("commit::after-merge");
1145
1146 // SSI: register this commit's write-set under a fresh commit sequence so
1147 // later transactions detect conflicts against it. Still under
1148 // `flush_lock`, before the async-flush branch can drop the guard.
1149 // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1150 if let Some(write_set) = occ_write_set
1151 && !write_set.is_empty()
1152 {
1153 // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
1154 // so production and the loom/shuttle models exercise identical logic.
1155 self.committed_writes
1156 .lock()
1157 .commit(&self.commit_sequence, write_set);
1158 }
1159
1160 self.update_metrics();
1161
1162 // 4. Best-effort post-commit auto-flush.
1163 //
1164 // Two paths:
1165 // - async_flush_enabled = false (default): inline under our
1166 // existing flush_lock guard via flush_inline_under_lock.
1167 // - async_flush_enabled = true: rotate inline, drop flush_lock,
1168 // then submit the stream phase to the coordinator. Gated on
1169 // `pending_flush_count() < max_pending_flushes` so we don't
1170 // stack up rotations beyond the configured pipeline depth.
1171 // `try_acquire_permit` is non-blocking: if we lose the race
1172 // for the last permit, we just skip this trigger (the next
1173 // commit retries).
1174 let mut flush_pending = false;
1175 if self.should_flush() {
1176 if self.config.async_flush_enabled
1177 && let Some(coord) = self.flush_coordinator.as_ref()
1178 && coord.pending_flush_count() < self.config.max_pending_flushes
1179 {
1180 match coord.try_acquire_permit() {
1181 Some(permit) => {
1182 match self.flush_l0_rotate().await {
1183 Ok(rotate_out) => {
1184 // Allocate the rotate seq and bump pending ONLY
1185 // after the rotate succeeds (Bug #3). A failed
1186 // rotate must consume neither: the finalizer
1187 // advances strictly in consecutive seq order and
1188 // only decrements pending on finalize, so a
1189 // leaked seq/pending from a failed rotate would
1190 // wedge the finalizer forever and climb pending
1191 // toward `max_pending_flushes`. The seq is still
1192 // allocated under `flush_lock` (immediately after
1193 // the rotate, before the guard drops below), so
1194 // concurrent rotates keep seq order == rotation
1195 // order, and the seq is not used until submit.
1196 let seq = coord.next_rotate_seq();
1197 coord.note_pending();
1198 // Release flush_lock BEFORE the spawn so concurrent
1199 // commits can proceed while the stream runs.
1200 drop(_flush_lock_guard);
1201 let parent_manifest = self.cached_manifest.lock().clone();
1202 let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1203 seq,
1204 old_l0_arc: rotate_out.old_l0_arc.clone(),
1205 wal_lsn: rotate_out.wal_lsn,
1206 current_version: rotate_out.current_version,
1207 name: None,
1208 parent_manifest,
1209 permit,
1210 flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1211 };
1212 let writer = self.clone();
1213 let _ticket = coord.submit_for_stream(
1214 rotated,
1215 move |old_l0, wal, ver, n| async move {
1216 let outcome =
1217 writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1218 Ok(crate::runtime::flush_coordinator::FlushOutcome {
1219 new_manifest: outcome.manifest,
1220 snapshot_id: outcome.snapshot_id,
1221 })
1222 },
1223 );
1224 flush_pending = true;
1225 // Early return — flush_lock already dropped.
1226 return Ok((wal_lsn, flush_pending));
1227 }
1228 Err(e) => {
1229 tracing::warn!("Async rotate failed (non-critical): {}", e);
1230 // No seq was allocated and pending was not
1231 // bumped (both moved into the Ok arm for Bug
1232 // #3), so the finalizer is not wedged. The
1233 // permit drops here, freeing the slot.
1234 }
1235 }
1236 }
1237 None => {
1238 // Race: someone else grabbed the last permit. Skip;
1239 // next commit will retry should_flush().
1240 metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1241 }
1242 }
1243 } else if let Err(e) = self.flush_inline_under_lock(None).await {
1244 tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1245 }
1246 }
1247
1248 Ok((wal_lsn, flush_pending))
1249 }
1250
1251 /// Flush the WAL buffer to durable storage.
1252 ///
1253 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1254 pub async fn flush_wal(&self) -> Result<u64> {
1255 let l0 = self.l0_manager.get_current();
1256 let wal = l0.read().wal.clone();
1257
1258 match wal {
1259 Some(wal) => Ok(wal.flush().await?),
1260 None => Ok(0),
1261 }
1262 }
1263
1264 /// Record property removals in the active L0 mutation stats.
1265 ///
1266 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1267 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1268 if count == 0 {
1269 return;
1270 }
1271 let l0 = self.resolve_l0(tx_l0);
1272 l0.write().mutation_stats.properties_removed += count;
1273 }
1274
1275 /// Validates vertex constraints for the given properties.
1276 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1277 async fn validate_vertex_constraints_for_label(
1278 &self,
1279 vid: Vid,
1280 properties: &Properties,
1281 label: &str,
1282 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1283 ) -> Result<()> {
1284 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1285 .await
1286 }
1287
1288 /// Partial-update sibling: validates only constraints touching keys
1289 /// present in `properties` (the touched set). NOT NULL is checked
1290 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1291 /// skipped when any referenced key is absent (the caller is
1292 /// expected to have routed to the full-row path in that case via
1293 /// `touched_needs_full_read`).
1294 async fn validate_vertex_constraints_for_label_partial(
1295 &self,
1296 vid: Vid,
1297 properties: &Properties,
1298 label: &str,
1299 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1300 ) -> Result<()> {
1301 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1302 .await
1303 }
1304
1305 async fn validate_vertex_constraints_for_label_impl(
1306 &self,
1307 vid: Vid,
1308 properties: &Properties,
1309 label: &str,
1310 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1311 partial: bool,
1312 ) -> Result<()> {
1313 let schema = self.schema_manager.schema();
1314
1315 {
1316 // 1. Check NOT NULL constraints (from Property definitions).
1317 // Under partial-update mode, skip properties NOT in
1318 // `properties` — they retain their previous (already-
1319 // validated) value.
1320 if let Some(props_meta) = schema.properties.get(label) {
1321 for (prop_name, meta) in props_meta {
1322 if !meta.nullable {
1323 let present = properties.get(prop_name);
1324 if partial && present.is_none() {
1325 continue;
1326 }
1327 if present.is_none_or(|v| v.is_null()) {
1328 log::warn!(
1329 "Constraint violation: Property '{}' cannot be null for label '{}'",
1330 prop_name,
1331 label
1332 );
1333 return Err(anyhow!(
1334 "Constraint violation: Property '{}' cannot be null",
1335 prop_name
1336 ));
1337 }
1338 }
1339 }
1340 }
1341
1342 // 2. Check Explicit Constraints (Unique, Check, etc.)
1343 for constraint in &schema.constraints {
1344 if !constraint.enabled {
1345 continue;
1346 }
1347 match &constraint.target {
1348 ConstraintTarget::Label(l) if l == label => {}
1349 _ => continue,
1350 }
1351
1352 match &constraint.constraint_type {
1353 ConstraintType::Unique {
1354 properties: unique_props,
1355 } => {
1356 // Support single and multi-property unique constraints
1357 if !unique_props.is_empty() {
1358 let mut key_values = Vec::new();
1359 let mut missing = false;
1360 for prop in unique_props {
1361 if let Some(val) = properties.get(prop) {
1362 key_values.push((prop.clone(), val.clone()));
1363 } else {
1364 missing = true; // Can't enforce if property missing (partial update?)
1365 // For INSERT, missing means null?
1366 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1367 // For now, only check if ALL keys are present
1368 }
1369 }
1370
1371 if !missing {
1372 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1373 .await?;
1374 }
1375 }
1376 }
1377 ConstraintType::Exists { property } => {
1378 if properties.get(property).is_none_or(|v| v.is_null()) {
1379 log::warn!(
1380 "Constraint violation: Property '{}' must exist for label '{}'",
1381 property,
1382 label
1383 );
1384 return Err(anyhow!(
1385 "Constraint violation: Property '{}' must exist",
1386 property
1387 ));
1388 }
1389 }
1390 ConstraintType::Check { expression } => {
1391 if !self.evaluate_check_constraint(expression, properties)? {
1392 return Err(anyhow!(
1393 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1394 constraint.name,
1395 expression
1396 ));
1397 }
1398 }
1399 _ => {
1400 return Err(anyhow!("Unsupported constraint type"));
1401 }
1402 }
1403 }
1404 }
1405 Ok(())
1406 }
1407
1408 /// Validates vertex constraints for a vertex with the given labels.
1409 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1410 /// Unknown labels (not in schema) are skipped.
1411 async fn validate_vertex_constraints(
1412 &self,
1413 vid: Vid,
1414 properties: &Properties,
1415 labels: &[String],
1416 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1417 ) -> Result<()> {
1418 let schema = self.schema_manager.schema();
1419
1420 // Validate constraints only for known labels
1421 for label in labels {
1422 // Skip unknown labels (schemaless support)
1423 if schema.get_label_case_insensitive(label).is_none() {
1424 continue;
1425 }
1426 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1427 .await?;
1428 }
1429
1430 // Check global ext_id uniqueness if ext_id is provided
1431 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1432 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1433 }
1434
1435 Ok(())
1436 }
1437
1438 /// Partial sibling of `validate_vertex_constraints` — validates only
1439 /// constraints touching keys present in `properties`. Used by
1440 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1441 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1442 async fn validate_vertex_constraints_partial(
1443 &self,
1444 vid: Vid,
1445 touched: &Properties,
1446 labels: &[String],
1447 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1448 ) -> Result<()> {
1449 let schema = self.schema_manager.schema();
1450 for label in labels {
1451 if schema.get_label_case_insensitive(label).is_none() {
1452 continue;
1453 }
1454 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1455 .await?;
1456 }
1457 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1458 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1459 }
1460 Ok(())
1461 }
1462
1463 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1464 ///
1465 /// Used to build a constraint key index from L0 buffers for batch validation.
1466 fn collect_constraint_keys_from_properties<'a>(
1467 properties_iter: impl Iterator<Item = &'a Properties>,
1468 label: &str,
1469 constraints: &[uni_common::core::schema::Constraint],
1470 existing_keys: &mut HashMap<String, HashSet<String>>,
1471 existing_extids: &mut HashSet<String>,
1472 ) {
1473 for props in properties_iter {
1474 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1475 existing_extids.insert(ext_id.to_string());
1476 }
1477
1478 for constraint in constraints {
1479 if !constraint.enabled {
1480 continue;
1481 }
1482 if let ConstraintTarget::Label(l) = &constraint.target {
1483 if l != label {
1484 continue;
1485 }
1486 } else {
1487 continue;
1488 }
1489
1490 if let ConstraintType::Unique {
1491 properties: unique_props,
1492 } = &constraint.constraint_type
1493 {
1494 let mut key_parts = Vec::new();
1495 let mut all_present = true;
1496 for prop in unique_props {
1497 if let Some(val) = props.get(prop) {
1498 key_parts.push(format!("{}:{}", prop, val));
1499 } else {
1500 all_present = false;
1501 break;
1502 }
1503 }
1504 if all_present {
1505 let key = key_parts.join("|");
1506 existing_keys
1507 .entry(constraint.name.clone())
1508 .or_default()
1509 .insert(key);
1510 }
1511 }
1512 }
1513 }
1514 }
1515
1516 /// Validates constraints for a batch of vertices efficiently.
1517 ///
1518 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1519 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1520 ///
1521 /// # Arguments
1522 /// * `vids` - VIDs of vertices being inserted
1523 /// * `properties_batch` - Properties for each vertex
1524 /// * `label` - Label for all vertices (assumes single label for now)
1525 ///
1526 /// # Performance
1527 /// For N vertices with unique constraints:
1528 /// - Old approach: O(N²) - scan L0 buffer N times
1529 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1530 async fn validate_vertex_batch_constraints(
1531 &self,
1532 vids: &[Vid],
1533 properties_batch: &[Properties],
1534 label: &str,
1535 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1536 ) -> Result<()> {
1537 if vids.len() != properties_batch.len() {
1538 return Err(anyhow!("VID/properties length mismatch"));
1539 }
1540
1541 let schema = self.schema_manager.schema();
1542
1543 // 1. Validate NOT NULL constraints for each vertex
1544 if let Some(props_meta) = schema.properties.get(label) {
1545 for (idx, properties) in properties_batch.iter().enumerate() {
1546 for (prop_name, meta) in props_meta {
1547 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1548 return Err(anyhow!(
1549 "Constraint violation at index {}: Property '{}' cannot be null",
1550 idx,
1551 prop_name
1552 ));
1553 }
1554 }
1555 }
1556 }
1557
1558 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1559 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1560 let mut existing_extids: HashSet<String> = HashSet::new();
1561
1562 // Scan current L0 buffer
1563 {
1564 let l0 = self.l0_manager.get_current();
1565 let l0_guard = l0.read();
1566 Self::collect_constraint_keys_from_properties(
1567 l0_guard.vertex_properties.values(),
1568 label,
1569 &schema.constraints,
1570 &mut existing_keys,
1571 &mut existing_extids,
1572 );
1573 }
1574
1575 // Scan transaction L0 if present
1576 if let Some(tx_l0) = tx_l0 {
1577 let tx_l0_guard = tx_l0.read();
1578 Self::collect_constraint_keys_from_properties(
1579 tx_l0_guard.vertex_properties.values(),
1580 label,
1581 &schema.constraints,
1582 &mut existing_keys,
1583 &mut existing_extids,
1584 );
1585 }
1586
1587 // 3. Check batch vertices against index AND check for duplicates within batch
1588 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1589 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1590
1591 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1592 // Check ext_id uniqueness
1593 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1594 if existing_extids.contains(ext_id) {
1595 return Err(anyhow!(
1596 "Constraint violation at index {}: ext_id '{}' already exists",
1597 idx,
1598 ext_id
1599 ));
1600 }
1601 if let Some(first_idx) = batch_extids.get(ext_id) {
1602 return Err(anyhow!(
1603 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1604 ext_id,
1605 first_idx,
1606 idx
1607 ));
1608 }
1609 // Also check the main vertices table — the L0 scans above
1610 // miss vertices already flushed to L1, so without this a
1611 // batch insert (e.g. a fork promote onto primary) silently
1612 // twins a duplicate ext_id instead of erroring. Mirrors the
1613 // single-vertex `check_extid_globally_unique`.
1614 if let Ok(Some(found_vid)) =
1615 MainVertexDataset::find_by_ext_id(self.storage.backend(), ext_id, None).await
1616 {
1617 return Err(anyhow!(
1618 "Constraint violation at index {}: ext_id '{}' already exists (vertex {:?})",
1619 idx,
1620 ext_id,
1621 found_vid
1622 ));
1623 }
1624 batch_extids.insert(ext_id.to_string(), idx);
1625 }
1626
1627 // Check unique constraints
1628 for constraint in &schema.constraints {
1629 if !constraint.enabled {
1630 continue;
1631 }
1632 if let ConstraintTarget::Label(l) = &constraint.target {
1633 if l != label {
1634 continue;
1635 }
1636 } else {
1637 continue;
1638 }
1639
1640 match &constraint.constraint_type {
1641 ConstraintType::Unique {
1642 properties: unique_props,
1643 } => {
1644 let mut key_parts = Vec::new();
1645 let mut all_present = true;
1646 for prop in unique_props {
1647 if let Some(val) = properties.get(prop) {
1648 key_parts.push(format!("{}:{}", prop, val));
1649 } else {
1650 all_present = false;
1651 break;
1652 }
1653 }
1654
1655 if all_present {
1656 let key = key_parts.join("|");
1657
1658 // Check against existing L0 keys
1659 if let Some(keys) = existing_keys.get(&constraint.name)
1660 && keys.contains(&key)
1661 {
1662 return Err(anyhow!(
1663 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1664 idx,
1665 label,
1666 constraint.name
1667 ));
1668 }
1669
1670 // Check for duplicates within batch
1671 let batch_constraint_keys =
1672 batch_keys.entry(constraint.name.clone()).or_default();
1673 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1674 return Err(anyhow!(
1675 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1676 key,
1677 first_idx,
1678 idx
1679 ));
1680 }
1681 batch_constraint_keys.insert(key, idx);
1682 }
1683 }
1684 ConstraintType::Exists { property }
1685 if properties.get(property).is_none_or(|v| v.is_null()) =>
1686 {
1687 return Err(anyhow!(
1688 "Constraint violation at index {}: Property '{}' must exist",
1689 idx,
1690 property
1691 ));
1692 }
1693 ConstraintType::Check { expression }
1694 if !self.evaluate_check_constraint(expression, properties)? =>
1695 {
1696 return Err(anyhow!(
1697 "Constraint violation at index {}: CHECK constraint '{}' violated",
1698 idx,
1699 constraint.name
1700 ));
1701 }
1702 _ => {}
1703 }
1704 }
1705 }
1706
1707 // 4. Check storage for unique constraints (can batch this into a single query)
1708 for constraint in &schema.constraints {
1709 if !constraint.enabled {
1710 continue;
1711 }
1712 if let ConstraintTarget::Label(l) = &constraint.target {
1713 if l != label {
1714 continue;
1715 }
1716 } else {
1717 continue;
1718 }
1719
1720 if let ConstraintType::Unique {
1721 properties: unique_props,
1722 } = &constraint.constraint_type
1723 {
1724 // Build compound OR filter for all batch vertices
1725 let mut or_filters = Vec::new();
1726 for properties in properties_batch.iter() {
1727 let mut and_parts = Vec::new();
1728 let mut all_present = true;
1729 for prop in unique_props {
1730 if let Some(val) = properties.get(prop) {
1731 let val_str = match val {
1732 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1733 Value::Int(n) => n.to_string(),
1734 Value::Float(f) => f.to_string(),
1735 Value::Bool(b) => b.to_string(),
1736 _ => {
1737 all_present = false;
1738 break;
1739 }
1740 };
1741 and_parts.push(format!("{} = {}", prop, val_str));
1742 } else {
1743 all_present = false;
1744 break;
1745 }
1746 }
1747 if all_present {
1748 or_filters.push(format!("({})", and_parts.join(" AND ")));
1749 }
1750 }
1751
1752 #[cfg(feature = "lance-backend")]
1753 if !or_filters.is_empty() {
1754 let vid_list: Vec<String> =
1755 vids.iter().map(|v| v.as_u64().to_string()).collect();
1756 let filter = format!(
1757 "({}) AND _deleted = false AND _vid NOT IN ({})",
1758 or_filters.join(" OR "),
1759 vid_list.join(", ")
1760 );
1761
1762 if let Ok(ds) = self.storage.vertex_dataset(label)
1763 && let Ok(lance_ds) = ds.open_raw().await
1764 {
1765 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1766 if count > 0 {
1767 return Err(anyhow!(
1768 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1769 label,
1770 constraint.name
1771 ));
1772 }
1773 }
1774 }
1775 }
1776 }
1777
1778 Ok(())
1779 }
1780
1781 /// Checks that ext_id is globally unique across all vertices.
1782 ///
1783 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1784 /// to ensure no other vertex uses this ext_id.
1785 ///
1786 /// # Errors
1787 ///
1788 /// Returns error if another vertex with the same ext_id exists.
1789 async fn check_extid_globally_unique(
1790 &self,
1791 ext_id: &str,
1792 current_vid: Vid,
1793 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1794 ) -> Result<()> {
1795 // Check L0 buffers: current, transaction, and pending flush
1796 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1797 let mut buffers = vec![self.l0_manager.get_current()];
1798 if let Some(tx_l0) = tx_l0 {
1799 buffers.push(tx_l0.clone());
1800 }
1801 buffers.extend(self.l0_manager.get_pending_flush());
1802 buffers
1803 };
1804
1805 for l0 in &l0_buffers_to_check {
1806 // O(1) per buffer via the maintained `extid_index` (the previous
1807 // full `vertex_properties` scan made constrained ingest O(n²)).
1808 if let Some(&vid) = l0.read().extid_index.get(ext_id)
1809 && vid != current_vid
1810 {
1811 return Err(anyhow!(
1812 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1813 ext_id,
1814 vid
1815 ));
1816 }
1817 }
1818
1819 // Check main vertices table (if it exists)
1820 // Pass None for global uniqueness check (not snapshot-isolated)
1821 let backend = self.storage.backend();
1822 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1823 && found_vid != current_vid
1824 {
1825 return Err(anyhow!(
1826 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1827 ext_id,
1828 found_vid
1829 ));
1830 }
1831
1832 Ok(())
1833 }
1834
1835 /// Helper to get vertex labels from L0 buffer.
1836 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1837 let l0 = self.l0_manager.get_current();
1838 let l0_guard = l0.read();
1839 // Check if vertex is tombstoned (deleted) - if so, return None
1840 if l0_guard.vertex_tombstones.contains(&vid) {
1841 return None;
1842 }
1843 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1844 }
1845
1846 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1847 /// This is the proper way to read vertex labels after a flush, as it checks both
1848 /// in-memory buffers and persisted storage.
1849 pub async fn get_vertex_labels(
1850 &self,
1851 vid: Vid,
1852 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1853 ) -> Option<Vec<String>> {
1854 // 1. Check current L0
1855 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1856 return Some(labels);
1857 }
1858
1859 // 2. Check transaction L0 if present
1860 if let Some(tx_l0) = tx_l0 {
1861 let guard = tx_l0.read();
1862 if guard.vertex_tombstones.contains(&vid) {
1863 return None;
1864 }
1865 if let Some(labels) = guard.get_vertex_labels(vid) {
1866 return Some(labels.to_vec());
1867 }
1868 }
1869
1870 // 3. Check pending flush L0s
1871 for pending_l0 in self.l0_manager.get_pending_flush() {
1872 let guard = pending_l0.read();
1873 if guard.vertex_tombstones.contains(&vid) {
1874 return None;
1875 }
1876 if let Some(labels) = guard.get_vertex_labels(vid) {
1877 return Some(labels.to_vec());
1878 }
1879 }
1880
1881 // 4. Check storage
1882 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1883 }
1884
1885 /// Helper to get edge type from L0 buffer.
1886 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1887 let l0 = self.l0_manager.get_current();
1888 let l0_guard = l0.read();
1889 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1890 }
1891
1892 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1893 /// Falls back to the transaction L0 if available.
1894 pub fn get_edge_type_id_from_l0(
1895 &self,
1896 eid: Eid,
1897 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1898 ) -> Option<u32> {
1899 // Check transaction L0 first
1900 if let Some(tx_l0) = tx_l0 {
1901 let guard = tx_l0.read();
1902 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1903 return Some(etype);
1904 }
1905 }
1906 // Fall back to main L0
1907 let l0 = self.l0_manager.get_current();
1908 let l0_guard = l0.read();
1909 l0_guard
1910 .get_edge_endpoint_full(eid)
1911 .map(|(_, _, etype)| etype)
1912 }
1913
1914 /// Set the type name for an edge (used for schemaless edge types).
1915 /// This is called during CREATE for edge types not found in the schema.
1916 pub fn set_edge_type(
1917 &self,
1918 eid: Eid,
1919 type_name: String,
1920 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1921 ) {
1922 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1923 }
1924
1925 /// Evaluate a simple CHECK constraint expression.
1926 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1927 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1928 let parts: Vec<&str> = expression.split_whitespace().collect();
1929 if parts.len() != 3 {
1930 // For now, only support "prop op val"
1931 // Fallback to true if too complex to avoid breaking, but warn
1932 log::warn!(
1933 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1934 expression
1935 );
1936 return Ok(true);
1937 }
1938
1939 let prop_part = parts[0].trim_start_matches('(');
1940 // Handle "variable.property" format - take the part after the dot
1941 let prop_name = if let Some(idx) = prop_part.find('.') {
1942 &prop_part[idx + 1..]
1943 } else {
1944 prop_part
1945 };
1946
1947 let op = parts[1];
1948 let val_str = parts[2].trim_end_matches(')');
1949
1950 let prop_val = match properties.get(prop_name) {
1951 Some(v) => v,
1952 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1953 };
1954
1955 // Parse value string (handle quotes for strings)
1956 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1957 || (val_str.starts_with('"') && val_str.ends_with('"'))
1958 {
1959 Value::String(val_str[1..val_str.len() - 1].to_string())
1960 } else if let Ok(n) = val_str.parse::<i64>() {
1961 Value::Int(n)
1962 } else if let Ok(n) = val_str.parse::<f64>() {
1963 Value::Float(n)
1964 } else if let Ok(b) = val_str.parse::<bool>() {
1965 Value::Bool(b)
1966 } else {
1967 // Check for internal format wrappers if they somehow leaked through
1968 if val_str.starts_with("Number(") && val_str.ends_with(')') {
1969 let n_str = &val_str[7..val_str.len() - 1];
1970 if let Ok(n) = n_str.parse::<i64>() {
1971 Value::Int(n)
1972 } else if let Ok(n) = n_str.parse::<f64>() {
1973 Value::Float(n)
1974 } else {
1975 Value::String(val_str.to_string())
1976 }
1977 } else {
1978 Value::String(val_str.to_string())
1979 }
1980 };
1981
1982 match op {
1983 "=" | "==" => Ok(prop_val == &target_val),
1984 "!=" | "<>" => Ok(prop_val != &target_val),
1985 ">" => self
1986 .compare_values(prop_val, &target_val)
1987 .map(|o| o.is_gt()),
1988 "<" => self
1989 .compare_values(prop_val, &target_val)
1990 .map(|o| o.is_lt()),
1991 ">=" => self
1992 .compare_values(prop_val, &target_val)
1993 .map(|o| o.is_ge()),
1994 "<=" => self
1995 .compare_values(prop_val, &target_val)
1996 .map(|o| o.is_le()),
1997 _ => {
1998 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1999 Ok(true)
2000 }
2001 }
2002 }
2003
2004 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
2005 use std::cmp::Ordering;
2006
2007 fn cmp_f64(x: f64, y: f64) -> Ordering {
2008 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
2009 }
2010
2011 match (a, b) {
2012 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
2013 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
2014 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
2015 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
2016 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
2017 _ => Err(anyhow!(
2018 "Cannot compare incompatible types: {:?} vs {:?}",
2019 a,
2020 b
2021 )),
2022 }
2023 }
2024
2025 async fn check_unique_constraint_multi(
2026 &self,
2027 label: &str,
2028 key_values: &[(String, Value)],
2029 current_vid: Vid,
2030 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2031 ) -> Result<()> {
2032 // Serialize constraint key once for O(1) lookups
2033 let key = serialize_constraint_key(label, key_values);
2034
2035 // 1. Check L0 (in-memory) using O(1) constraint index
2036 {
2037 let l0 = self.l0_manager.get_current();
2038 let l0_guard = l0.read();
2039 if l0_guard.has_constraint_key(&key, current_vid) {
2040 return Err(anyhow!(
2041 "Constraint violation: Duplicate composite key for label '{}'",
2042 label
2043 ));
2044 }
2045 }
2046
2047 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
2048 // buffer onto `pending_flush` and installs a fresh empty current
2049 // buffer; until the rotated rows reach Lance the key is invisible to
2050 // both the current-buffer check above and the storage check below, so
2051 // a duplicate could slip through that flush window. Mirror the read
2052 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
2053 // already consult `pending_flush`.
2054 for pending_l0 in self.l0_manager.get_pending_flush() {
2055 if pending_l0.read().has_constraint_key(&key, current_vid) {
2056 return Err(anyhow!(
2057 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
2058 label
2059 ));
2060 }
2061 }
2062
2063 // Check Transaction L0
2064 if let Some(tx_l0) = tx_l0 {
2065 let tx_l0_guard = tx_l0.read();
2066 if tx_l0_guard.has_constraint_key(&key, current_vid) {
2067 return Err(anyhow!(
2068 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
2069 label
2070 ));
2071 }
2072 }
2073
2074 // 2. Check Storage (L1/L2)
2075 let filters: Vec<String> = key_values
2076 .iter()
2077 .map(|(prop, val)| {
2078 let val_str = match val {
2079 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2080 Value::Int(n) => n.to_string(),
2081 Value::Float(f) => f.to_string(),
2082 Value::Bool(b) => b.to_string(),
2083 _ => "NULL".to_string(),
2084 };
2085 format!("{} = {}", prop, val_str)
2086 })
2087 .collect();
2088
2089 let mut filter = filters.join(" AND ");
2090 filter.push_str(&format!(
2091 " AND _deleted = false AND _vid != {}",
2092 current_vid.as_u64()
2093 ));
2094
2095 #[cfg(feature = "lance-backend")]
2096 if let Ok(ds) = self.storage.vertex_dataset(label)
2097 && let Ok(lance_ds) = ds.open_raw().await
2098 {
2099 let count = lance_ds.count_rows(Some(filter.clone())).await?;
2100 if count > 0 {
2101 return Err(anyhow!(
2102 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2103 label,
2104 filter
2105 ));
2106 }
2107 }
2108
2109 Ok(())
2110 }
2111
2112 async fn check_write_pressure(&self) -> Result<()> {
2113 let status = self
2114 .storage
2115 .compaction_status()
2116 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2117 let l1_runs = status.l1_runs;
2118 let throttle = &self.config.throttle;
2119
2120 if l1_runs >= throttle.hard_limit {
2121 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2122 // Simple polling for now
2123 while self
2124 .storage
2125 .compaction_status()
2126 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2127 .l1_runs
2128 >= throttle.hard_limit
2129 {
2130 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2131 }
2132 } else if l1_runs >= throttle.soft_limit {
2133 let excess = l1_runs - throttle.soft_limit;
2134 // Cap multiplier to avoid overflow
2135 let excess = std::cmp::min(excess, 31);
2136 let multiplier = 2_u32.pow(excess as u32);
2137 let delay = throttle.base_delay * multiplier;
2138 tokio::time::sleep(delay).await;
2139 }
2140 Ok(())
2141 }
2142
2143 /// Check transaction memory limit to prevent OOM.
2144 /// No-op when no transaction is active.
2145 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2146 if let Some(tx_l0) = tx_l0 {
2147 let size = tx_l0.read().estimated_size;
2148 if size > self.config.max_transaction_memory {
2149 return Err(anyhow!(
2150 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2151 Roll back or commit the current transaction.",
2152 size,
2153 self.config.max_transaction_memory
2154 ));
2155 }
2156 }
2157 Ok(())
2158 }
2159
2160 async fn get_query_context(
2161 &self,
2162 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2163 ) -> Option<QueryContext> {
2164 Some(QueryContext::new_with_pending(
2165 self.l0_manager.get_current(),
2166 tx_l0.cloned(),
2167 self.l0_manager.get_pending_flush(),
2168 ))
2169 }
2170
2171 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2172 /// write paths.
2173 ///
2174 /// Rejects a declared CRDT property written as a parsed CRDT value
2175 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2176 /// A mismatch would make the commit-time merge silently overwrite instead of
2177 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2178 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2179 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2180 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2181 /// are never carved out and stay conflictable.
2182 fn enforce_crdt_variants(
2183 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2184 properties: &Properties,
2185 ) -> Result<()> {
2186 for (key, value) in properties {
2187 let Some(meta) = props_meta.get(key) else {
2188 continue;
2189 };
2190 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2191 continue;
2192 };
2193 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2194 && crdt.type_name() != expected.type_name()
2195 {
2196 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2197 message: format!(
2198 "CRDT property '{key}' must be written as a {} value",
2199 expected.type_name()
2200 ),
2201 }));
2202 }
2203 }
2204 Ok(())
2205 }
2206
2207 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2208 ///
2209 /// When `label` is provided, uses it directly to look up property metadata.
2210 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2211 ///
2212 /// # Errors
2213 ///
2214 /// Returns an error if CRDT property merging fails.
2215 async fn prepare_vertex_upsert(
2216 &self,
2217 vid: Vid,
2218 properties: &mut Properties,
2219 label: Option<&str>,
2220 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2221 ) -> Result<()> {
2222 let Some(pm) = &self.property_manager else {
2223 return Ok(());
2224 };
2225
2226 let schema = self.schema_manager.schema();
2227
2228 // Resolve label: use provided label or discover from L0/storage
2229 let discovered_labels;
2230 let label_name = if let Some(l) = label {
2231 Some(l)
2232 } else {
2233 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2234 discovered_labels
2235 .as_ref()
2236 .and_then(|l| l.first().map(|s| s.as_str()))
2237 };
2238
2239 let Some(label_str) = label_name else {
2240 return Ok(());
2241 };
2242 let Some(props_meta) = schema.properties.get(label_str) else {
2243 return Ok(());
2244 };
2245
2246 // Identify CRDT properties in the insert data
2247 let crdt_keys: Vec<String> = properties
2248 .keys()
2249 .filter(|key| {
2250 props_meta.get(*key).is_some_and(|meta| {
2251 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2252 })
2253 })
2254 .cloned()
2255 .collect();
2256
2257 if crdt_keys.is_empty() {
2258 return Ok(());
2259 }
2260
2261 // Enforce that each declared CRDT property written as a parsed CRDT value
2262 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2263 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2264 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2265 // would hide that as a silent lost update — reject it at the source.
2266 //
2267 // Only the `Map` form is checked: it is exactly the form the carve-out
2268 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2269 // string (the Cypher form) or a non-CRDT value is never carved out — it
2270 // stays conflictable — so it poses no carve-out soundness risk and is left
2271 // to the existing merge/parse path. This is the declared-property half of
2272 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2273 Self::enforce_crdt_variants(props_meta, properties)?;
2274
2275 let ctx = self.get_query_context(tx_l0).await;
2276 for key in crdt_keys {
2277 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2278 if !existing.is_null()
2279 && let Some(val) = properties.get_mut(&key)
2280 {
2281 *val = pm.merge_crdt_values(&existing, val)?;
2282 }
2283 }
2284
2285 Ok(())
2286 }
2287
2288 async fn prepare_edge_upsert(
2289 &self,
2290 eid: Eid,
2291 properties: &mut Properties,
2292 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2293 ) -> Result<()> {
2294 if let Some(pm) = &self.property_manager {
2295 let schema = self.schema_manager.schema();
2296 // Get edge type from L0 buffer instead of from EID
2297 let type_name = self.get_edge_type_from_l0(eid);
2298
2299 if let Some(ref t_name) = type_name
2300 && let Some(props_meta) = schema.properties.get(t_name)
2301 {
2302 let mut crdt_keys = Vec::new();
2303 for (key, _) in properties.iter() {
2304 if let Some(meta) = props_meta.get(key)
2305 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2306 {
2307 crdt_keys.push(key.clone());
2308 }
2309 }
2310
2311 if !crdt_keys.is_empty() {
2312 let ctx = self.get_query_context(tx_l0).await;
2313 for key in crdt_keys {
2314 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2315
2316 if !existing.is_null()
2317 && let Some(val) = properties.get_mut(&key)
2318 {
2319 *val = pm.merge_crdt_values(&existing, val)?;
2320 }
2321 }
2322 }
2323 }
2324 }
2325 Ok(())
2326 }
2327
2328 #[instrument(skip(self, properties), level = "trace")]
2329 pub async fn insert_vertex(
2330 &self,
2331 vid: Vid,
2332 properties: Properties,
2333 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2334 ) -> Result<()> {
2335 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2336 .await?;
2337 Ok(())
2338 }
2339
2340 /// Component C1 (G4): before a non-transactional mutation merges into main
2341 /// L0, if an outstanding snapshot pins the current generation, freeze it
2342 /// aside so snapshots taken *before* this write stay isolated from it.
2343 ///
2344 /// `flush_lock` (acquired and released here) serializes the freeze against
2345 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2346 /// commit path gets. No-op for transactional writes (their freeze happens at
2347 /// commit) and — the common case — when nothing is pinned, where it costs one
2348 /// atomic load. Freezes at most once per pinned generation: the freeze
2349 /// installs a fresh unpinned `current`, so later writes in the same bulk
2350 /// import see no pin and merge in place, and the snapshot keeps reading the
2351 /// frozen pre-import buffer.
2352 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2353 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2354 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2355 // always false (one atomic load) when SSI is off.
2356 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2357 let _flush_lock_guard = self.flush_lock.lock().await;
2358 // Re-check under the lock: a concurrent commit may have frozen first.
2359 if self.l0_manager.is_current_pinned() {
2360 self.l0_manager.freeze_current_for_snapshot();
2361 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2362 }
2363 }
2364 }
2365
2366 #[instrument(skip(self, properties, labels), level = "trace")]
2367 pub async fn insert_vertex_with_labels(
2368 &self,
2369 vid: Vid,
2370 mut properties: Properties,
2371 labels: &[String],
2372 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2373 ) -> Result<Properties> {
2374 let start = std::time::Instant::now();
2375 self.check_write_pressure().await?;
2376 self.check_transaction_memory(tx_l0)?;
2377
2378 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2379 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2380 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2381 // taken before this write stay isolated from it.
2382 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2383
2384 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2385 self.process_embeddings_for_labels(labels, &mut properties)
2386 .await?;
2387 }
2388 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2389 .await?;
2390 self.prepare_vertex_upsert(
2391 vid,
2392 &mut properties,
2393 labels.first().map(|s| s.as_str()),
2394 tx_l0,
2395 )
2396 .await?;
2397
2398 // Clone properties and labels before moving into L0 to return them and populate constraint index
2399 let properties_copy = properties.clone();
2400 let labels_copy = labels.to_vec();
2401
2402 {
2403 // For a non-tx write, re-resolve the live `current` buffer and hold
2404 // `flush_lock` across the (synchronous) write so a concurrent flush
2405 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2406 // a fresh `current` between resolve and write, dropping our write
2407 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2408 // buffer, never the rotating `current`, so no `flush_lock` is needed
2409 // (and taking it would risk re-entrancy with the commit path).
2410 let _flush_lock_guard = if tx_l0.is_none() {
2411 Some(self.flush_lock.lock().await)
2412 } else {
2413 None
2414 };
2415 let l0 = self.resolve_l0(tx_l0);
2416 let mut l0_guard = l0.write();
2417 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2418 // possibly empty) current buffer must merge against the chained
2419 // committed state, not shadow it. No-op when the chain is empty
2420 // or this is a tx-private write.
2421 if tx_l0.is_none() {
2422 let pending = self.l0_manager.get_pending_flush();
2423 if !pending.is_empty() {
2424 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2425 }
2426 }
2427 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2428
2429 // Populate constraint index for O(1) duplicate detection
2430 let schema = self.schema_manager.schema();
2431 for label in &labels_copy {
2432 if schema.get_label_case_insensitive(label).is_none() {
2433 if self.config.strict_schema {
2434 return Err(anyhow::anyhow!(
2435 "Label '{}' is not defined in the schema \
2436 (strict_schema is enabled).",
2437 label
2438 ));
2439 }
2440 continue; // Schemaless: skip unknown labels.
2441 }
2442
2443 // For each unique constraint on this label, insert into constraint index
2444 for constraint in &schema.constraints {
2445 if !constraint.enabled {
2446 continue;
2447 }
2448 if let ConstraintTarget::Label(l) = &constraint.target {
2449 if l != label {
2450 continue;
2451 }
2452 } else {
2453 continue;
2454 }
2455
2456 if let ConstraintType::Unique {
2457 properties: unique_props,
2458 } = &constraint.constraint_type
2459 {
2460 let mut key_values = Vec::new();
2461 let mut all_present = true;
2462 for prop in unique_props {
2463 if let Some(val) = properties_copy.get(prop) {
2464 key_values.push((prop.clone(), val.clone()));
2465 } else {
2466 all_present = false;
2467 break;
2468 }
2469 }
2470
2471 if all_present {
2472 let key = serialize_constraint_key(label, &key_values);
2473 l0_guard.insert_constraint_key(key, vid);
2474 }
2475 }
2476 }
2477 }
2478 }
2479
2480 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2481 self.update_metrics();
2482
2483 if tx_l0.is_none() {
2484 self.check_flush().await?;
2485 }
2486 if start.elapsed().as_millis() > 100 {
2487 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2488 }
2489 Ok(properties_copy)
2490 }
2491
2492 /// True iff routing this partial write through MergeInsert would
2493 /// miss a constraint check. Specifically: a multi-key UNIQUE
2494 /// constraint where the touched-set doesn't cover all member keys
2495 /// requires the unchanged keys from the existing row to compute
2496 /// the composite. Conservative: also returns true if any touched
2497 /// key is `ext_id` (uniqueness checked globally — handled in the
2498 /// full-row path).
2499 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2500 if touched.contains_key("ext_id") {
2501 return true;
2502 }
2503 let schema = self.schema_manager.schema();
2504 for label in labels {
2505 if schema.get_label_case_insensitive(label).is_none() {
2506 continue;
2507 }
2508 for constraint in &schema.constraints {
2509 if !constraint.enabled {
2510 continue;
2511 }
2512 if let ConstraintTarget::Label(l) = &constraint.target {
2513 if !l.eq_ignore_ascii_case(label) {
2514 continue;
2515 }
2516 } else {
2517 continue;
2518 }
2519 if let ConstraintType::Unique {
2520 properties: unique_props,
2521 } = &constraint.constraint_type
2522 {
2523 if unique_props.len() < 2 {
2524 continue; // single-key UNIQUE — partial path sees the key
2525 }
2526 if unique_props.iter().any(|p| touched.contains_key(p)) {
2527 return true;
2528 }
2529 }
2530 }
2531 }
2532 false
2533 }
2534
2535 /// Insert a vertex's FULL property row plus a touched-keys hint so
2536 /// the flush emits ONLY those columns via Lance MergeInsert.
2537 ///
2538 /// Caller must have read the full row (via PropertyManager) and
2539 /// applied SET-touched values on top before calling — same input
2540 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2541 /// is the set of property keys this SET statement actually
2542 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2543 /// flush filters the MergeInsert source schema down to those keys.
2544 /// When `UniConfig::partial_lance_writes == false`, falls through
2545 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2546 /// equivalence with prior releases.
2547 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2548 pub async fn insert_vertex_partial_full(
2549 &self,
2550 vid: Vid,
2551 mut props: Properties,
2552 touched_keys: HashSet<String>,
2553 labels: &[String],
2554 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2555 ) -> Result<()> {
2556 if !self.config.partial_lance_writes
2557 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2558 {
2559 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2560 .await?;
2561 return Ok(());
2562 }
2563
2564 self.check_write_pressure().await?;
2565 self.check_transaction_memory(tx_l0)?;
2566 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2567 self.process_embeddings_for_labels(labels, &mut props)
2568 .await?;
2569 }
2570 // Full-row validation runs because we have the complete map;
2571 // no need for the partial-only validator.
2572 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2573 .await?;
2574 {
2575 let l0 = self.resolve_l0(tx_l0);
2576 let mut l0_guard = l0.write();
2577 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2578 }
2579 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2580 metrics::counter!("uni_partial_writes_total").increment(1);
2581 self.update_metrics();
2582 if tx_l0.is_none() {
2583 self.check_flush().await?;
2584 }
2585 Ok(())
2586 }
2587
2588 /// Insert a vertex's *partial* property set without first reading the
2589 /// full row.
2590 ///
2591 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2592 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2593 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2594 /// schema source — preserving untouched columns (e.g., embeddings)
2595 /// byte-equal in Lance with no read at the caller and no write of
2596 /// those columns.
2597 ///
2598 /// When the flag is `false`, this falls back to the existing
2599 /// `insert_vertex_with_labels` path after merging `touched` with
2600 /// the current properties from L0/storage. The caller can therefore
2601 /// use this entry point unconditionally; the optimization activates
2602 /// only when the flag is on.
2603 #[instrument(skip(self, touched, labels), level = "trace")]
2604 pub async fn insert_vertex_partial(
2605 &self,
2606 vid: Vid,
2607 touched: Properties,
2608 labels: &[String],
2609 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2610 ) -> Result<()> {
2611 let needs_full_read =
2612 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2613 if needs_full_read {
2614 // Flag-off fallback (or constraint-driven fallback): merge
2615 // `touched` with the current full property snapshot from
2616 // L0/storage and route through the existing path. Preserves
2617 // bit-for-bit equivalence with the pre-Round-11 release.
2618 let existing = if let Some(pm) = &self.property_manager {
2619 pm.get_all_vertex_props_with_ctx(vid, None)
2620 .await
2621 .unwrap_or_default()
2622 .unwrap_or_default()
2623 } else {
2624 Properties::new()
2625 };
2626 let mut merged = existing;
2627 for (k, v) in touched {
2628 merged.insert(k, v);
2629 }
2630 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2631 .await?;
2632 return Ok(());
2633 }
2634
2635 // Flag-on fast path: stage the partial update directly. Pressure
2636 // checks, embedding generation, constraint validation all still
2637 // run — but the validator is the partial-aware variant that
2638 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2639 // properties not present in `touched`. Multi-key UNIQUE that
2640 // overlaps the touched set forces a fallback above via
2641 // `touched_needs_full_read`.
2642 let mut touched = touched;
2643 self.check_write_pressure().await?;
2644 self.check_transaction_memory(tx_l0)?;
2645 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2646 self.process_embeddings_for_labels(labels, &mut touched)
2647 .await?;
2648 }
2649 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2650 .await?;
2651
2652 {
2653 let l0 = self.resolve_l0(tx_l0);
2654 let mut l0_guard = l0.write();
2655 l0_guard.insert_vertex_partial(vid, touched, labels);
2656 }
2657
2658 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2659 metrics::counter!("uni_partial_writes_total").increment(1);
2660 self.update_metrics();
2661 if tx_l0.is_none() {
2662 self.check_flush().await?;
2663 }
2664 Ok(())
2665 }
2666
2667 /// Insert multiple vertices with batched operations.
2668 ///
2669 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2670 /// for bulk inserts with unique constraints.
2671 ///
2672 /// # Performance Improvements
2673 /// - Batch VID allocation: 1 call instead of N calls
2674 /// - Batch constraint validation: O(N) instead of O(N²)
2675 /// - Batch embedding generation: 1 API call per config instead of N calls
2676 /// - Transaction wrapping: Automatic flush deferral, atomicity
2677 ///
2678 /// # Arguments
2679 /// * `vids` - Pre-allocated VIDs for the vertices
2680 /// * `properties_batch` - Properties for each vertex
2681 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2682 ///
2683 /// # Errors
2684 /// Returns error if:
2685 /// - VID/properties length mismatch
2686 /// - Constraint violation detected
2687 /// - Embedding generation fails
2688 /// - Transaction commit fails
2689 ///
2690 /// # Atomicity
2691 /// If this method fails, all changes are rolled back (if transaction was started here).
2692 pub async fn insert_vertices_batch(
2693 &self,
2694 vids: Vec<Vid>,
2695 mut properties_batch: Vec<Properties>,
2696 labels: Vec<String>,
2697 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2698 ) -> Result<Vec<Properties>> {
2699 let start = std::time::Instant::now();
2700
2701 // Validate inputs
2702 if vids.len() != properties_batch.len() {
2703 return Err(anyhow!(
2704 "VID/properties size mismatch: {} vids, {} properties",
2705 vids.len(),
2706 properties_batch.len()
2707 ));
2708 }
2709
2710 if vids.is_empty() {
2711 return Ok(Vec::new());
2712 }
2713
2714 // Batch operations — writes go directly to the resolved L0.
2715 // Atomicity is guaranteed by the caller holding the writer lock.
2716 let result = async {
2717 self.check_write_pressure().await?;
2718 self.check_transaction_memory(tx_l0)?;
2719
2720 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2721 // freeze the pinned generation aside before merging so snapshot
2722 // readers stay isolated. No-op when unpinned or transactional.
2723 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2724
2725 // Batch embedding generation (1 API call per config)
2726 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2727 .await?;
2728
2729 // Batch constraint validation (O(N) instead of O(N²))
2730 let label = labels
2731 .first()
2732 .ok_or_else(|| anyhow!("No labels provided"))?;
2733 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2734 .await?;
2735
2736 // Batch prepare (CRDT merging if needed)
2737 // Check schema once: skip entirely if no CRDT properties for this label.
2738 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2739 // values to merge, so the per-vertex lookup is unnecessary in that case.
2740 let has_crdt_fields = {
2741 let schema = self.schema_manager.schema();
2742 schema
2743 .properties
2744 .get(label.as_str())
2745 .is_some_and(|props_meta| {
2746 props_meta.values().any(|meta| {
2747 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2748 })
2749 })
2750 };
2751
2752 if has_crdt_fields {
2753 // Layer-1 variant enforcement (G3): the batch path must reject a
2754 // declared-CRDT variant mismatch exactly as the single-vertex
2755 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2756 // written via batch import slips past write-time validation and
2757 // the OCC carve-out then masks the overwrite as a lost update.
2758 {
2759 let schema = self.schema_manager.schema();
2760 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2761 for props in &properties_batch {
2762 Self::enforce_crdt_variants(props_meta, props)?;
2763 }
2764 }
2765 }
2766
2767 // Batch fetch existing CRDT values: collect VIDs that need merging,
2768 // then query once via PropertyManager instead of per-vertex lookups.
2769 let schema = self.schema_manager.schema();
2770 let crdt_keys: Vec<String> = schema
2771 .properties
2772 .get(label.as_str())
2773 .map(|props_meta| {
2774 props_meta
2775 .iter()
2776 .filter(|(_, meta)| {
2777 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2778 })
2779 .map(|(key, _)| key.clone())
2780 .collect()
2781 })
2782 .unwrap_or_default();
2783
2784 if let Some(pm) = &self.property_manager {
2785 let ctx = self.get_query_context(tx_l0).await;
2786 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2787 for key in &crdt_keys {
2788 if props.contains_key(key) {
2789 let existing =
2790 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2791 if !existing.is_null()
2792 && let Some(val) = props.get_mut(key)
2793 {
2794 *val = pm.merge_crdt_values(&existing, val)?;
2795 }
2796 }
2797 }
2798 }
2799 }
2800 }
2801
2802 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2803 let target_l0 = self.resolve_l0(tx_l0);
2804
2805 let properties_result = properties_batch.clone();
2806 {
2807 let mut l0_guard = target_l0.write();
2808 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2809 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2810 }
2811 }
2812
2813 // Update metrics (batch increment)
2814 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2815 self.update_metrics();
2816
2817 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2818 }
2819 .await;
2820
2821 let props = result?;
2822
2823 if start.elapsed().as_millis() > 100 {
2824 log::warn!(
2825 "Slow insert_vertices_batch ({} vertices): {}ms",
2826 vids.len(),
2827 start.elapsed().as_millis()
2828 );
2829 }
2830
2831 Ok(props)
2832 }
2833
2834 /// Delete a vertex by VID.
2835 ///
2836 /// When `labels` is provided, uses them directly to populate L0 for
2837 /// correct tombstone flushing. Otherwise discovers labels from L0
2838 /// buffers and storage (which can be slow for many vertices).
2839 ///
2840 /// # Errors
2841 ///
2842 /// Returns an error if write pressure stalls, label lookup fails, or
2843 /// the L0 delete operation fails.
2844 #[instrument(skip(self, labels), level = "trace")]
2845 pub async fn delete_vertex(
2846 &self,
2847 vid: Vid,
2848 labels: Option<Vec<String>>,
2849 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2850 ) -> Result<()> {
2851 let start = std::time::Instant::now();
2852 self.check_write_pressure().await?;
2853 self.check_transaction_memory(tx_l0)?;
2854 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2855
2856 // Before deleting, ensure we have the vertex's labels stored in L0 so
2857 // the tombstone can be flushed to the correct label datasets. Discover
2858 // them up front (this may await storage) WITHOUT pinning the buffer we
2859 // will eventually mutate — for non-tx writes the live `current` buffer
2860 // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2861 // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2862 // reads that tolerate a racing rotate.
2863 let has_labels = {
2864 let l0_guard = self.resolve_l0(tx_l0);
2865 let guard = l0_guard.read();
2866 guard.vertex_labels.contains_key(&vid)
2867 };
2868
2869 let backfill_labels = if has_labels {
2870 None
2871 } else if let Some(provided) = labels {
2872 // Caller provided labels — skip the lookup entirely
2873 Some(provided)
2874 } else {
2875 // Discover labels from pending flush L0s, then storage
2876 let mut found = None;
2877 for pending_l0 in self.l0_manager.get_pending_flush() {
2878 let pending_guard = pending_l0.read();
2879 if let Some(l) = pending_guard.get_vertex_labels(vid) {
2880 found = Some(l.to_vec());
2881 break;
2882 }
2883 }
2884 if found.is_none() {
2885 found = self.find_vertex_labels_in_storage(vid).await?;
2886 }
2887 found
2888 };
2889
2890 // Test-only seam (no-op without the `failpoints` feature): pause a
2891 // non-transactional delete AFTER the awaited label discovery but BEFORE
2892 // it re-resolves the live buffer and writes the tombstone. A concurrent
2893 // flush can rotate+complete a buffer in this window; the fix re-resolves
2894 // `get_current()` and mutates it under `flush_lock`, so the tombstone
2895 // always lands in the live buffer (Bug #4 — silent lost delete across
2896 // L0 rotation).
2897 fail::fail_point!("nontx::after-capture");
2898
2899 // Apply the label backfill and the tombstone together. For a non-tx
2900 // write, hold `flush_lock` across the (synchronous) re-resolve + write
2901 // so a concurrent flush rotate (which takes `flush_lock` via
2902 // `begin_flush`) cannot install a fresh `current` between our resolve
2903 // and our write. For a tx write `resolve_l0` returns the tx-private
2904 // buffer (never the rotating `current`), so no `flush_lock` is needed
2905 // — and taking it there would risk re-entrancy with the commit path.
2906 if tx_l0.is_none() {
2907 let _flush_lock_guard = self.flush_lock.lock().await;
2908 let l0 = self.l0_manager.get_current();
2909 let mut guard = l0.write();
2910 if let Some(found_labels) = backfill_labels {
2911 guard.vertex_labels.insert(vid, found_labels);
2912 }
2913 guard.delete_vertex(vid)?;
2914 } else {
2915 let l0 = self.resolve_l0(tx_l0);
2916 let mut guard = l0.write();
2917 if let Some(found_labels) = backfill_labels {
2918 guard.vertex_labels.insert(vid, found_labels);
2919 }
2920 guard.delete_vertex(vid)?;
2921 }
2922 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2923 self.update_metrics();
2924
2925 if tx_l0.is_none() {
2926 self.check_flush().await?;
2927 }
2928 if start.elapsed().as_millis() > 100 {
2929 log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2930 }
2931 Ok(())
2932 }
2933
2934 /// Find vertex labels from storage by querying the main vertices table.
2935 /// Returns the labels from the latest non-deleted version of the vertex.
2936 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2937 use crate::backend::types::ScanRequest;
2938 use arrow_array::Array;
2939 use arrow_array::cast::AsArray;
2940
2941 let backend = self.storage.backend();
2942 let table_name = MainVertexDataset::table_name();
2943
2944 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2945 if !backend.table_exists(table_name).await? {
2946 return Ok(None);
2947 }
2948
2949 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2950 let filter = format!("_vid = {}", vid.as_u64());
2951 let batches = backend
2952 .scan(
2953 ScanRequest::all(table_name)
2954 .with_filter(filter)
2955 .with_columns(vec![
2956 "_vid".to_string(),
2957 "labels".to_string(),
2958 "_version".to_string(),
2959 "_deleted".to_string(),
2960 ]),
2961 )
2962 .await
2963 .unwrap_or_default();
2964
2965 // Find the row with the highest version number
2966 let mut max_version: Option<u64> = None;
2967 let mut labels: Option<Vec<String>> = None;
2968 let mut is_deleted = false;
2969
2970 for batch in batches {
2971 if batch.num_rows() == 0 {
2972 continue;
2973 }
2974
2975 let version_array = batch
2976 .column_by_name("_version")
2977 .unwrap()
2978 .as_primitive::<arrow_array::types::UInt64Type>();
2979
2980 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2981
2982 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2983
2984 for row_idx in 0..batch.num_rows() {
2985 let version = version_array.value(row_idx);
2986
2987 if max_version.is_none_or(|mv| version > mv) {
2988 is_deleted = deleted_array.value(row_idx);
2989
2990 let labels_list = labels_array.value(row_idx);
2991 let string_array = labels_list.as_string::<i32>();
2992 let vertex_labels: Vec<String> = (0..string_array.len())
2993 .filter(|&i| !string_array.is_null(i))
2994 .map(|i| string_array.value(i).to_string())
2995 .collect();
2996
2997 max_version = Some(version);
2998 labels = Some(vertex_labels);
2999 }
3000 }
3001 }
3002
3003 // If the latest version is deleted, return None
3004 if is_deleted { Ok(None) } else { Ok(labels) }
3005 }
3006
3007 #[expect(clippy::too_many_arguments)]
3008 #[instrument(skip(self, props, touched_keys), level = "trace")]
3009 pub async fn insert_edge_partial_full(
3010 &self,
3011 src_vid: Vid,
3012 dst_vid: Vid,
3013 edge_type: u32,
3014 eid: Eid,
3015 props: Properties,
3016 edge_type_name: Option<String>,
3017 touched_keys: HashSet<String>,
3018 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3019 ) -> Result<()> {
3020 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3021 if !self.config.partial_lance_writes {
3022 return self
3023 .insert_edge(
3024 src_vid,
3025 dst_vid,
3026 edge_type,
3027 eid,
3028 props,
3029 edge_type_name,
3030 tx_l0,
3031 )
3032 .await;
3033 }
3034
3035 let start = std::time::Instant::now();
3036 self.check_write_pressure().await?;
3037 self.check_transaction_memory(tx_l0)?;
3038 let mut props = props;
3039 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
3040
3041 let l0 = self.resolve_l0(tx_l0);
3042 l0.write().insert_edge_partial_full(
3043 src_vid,
3044 dst_vid,
3045 edge_type,
3046 eid,
3047 props,
3048 edge_type_name,
3049 touched_keys,
3050 )?;
3051
3052 if tx_l0.is_none() {
3053 let version = l0.read().current_version;
3054 self.adjacency_manager
3055 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3056 }
3057
3058 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3059 metrics::counter!("uni_partial_writes_total").increment(1);
3060 self.update_metrics();
3061 if tx_l0.is_none() {
3062 self.check_flush().await?;
3063 }
3064 if start.elapsed().as_millis() > 100 {
3065 log::warn!(
3066 "Slow insert_edge_partial_full: {}ms",
3067 start.elapsed().as_millis()
3068 );
3069 }
3070 Ok(())
3071 }
3072
3073 #[expect(clippy::too_many_arguments)]
3074 pub async fn insert_edge(
3075 &self,
3076 src_vid: Vid,
3077 dst_vid: Vid,
3078 edge_type: u32,
3079 eid: Eid,
3080 mut properties: Properties,
3081 edge_type_name: Option<String>,
3082 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3083 ) -> Result<()> {
3084 let start = std::time::Instant::now();
3085 self.check_write_pressure().await?;
3086 self.check_transaction_memory(tx_l0)?;
3087 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3088 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3089 .await?;
3090
3091 let l0 = self.resolve_l0(tx_l0);
3092 l0.write()
3093 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3094
3095 // Dual-write to AdjacencyManager overlay (survives flush).
3096 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3097 if tx_l0.is_none() {
3098 let version = l0.read().current_version;
3099 self.adjacency_manager
3100 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3101 }
3102
3103 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3104 self.update_metrics();
3105
3106 if tx_l0.is_none() {
3107 self.check_flush().await?;
3108 }
3109 if start.elapsed().as_millis() > 100 {
3110 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3111 }
3112 Ok(())
3113 }
3114
3115 #[instrument(skip(self), level = "trace")]
3116 pub async fn delete_edge(
3117 &self,
3118 eid: Eid,
3119 src_vid: Vid,
3120 dst_vid: Vid,
3121 edge_type: u32,
3122 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3123 ) -> Result<()> {
3124 let start = std::time::Instant::now();
3125 self.check_write_pressure().await?;
3126 self.check_transaction_memory(tx_l0)?;
3127 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3128 let l0 = self.resolve_l0(tx_l0);
3129
3130 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3131
3132 // Dual-write tombstone to AdjacencyManager overlay.
3133 if tx_l0.is_none() {
3134 let version = l0.read().current_version;
3135 self.adjacency_manager
3136 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3137 }
3138 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3139 self.update_metrics();
3140
3141 if tx_l0.is_none() {
3142 self.check_flush().await?;
3143 }
3144 if start.elapsed().as_millis() > 100 {
3145 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3146 }
3147 Ok(())
3148 }
3149
3150 /// Decide whether a flush should be triggered based on mutation count
3151 /// or elapsed time since the last flush.
3152 ///
3153 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3154 /// reuse the decision while bypassing the lock-acquiring entry point
3155 /// (it already holds `flush_lock`).
3156 fn should_flush(&self) -> bool {
3157 let count = self.l0_manager.get_current().read().mutation_count;
3158 if count == 0 {
3159 return false;
3160 }
3161 if count >= self.config.auto_flush_threshold {
3162 return true;
3163 }
3164 if let Some(interval) = self.config.auto_flush_interval
3165 && self.last_flush_time.lock().elapsed() >= interval
3166 && count >= self.config.auto_flush_min_mutations
3167 {
3168 return true;
3169 }
3170 false
3171 }
3172
3173 /// Check if flush should be triggered based on mutation count or time elapsed.
3174 /// This method is called after each write operation and can also be called
3175 /// by a background task for time-based flushing.
3176 pub async fn check_flush(&self) -> Result<()> {
3177 if self.should_flush() {
3178 self.flush_to_l1(None).await?;
3179 }
3180 Ok(())
3181 }
3182
3183 /// Process embeddings for a vertex using labels passed directly.
3184 /// Use this when labels haven't been stored to L0 yet.
3185 async fn process_embeddings_for_labels(
3186 &self,
3187 labels: &[String],
3188 properties: &mut Properties,
3189 ) -> Result<()> {
3190 let label_name = labels.first().map(|s| s.as_str());
3191 self.process_embeddings_impl(label_name, properties).await
3192 }
3193
3194 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3195 /// vertex has an embedding config that hasn't been satisfied by the
3196 /// caller-provided properties, enqueue the VID in
3197 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3198 /// skips `process_embeddings_for_labels` and the embedding is computed
3199 /// in a single batched call at flush time via
3200 /// `drain_pending_embeddings`.
3201 ///
3202 /// Returns `false` (caller falls back to today's per-row eager embed)
3203 /// if any of:
3204 /// - the flag is off,
3205 /// - no label has an embedding config,
3206 /// - the user already provided the target property (matches the
3207 /// existing skip-if-present semantics at writer.rs:2727).
3208 ///
3209 /// Trade-off: when deferral is active, in-tx reads of the embedding
3210 /// column return only what was already in storage (or nothing for
3211 /// brand-new vertices). Existing tests that RETURN n.embedding in
3212 /// the same tx as a SET on the source column must run with the flag
3213 /// off; opt in only when no such reads happen between write and
3214 /// commit.
3215 fn try_defer_embedding(
3216 &self,
3217 labels: &[String],
3218 properties: &Properties,
3219 vid: Vid,
3220 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3221 ) -> bool {
3222 if !self.config.defer_embeddings {
3223 return false;
3224 }
3225 let Some(label) = labels.first() else {
3226 return false;
3227 };
3228
3229 let schema = self.schema_manager.schema();
3230 let mut has_unsatisfied_cfg = false;
3231 for idx in &schema.indexes {
3232 if let IndexDefinition::Vector(v_cfg) = idx
3233 && v_cfg.label == *label
3234 && v_cfg.embedding_config.is_some()
3235 && !properties.contains_key(&v_cfg.property)
3236 {
3237 has_unsatisfied_cfg = true;
3238 break;
3239 }
3240 }
3241 if !has_unsatisfied_cfg {
3242 return false;
3243 }
3244
3245 let l0 = self.resolve_l0(tx_l0);
3246 let mut guard = l0.write();
3247 guard.pending_embeddings.insert(vid, label.clone());
3248 true
3249 }
3250
3251 /// Drain `pending_embeddings` from the rotated old-L0 right before
3252 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3253 /// `process_embeddings_for_batch` call per label, and writes the
3254 /// resulting embedding vectors into each VID's `vertex_properties`
3255 /// map. After this returns, the flush proceeds against an L0 that
3256 /// looks no different from one whose embeddings were generated
3257 /// per-row at insert.
3258 ///
3259 /// Idempotent: a VID whose embedding was already materialized
3260 /// (e.g., by on-demand read paths in a future Phase B revision) is
3261 /// detected via `properties.contains_key(target_prop)` inside
3262 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3263 /// the drain is safe.
3264 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3265 let by_label: HashMap<String, Vec<Vid>> = {
3266 let guard = old_l0_arc.read();
3267 if guard.pending_embeddings.is_empty() {
3268 return Ok(());
3269 }
3270 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3271 for (vid, label) in &guard.pending_embeddings {
3272 m.entry(label.clone()).or_default().push(*vid);
3273 }
3274 m
3275 };
3276
3277 for (label, vids) in by_label {
3278 let mut properties_batch: Vec<Properties> = {
3279 let guard = old_l0_arc.read();
3280 vids.iter()
3281 .map(|vid| {
3282 guard
3283 .vertex_properties
3284 .get(vid)
3285 .cloned()
3286 .unwrap_or_default()
3287 })
3288 .collect()
3289 };
3290
3291 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3292 .await?;
3293
3294 let mut guard = old_l0_arc.write();
3295 for (vid, props) in vids.iter().zip(properties_batch) {
3296 let target = guard.vertex_properties.entry(*vid).or_default();
3297 for (k, v) in props {
3298 target.insert(k, v);
3299 }
3300 guard.pending_embeddings.remove(vid);
3301 }
3302 }
3303 Ok(())
3304 }
3305
3306 /// Process embeddings for a batch of vertices efficiently.
3307 ///
3308 /// Groups vertices by embedding config and makes batched API calls to the
3309 /// embedding service instead of calling once per vertex.
3310 ///
3311 /// # Performance
3312 /// For N vertices with embedding config:
3313 /// - Old approach: N API calls to embedding service
3314 /// - New approach: 1 API call per embedding config (usually 1 total)
3315 async fn process_embeddings_for_batch(
3316 &self,
3317 labels: &[String],
3318 properties_batch: &mut [Properties],
3319 ) -> Result<()> {
3320 let label_name = labels.first().map(|s| s.as_str());
3321 let schema = self.schema_manager.schema();
3322
3323 if let Some(label) = label_name {
3324 // Find vector indexes with embedding config for this label
3325 let mut configs = Vec::new();
3326 for idx in &schema.indexes {
3327 if let IndexDefinition::Vector(v_config) = idx
3328 && v_config.label == label
3329 && let Some(emb_config) = &v_config.embedding_config
3330 {
3331 configs.push((v_config.property.clone(), emb_config.clone()));
3332 }
3333 }
3334
3335 if configs.is_empty() {
3336 return Ok(());
3337 }
3338
3339 for (target_prop, emb_config) in configs {
3340 // Collect input texts from all vertices that need embeddings
3341 let mut input_texts: Vec<String> = Vec::new();
3342 let mut needs_embedding: Vec<usize> = Vec::new();
3343
3344 for (idx, properties) in properties_batch.iter().enumerate() {
3345 // Skip if target property already exists
3346 if properties.contains_key(&target_prop) {
3347 continue;
3348 }
3349
3350 // Check if source properties exist
3351 let mut inputs = Vec::new();
3352 for src_prop in &emb_config.source_properties {
3353 if let Some(val) = properties.get(src_prop)
3354 && let Some(s) = val.as_str()
3355 {
3356 inputs.push(s.to_string());
3357 }
3358 }
3359
3360 if !inputs.is_empty() {
3361 let input_text = inputs.join(" ");
3362 let input_text = match &emb_config.document_prefix {
3363 Some(prefix) => format!("{prefix}{input_text}"),
3364 None => input_text,
3365 };
3366 input_texts.push(input_text);
3367 needs_embedding.push(idx);
3368 }
3369 }
3370
3371 if input_texts.is_empty() {
3372 continue;
3373 }
3374
3375 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3376 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3377 })?;
3378 let embedder = runtime.embedding(&emb_config.alias).await?;
3379
3380 // Batch generate embeddings (single API call)
3381 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3382 let embeddings = embedder.embed(input_refs).await?;
3383
3384 // Distribute results back to properties
3385 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3386 if let Some(vec) = embeddings.get(embedding_idx) {
3387 let vals: Vec<Value> =
3388 vec.iter().map(|f| Value::Float(*f as f64)).collect();
3389 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3390 }
3391 }
3392 }
3393 }
3394
3395 Ok(())
3396 }
3397
3398 async fn process_embeddings_impl(
3399 &self,
3400 label_name: Option<&str>,
3401 properties: &mut Properties,
3402 ) -> Result<()> {
3403 let schema = self.schema_manager.schema();
3404
3405 if let Some(label) = label_name {
3406 // Find vector indexes with embedding config for this label
3407 let mut configs = Vec::new();
3408 for idx in &schema.indexes {
3409 if let IndexDefinition::Vector(v_config) = idx
3410 && v_config.label == label
3411 && let Some(emb_config) = &v_config.embedding_config
3412 {
3413 configs.push((v_config.property.clone(), emb_config.clone()));
3414 }
3415 }
3416
3417 if configs.is_empty() {
3418 log::info!("No embedding config found for label {}", label);
3419 }
3420
3421 for (target_prop, emb_config) in configs {
3422 // If target property already exists, skip (assume user provided it)
3423 if properties.contains_key(&target_prop) {
3424 continue;
3425 }
3426
3427 // Check if source properties exist
3428 let mut inputs = Vec::new();
3429 for src_prop in &emb_config.source_properties {
3430 if let Some(val) = properties.get(src_prop)
3431 && let Some(s) = val.as_str()
3432 {
3433 inputs.push(s.to_string());
3434 }
3435 }
3436
3437 if inputs.is_empty() {
3438 continue;
3439 }
3440
3441 let input_text = inputs.join(" ");
3442 let input_text = match &emb_config.document_prefix {
3443 Some(prefix) => format!("{prefix}{input_text}"),
3444 None => input_text,
3445 };
3446
3447 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3448 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3449 })?;
3450 let embedder = runtime.embedding(&emb_config.alias).await?;
3451
3452 // Generate
3453 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3454 if let Some(vec) = embeddings.first() {
3455 // Store as array of floats
3456 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3457 properties.insert(target_prop.clone(), Value::List(vals));
3458 }
3459 }
3460 }
3461 Ok(())
3462 }
3463
3464 /// Flushes the current in-memory L0 buffer to L1 storage.
3465 ///
3466 /// # Lock Ordering
3467 ///
3468 /// To prevent deadlocks, locks must be acquired in the following order:
3469 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3470 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3471 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3472 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3473 /// 5. `Index` / `Storage` locks (during actual flush)
3474 ///
3475 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3476 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3477 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3478 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3479 // Drain any in-flight async flushes first. `flush_to_l1` is a
3480 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3481 // setup, shutdown paths) rely on it as "all writes are now
3482 // durably in Lance". Without the drain, an async stream from
3483 // a recent commit might still be writing to Lance when
3484 // `flush_to_l1` returns, leaving a window where forks branch
3485 // off pre-write Lance state and lose data.
3486 if let Some(coord) = self.flush_coordinator.as_ref() {
3487 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3488 }
3489 let _flush_lock_guard = self.flush_lock.lock().await;
3490 self.flush_inline_under_lock(name).await
3491 }
3492
3493 /// Flush L0→L1 and capture the fork point under one held `flush_lock`.
3494 ///
3495 /// Drains in-flight async flushes, takes `flush_lock`, runs the inline
3496 /// flush, and then — still holding the lock — reads the allocator
3497 /// high-water marks and each existing candidate dataset's Lance
3498 /// version. Capturing under the held lock is what makes the fork point
3499 /// atomic: no concurrent commit can advance the allocator and no
3500 /// concurrent flush can advance a dataset tip between the flush and the
3501 /// reads. See [`ForkPoint`].
3502 ///
3503 /// `candidate_dataset_names` are resolved to `{base_uri}/{name}.lance`;
3504 /// names with no `.lance` directory on disk are skipped (returned map
3505 /// has no entry for them), matching the fork branch loop's existence
3506 /// check.
3507 ///
3508 /// # Errors
3509 /// Propagates flush failures from `flush_inline_under_lock` and any
3510 /// per-dataset version read failure from `lance_branch::current_version`.
3511 ///
3512 /// # Deadlocks
3513 /// Must not be called by a task already holding `flush_lock` (e.g.
3514 /// `commit_transaction_l0`); the `tokio::sync::Mutex` is not reentrant.
3515 /// Fork creation never holds the lock, so the sole call site is safe.
3516 pub async fn flush_and_capture_fork_point(
3517 &self,
3518 candidate_dataset_names: &[String],
3519 ) -> Result<ForkPoint> {
3520 if let Some(coord) = self.flush_coordinator.as_ref() {
3521 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3522 }
3523 let _flush_lock_guard = self.flush_lock.lock().await;
3524 self.flush_inline_under_lock(None).await?;
3525
3526 // Still under `flush_lock`: capture the allocator HWM, the MVCC
3527 // version HWM, and every existing dataset's Lance version so
3528 // nothing can interleave.
3529 let (vid_hwm, eid_hwm) = self.allocator.current_hwm().await;
3530 // The parent's current L0 version is the largest `_version` any
3531 // inherited row can carry (flushed or in-memory). A fork bootstraps
3532 // its version floor to this so a fork tx read still sees inherited
3533 // rows. Cheap read lock; no buffer clone.
3534 let version_hwm = self.l0_manager.get_current().read().current_version;
3535
3536 let base = self.storage.base_uri();
3537 let mut dataset_versions = BTreeMap::new();
3538 for name in candidate_dataset_names {
3539 let uri = join_lance_uri(base, name);
3540 if !lance_path_exists(&uri) {
3541 continue;
3542 }
3543 let version = crate::backend::lance_branch::current_version(&uri).await?;
3544 dataset_versions.insert(name.clone(), version);
3545 }
3546
3547 Ok(ForkPoint {
3548 vid_hwm,
3549 eid_hwm,
3550 dataset_versions,
3551 version_hwm,
3552 })
3553 }
3554
3555 /// Async-flush entry point: rotate under `flush_lock`, release the
3556 /// lock, then submit the stream phase to the [`FlushCoordinator`].
3557 /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3558 /// that resolves when finalize completes.
3559 ///
3560 /// Errors if `config.async_flush_enabled = false` (the coordinator
3561 /// is `None` in that case — see `flush_coordinator` field doc).
3562 pub async fn flush_to_l1_async(
3563 self: &Arc<Self>,
3564 name: Option<String>,
3565 ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3566 let coord = self
3567 .flush_coordinator
3568 .as_ref()
3569 .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3570 .clone();
3571 // 1. Acquire permit FIRST (outside flush_lock) so we don't
3572 // introduce a permit-while-holding-flush-lock convoy.
3573 let permit = coord.acquire_permit().await?;
3574 // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3575 // and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3576 // rotate (the `?` below) must consume neither: the finalizer
3577 // advances in strictly consecutive seq order and only decrements
3578 // pending on finalize, so a leaked seq/pending would wedge it
3579 // forever. The seq is allocated under `flush_lock`, immediately
3580 // after the rotate and before the guard drops, so concurrent rotates
3581 // keep seq order == rotation order, and the seq is unused until
3582 // submit. On the `?` error path the permit drops, freeing the slot.
3583 let (
3584 RotateOutput {
3585 old_l0_arc,
3586 wal_lsn,
3587 current_version,
3588 flush_in_progress_guard,
3589 },
3590 seq,
3591 ) = {
3592 let _flush_lock_guard = self.flush_lock.lock().await;
3593 let rotate_out = self.flush_l0_rotate().await?;
3594 let seq = coord.next_rotate_seq();
3595 coord.note_pending();
3596 (rotate_out, seq)
3597 };
3598 // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3599 // cached_manifest snapshot at this moment.
3600 let parent_manifest = self.cached_manifest.lock().clone();
3601 let rotated = RotatedFlush {
3602 seq,
3603 old_l0_arc: old_l0_arc.clone(),
3604 wal_lsn,
3605 current_version,
3606 name: name.clone(),
3607 parent_manifest,
3608 permit,
3609 flush_in_progress_guard,
3610 };
3611 // 4. Spawn the stream phase via the coordinator. The closure
3612 // captures Arc<Writer> transiently — drops when stream
3613 // completes (bounded, ~50-500 ms).
3614 let writer = self.clone();
3615 let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3616 let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3617 Ok(crate::runtime::flush_coordinator::FlushOutcome {
3618 new_manifest: outcome.manifest,
3619 snapshot_id: outcome.snapshot_id,
3620 })
3621 });
3622 Ok(ticket)
3623 }
3624
3625 /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3626 /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3627 /// its place), and hand off the WAL to the new L0.
3628 ///
3629 /// Runs in microseconds. Must be called under `flush_lock` (the caller
3630 /// is responsible). The returned [`RotateOutput`] carries everything
3631 /// the subsequent stream + finalize phases need; in particular the
3632 /// [`FlushInProgressGuard`] is bound to the return value so it stays
3633 /// alive for the full flush lifetime — including any future async
3634 /// path where stream runs on a spawned task.
3635 async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3636 // Acquire the in-progress counter BEFORE any heavy work. The
3637 // guard lives on RotateOutput; dropping RotateOutput drops the
3638 // guard, so the counter goes back to zero exactly when the flush
3639 // is fully done.
3640 let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3641
3642 // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3643 // current L0 is still active and mutations are retained in
3644 // memory until restart/retry.
3645 let wal_for_truncate = {
3646 let current_l0 = self.l0_manager.get_current();
3647 let l0_guard = current_l0.read();
3648 l0_guard.wal.clone()
3649 };
3650 // Test-only seam (no-op without the `failpoints` feature): inject a
3651 // WAL-flush failure here to drive the "failed async rotate wedges the
3652 // finalizer" regression (Bug #3). When configured to "return" it makes
3653 // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3654 fail::fail_point!("flush::rotate-fail", |_| {
3655 Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3656 });
3657 let wal_lsn = if let Some(ref w) = wal_for_truncate {
3658 w.flush().await?
3659 } else {
3660 0
3661 };
3662
3663 // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3664 // pending_flush until complete_flush is called by finalize.
3665 let old_l0_arc = self.l0_manager.begin_flush(0, None);
3666 metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3667
3668 // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3669 // and current_version to the new L0.
3670 let current_version;
3671 {
3672 let mut old_l0_guard = old_l0_arc.write();
3673 current_version = old_l0_guard.current_version;
3674 old_l0_guard.wal_lsn_at_flush = wal_lsn;
3675 let wal = old_l0_guard.wal.take();
3676 let new_l0_arc = self.l0_manager.get_current();
3677 let mut new_l0_guard = new_l0_arc.write();
3678 new_l0_guard.wal = wal;
3679 new_l0_guard.current_version = current_version;
3680 }
3681
3682 Ok(RotateOutput {
3683 old_l0_arc,
3684 wal_lsn,
3685 current_version,
3686 flush_in_progress_guard,
3687 })
3688 }
3689
3690 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3691 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3692 /// pending_flush by Phase B); writes append-only Lance datasets; does
3693 /// NOT call save_snapshot / set_latest_snapshot — those are
3694 /// finalize's job, so the manifest doesn't get published until the
3695 /// next phase.
3696 ///
3697 /// Today takes `&self`; in a follow-up commit this becomes a
3698 /// static `Send + 'static` function over `SharedFlushCtx` so it can
3699 /// run on a spawned task while concurrent commits proceed.
3700 async fn flush_stream_l1(
3701 &self,
3702 old_l0_arc: Arc<RwLock<L0Buffer>>,
3703 wal_lsn: u64,
3704 current_version: u64,
3705 name: Option<String>,
3706 ) -> Result<FlushOutcome> {
3707 // Test-only seam (no-op without the `failpoints` feature): the rotate
3708 // (begin_flush) already moved the to-be-flushed buffer onto
3709 // pending_flush and installed a fresh empty current buffer, but the
3710 // rotated rows are NOT yet durable in Lance. Pausing here holds that
3711 // window open to drive the unique-constraint-hole regression
3712 // (Bug #9 Mechanism A).
3713 fail::fail_point!("flush::after-rotate-before-lance");
3714
3715 // Phase B: materialize any deferred embeddings before column
3716 // extraction. No-op when `defer_embeddings` is off (the set will
3717 // be empty). On-demand reads of the embedding column are a TODO
3718 // for a future revision (see UniConfig::defer_embeddings docs).
3719 self.drain_pending_embeddings(&old_l0_arc).await?;
3720
3721 let schema = self.schema_manager.schema();
3722 // 2. Acquire Read lock on Old L0 for flushing
3723 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3724 // (Vid, labels, properties, deleted, version)
3725 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3726 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3727 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3728 // (vid, full L0 properties map, version, set of keys to update).
3729 // Only the keys in the HashSet are emitted to the partial source;
3730 // the full props map is retained so the per-row column extractor
3731 // can read each touched key's value.
3732 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3733 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3734 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3735 // partial source with just `_vid`, `_deleted=true`, `_version`,
3736 // `_updated_at`. Skips the wide-row Append payload that adds
3737 // nothing on a soft-delete.
3738 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3739 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3740 // Collect vertex timestamps from L0 for flushing to storage
3741 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3742 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3743 // Track tombstones missing labels for storage query fallback
3744 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3745
3746 {
3747 let old_l0 = old_l0_arc.read();
3748
3749 // 1. Collect all edges and tombstones from L0
3750 for edge in old_l0.graph.edges() {
3751 let properties = old_l0
3752 .edge_properties
3753 .get(&edge.eid)
3754 .cloned()
3755 .unwrap_or_default();
3756 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3757
3758 // Get timestamps from L0 buffer (populated during insert)
3759 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3760 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3761
3762 entries_by_type
3763 .entry(edge.edge_type)
3764 .or_default()
3765 .push(L1Entry {
3766 src_vid: edge.src_vid,
3767 dst_vid: edge.dst_vid,
3768 eid: edge.eid,
3769 op: Op::Insert,
3770 version,
3771 properties,
3772 created_at,
3773 updated_at,
3774 });
3775 }
3776
3777 // From tombstones
3778 for tombstone in old_l0.tombstones.values() {
3779 let version = old_l0
3780 .edge_versions
3781 .get(&tombstone.eid)
3782 .copied()
3783 .unwrap_or(0);
3784 // Get timestamps - for deletes, updated_at reflects deletion time
3785 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3786 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3787
3788 entries_by_type
3789 .entry(tombstone.edge_type)
3790 .or_default()
3791 .push(L1Entry {
3792 src_vid: tombstone.src_vid,
3793 dst_vid: tombstone.dst_vid,
3794 eid: tombstone.eid,
3795 op: Op::Delete,
3796 version,
3797 properties: HashMap::new(),
3798 created_at,
3799 updated_at,
3800 });
3801 }
3802
3803 // 1b. Collect vertices by label (using vertex_labels from L0)
3804 //
3805 // Helper: fan-out a single vertex entry into per-label buckets.
3806 // Each per-label table row carries the full label set so multi-label
3807 // info is preserved after flush.
3808 let push_vertex_to_labels =
3809 |vid: Vid,
3810 all_labels: &[String],
3811 props: Properties,
3812 deleted: bool,
3813 version: u64,
3814 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3815 for label in all_labels {
3816 if let Some(label_id) = schema.label_id_by_name(label) {
3817 out.entry(label_id).or_default().push((
3818 vid,
3819 all_labels.to_vec(),
3820 props.clone(),
3821 deleted,
3822 version,
3823 ));
3824 }
3825 }
3826 };
3827
3828 for (vid, props) in &old_l0.vertex_properties {
3829 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3830 // Collect timestamps for this vertex
3831 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3832 vertex_created_at.insert(*vid, ts);
3833 }
3834 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3835 vertex_updated_at.insert(*vid, ts);
3836 }
3837 if let Some(labels) = old_l0.vertex_labels.get(vid) {
3838 // Partial-write routing: when this VID was last
3839 // touched via `insert_vertex_partial` AND the
3840 // partial_lance_writes flag is on, send only the
3841 // touched columns to a MergeInsert batch. Otherwise
3842 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3843 // — or flag off) use the existing full-row Append.
3844 let is_partial = self.config.partial_lance_writes
3845 && old_l0.vertex_partial_keys.contains_key(vid);
3846 if is_partial {
3847 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3848 for label in labels {
3849 if let Some(label_id) = schema.label_id_by_name(label) {
3850 partial_by_label.entry(label_id).or_default().push((
3851 *vid,
3852 props.clone(),
3853 version,
3854 touched.clone(),
3855 ));
3856 }
3857 }
3858 }
3859 } else {
3860 push_vertex_to_labels(
3861 *vid,
3862 labels,
3863 props.clone(),
3864 false,
3865 version,
3866 &mut vertices_by_label,
3867 );
3868 }
3869 }
3870 }
3871 for &vid in &old_l0.vertex_tombstones {
3872 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3873 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3874 vertex_updated_at.insert(vid, ts);
3875 }
3876 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3877 // Round-12 §B: tombstones flush via Lance MergeInsert
3878 // (just `_vid`, `_deleted=true`, `_version`,
3879 // `_updated_at`) — skipping the wide-row Append.
3880 // Unconditional (no `partial_lance_writes` gating);
3881 // tombstone Append carries no useful payload.
3882 for label in labels {
3883 if let Some(label_id) = schema.label_id_by_name(label) {
3884 tombstones_by_label
3885 .entry(label_id)
3886 .or_default()
3887 .push((vid, version));
3888 }
3889 }
3890 } else {
3891 // Tombstone missing labels (old WAL format) - collect for storage query fallback
3892 orphaned_tombstones.push((vid, version));
3893 }
3894 }
3895 } // Drop read lock
3896
3897 // Resolve orphaned tombstones (missing labels) from storage
3898 if !orphaned_tombstones.is_empty() {
3899 tracing::warn!(
3900 count = orphaned_tombstones.len(),
3901 "Tombstones missing labels in L0, querying storage as fallback"
3902 );
3903 for (vid, version) in orphaned_tombstones {
3904 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3905 && !labels.is_empty()
3906 {
3907 for label in &labels {
3908 if let Some(label_id) = schema.label_id_by_name(label) {
3909 // Round-12 §B: route through partial tombstone too.
3910 tombstones_by_label
3911 .entry(label_id)
3912 .or_default()
3913 .push((vid, version));
3914 }
3915 }
3916 }
3917 }
3918 }
3919
3920 // 1. Load previous snapshot from cache, or fall back to storage.
3921 //
3922 // Use clone() not take(): for the async path, multiple
3923 // concurrent streams may run; if we take() here, a sibling
3924 // stream sees cached_manifest = None and seeds from
3925 // load_latest_snapshot (stale), losing the chain. clone()
3926 // preserves the parent. Finalize writes back the new manifest
3927 // unconditionally.
3928 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3929 cached
3930 } else {
3931 self.storage
3932 .snapshot_manager()
3933 .load_latest_snapshot()
3934 .await?
3935 .unwrap_or_else(|| {
3936 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3937 })
3938 };
3939
3940 // Update snapshot metadata
3941 // Save parent snapshot ID before generating new one (for lineage tracking)
3942 let parent_id = manifest.snapshot_id.clone();
3943 manifest.parent_snapshot = Some(parent_id);
3944 manifest.snapshot_id = Uuid::new_v4().to_string();
3945 manifest.name = name;
3946 manifest.created_at = Utc::now();
3947 manifest.version_high_water_mark = current_version;
3948 manifest.wal_high_water_mark = wal_lsn;
3949 let snapshot_id = manifest.snapshot_id.clone();
3950
3951 tracing::Span::current().record("snapshot_id", &snapshot_id);
3952
3953 // 2. Write main unified tables FIRST (before deltas).
3954 // Ensures the dual-write invariant: by the time an EID appears in a
3955 // delta table, it already exists in main_edges. This prevents the
3956 // compaction debug_assert from firing when compaction interleaves
3957 // with flush at async yield points.
3958 //
3959 // 2.1 Main edges table
3960 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3961 let _old_l0 = old_l0_arc.read();
3962 let mut main_edges: Vec<(
3963 uni_common::core::id::Eid,
3964 Vid,
3965 Vid,
3966 String,
3967 Properties,
3968 bool,
3969 u64,
3970 )> = Vec::new();
3971 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3972 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3973
3974 for (&edge_type_id, entries) in entries_by_type.iter() {
3975 for entry in entries {
3976 let edge_type_name = self
3977 .storage
3978 .schema_manager()
3979 .edge_type_name_by_id_unified(edge_type_id)
3980 .unwrap_or_else(|| "unknown".to_string());
3981
3982 let deleted = matches!(entry.op, Op::Delete);
3983 main_edges.push((
3984 entry.eid,
3985 entry.src_vid,
3986 entry.dst_vid,
3987 edge_type_name,
3988 entry.properties.clone(),
3989 deleted,
3990 entry.version,
3991 ));
3992
3993 if let Some(ts) = entry.created_at {
3994 edge_created_at_map.insert(entry.eid, ts);
3995 }
3996 if let Some(ts) = entry.updated_at {
3997 edge_updated_at_map.insert(entry.eid, ts);
3998 }
3999 }
4000 }
4001
4002 (main_edges, edge_created_at_map, edge_updated_at_map)
4003 };
4004
4005 if !main_edges.is_empty() {
4006 let main_edge_batch = MainEdgeDataset::build_record_batch(
4007 &main_edges,
4008 Some(&edge_created_at_map),
4009 Some(&edge_updated_at_map),
4010 )?;
4011 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
4012 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
4013 }
4014
4015 // 2.2 Main vertices table
4016 let mut main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
4017 let old_l0 = old_l0_arc.read();
4018 let mut vertices = Vec::new();
4019
4020 // Live vertices: full-row Append on the main table (the
4021 // props_json blob is required for global ID lookups). For
4022 // partial-row VIDs (vertex_partial_keys non-empty), the
4023 // main table still needs the full props for the
4024 // ext_id-uniqueness path; we keep the Append here. The
4025 // per-label Lance write IS partial via MergeInsert.
4026 for (vid, props) in &old_l0.vertex_properties {
4027 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4028 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4029 vertices.push((*vid, labels, props.clone(), false, version));
4030 }
4031
4032 // Tombstones: collected into `main_vertex_tombstones` for
4033 // the MergeInsert path below; skipping the wide-row Append.
4034 for &vid in &old_l0.vertex_tombstones {
4035 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
4036 main_vertex_tombstones.push((vid, version));
4037 }
4038
4039 vertices
4040 };
4041
4042 // M8: durable label-only mutations across flush windows.
4043 //
4044 // `SET n:Label` / `REMOVE n:Label` mark the vid in
4045 // `vertex_label_overwrites` and update `vertex_labels`, but for a
4046 // vid flushed in a PRIOR window they never re-add it to
4047 // `vertex_properties`. The loops above key off `vertex_properties`,
4048 // so such a relabel would be silently lost: absent from the main
4049 // table, the per-label datasets, and the VidLabelsIndex (and so
4050 // `rebuild_vid_labels_index` reads stale labels after a restart).
4051 // The same-window create+relabel case already works because the
4052 // create put the vid in `vertex_properties`.
4053 //
4054 // Re-derive each overwrite-only vid by fetching its persisted props
4055 // and labels, then route it into `main_vertices` (main table +
4056 // index), the new per-label datasets, and a tombstone in any
4057 // per-label dataset it left. `MATCH (n:OldLabel)` scans the
4058 // per-label table directly, so the old-label tombstone is required.
4059 let overwrite_only: Vec<(Vid, Vec<String>, u64)> = {
4060 let old_l0 = old_l0_arc.read();
4061 old_l0
4062 .vertex_label_overwrites
4063 .iter()
4064 .filter(|vid| {
4065 !old_l0.vertex_properties.contains_key(*vid)
4066 && !old_l0.vertex_tombstones.contains(*vid)
4067 })
4068 .map(|vid| {
4069 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
4070 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
4071 (*vid, labels, version)
4072 })
4073 .collect()
4074 };
4075 for (vid, new_labels, version) in overwrite_only {
4076 // Persisted props of the prior-window row — required so the
4077 // re-Appended main row does not blank the vertex's properties.
4078 let Some(props) = MainVertexDataset::find_props_by_vid(
4079 self.storage.backend(),
4080 vid,
4081 self.storage.version_high_water_mark(),
4082 )
4083 .await?
4084 else {
4085 tracing::warn!(
4086 vid = vid.as_u64(),
4087 "label-only mutation for a vid with no persisted main row; skipping flush \
4088 of its relabel"
4089 );
4090 continue;
4091 };
4092 // Labels the vid carried BEFORE this relabel; the storage read
4093 // reflects pre-flush state. Any label no longer present must be
4094 // tombstoned in its per-label dataset.
4095 let old_labels = self
4096 .find_vertex_labels_in_storage(vid)
4097 .await?
4098 .unwrap_or_default();
4099
4100 main_vertices.push((vid, new_labels.clone(), props.clone(), false, version));
4101 for label in &new_labels {
4102 if let Some(label_id) = schema.label_id_by_name(label) {
4103 vertices_by_label.entry(label_id).or_default().push((
4104 vid,
4105 new_labels.clone(),
4106 props.clone(),
4107 false,
4108 version,
4109 ));
4110 }
4111 }
4112 for label in &old_labels {
4113 if !new_labels.contains(label)
4114 && let Some(label_id) = schema.label_id_by_name(label)
4115 {
4116 tombstones_by_label
4117 .entry(label_id)
4118 .or_default()
4119 .push((vid, version));
4120 }
4121 }
4122 }
4123
4124 if !main_vertices.is_empty() {
4125 let main_vertex_batch = MainVertexDataset::build_record_batch(
4126 &main_vertices,
4127 Some(&vertex_created_at),
4128 Some(&vertex_updated_at),
4129 )?;
4130 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
4131 }
4132 // Round-12 §B: tombstones via MergeInsert on the main vertices
4133 // table. Independent of `vertex_properties` length.
4134 if !main_vertex_tombstones.is_empty() {
4135 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
4136 &main_vertex_tombstones,
4137 Some(&vertex_updated_at),
4138 )?;
4139 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
4140 .await?;
4141 }
4142 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
4143 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
4144 }
4145
4146 // Keep the VidLabelsIndex current for every flushed vertex. This is the
4147 // single place that sees all vertices: the per-label fan-out below skips
4148 // undeclared (schemaless) labels, so updating the index there would miss
4149 // them. Traversal-time label predicates read this index to resolve
4150 // labels for vertices that live only in Lance — notably on a fork, whose
4151 // data is flushed to Lance before branching. (GitHub #99)
4152 for (vid, labels, _props, _deleted, _version) in &main_vertices {
4153 self.storage.update_vid_labels_index(*vid, labels.clone());
4154 }
4155 for (vid, _version) in &main_vertex_tombstones {
4156 self.storage.remove_from_vid_labels_index(*vid);
4157 }
4158
4159 // 3. For each edge type, write FWD and BWD delta runs
4160 for (&edge_type_id, entries) in entries_by_type.iter() {
4161 // Get edge type name from unified lookup (handles both schema'd and schemaless)
4162 let edge_type_name = self
4163 .storage
4164 .schema_manager()
4165 .edge_type_name_by_id_unified(edge_type_id)
4166 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
4167
4168 // FWD Run (sorted by src_vid)
4169 // Round-12 §A: split entries into full-row Append and
4170 // partial MergeInsert routes based on `edge_partial_keys`.
4171 // Edges in `edge_partial_keys` were last written via
4172 // `insert_edge_partial_full`; the per-edge-type delta
4173 // tables receive only the touched schema columns plus
4174 // (when any overflow key was touched) the regenerated
4175 // `overflow_json` blob. Untouched columns retain their
4176 // previous-version value via Lance MergeInsert.
4177 let partial_eids: std::collections::HashSet<Eid> = {
4178 let old_l0 = old_l0_arc.read();
4179 entries
4180 .iter()
4181 .filter(|e| {
4182 self.config.partial_lance_writes
4183 && old_l0.edge_partial_keys.contains_key(&e.eid)
4184 })
4185 .map(|e| e.eid)
4186 .collect()
4187 };
4188 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
4189 let old_l0 = old_l0_arc.read();
4190 partial_eids
4191 .iter()
4192 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
4193 .collect()
4194 };
4195 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
4196 .clone()
4197 .into_iter()
4198 .partition(|e| !partial_eids.contains(&e.eid));
4199
4200 let backend = self.storage.backend();
4201
4202 // FWD run (sorted by src_vid)
4203 let mut fwd_full = full_entries.clone();
4204 fwd_full.sort_by_key(|e| e.src_vid);
4205 let mut fwd_partial = partial_entries.clone();
4206 fwd_partial.sort_by_key(|e| e.src_vid);
4207 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
4208 if !fwd_full.is_empty() {
4209 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
4210 fwd_ds.write_run(backend, fwd_batch).await?;
4211 }
4212 if !fwd_partial.is_empty() {
4213 let touched_union: std::collections::HashSet<String> = fwd_partial
4214 .iter()
4215 .flat_map(|e| {
4216 touched_union_by_eid
4217 .get(&e.eid)
4218 .cloned()
4219 .unwrap_or_default()
4220 .into_iter()
4221 })
4222 .collect();
4223 let fwd_partial_batch =
4224 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
4225 fwd_ds
4226 .merge_insert_partial_run(backend, fwd_partial_batch)
4227 .await?;
4228 }
4229 fwd_ds.ensure_eid_index(backend).await?;
4230
4231 // BWD Run (sorted by dst_vid)
4232 let mut bwd_full = full_entries.clone();
4233 bwd_full.sort_by_key(|e| e.dst_vid);
4234 let mut bwd_partial = partial_entries.clone();
4235 bwd_partial.sort_by_key(|e| e.dst_vid);
4236 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4237 if !bwd_full.is_empty() {
4238 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4239 bwd_ds.write_run(backend, bwd_batch).await?;
4240 }
4241 if !bwd_partial.is_empty() {
4242 let touched_union: std::collections::HashSet<String> = bwd_partial
4243 .iter()
4244 .flat_map(|e| {
4245 touched_union_by_eid
4246 .get(&e.eid)
4247 .cloned()
4248 .unwrap_or_default()
4249 .into_iter()
4250 })
4251 .collect();
4252 let bwd_partial_batch =
4253 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4254 bwd_ds
4255 .merge_insert_partial_run(backend, bwd_partial_batch)
4256 .await?;
4257 }
4258 bwd_ds.ensure_eid_index(backend).await?;
4259
4260 // Update Manifest
4261 let current_snap =
4262 manifest
4263 .edges
4264 .entry(edge_type_name.to_string())
4265 .or_insert(EdgeSnapshot {
4266 version: 0,
4267 count: 0,
4268 lance_version: 0,
4269 });
4270 current_snap.version += 1;
4271 current_snap.count += entries.len() as u64;
4272 // LanceDB tables don't expose Lance version directly
4273 current_snap.lance_version = 0;
4274
4275 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4276 // already has these edges via dual-write in insert_edge/delete_edge.
4277 }
4278
4279 // 4. Per-label vertex table writes
4280 // Iterate all labels that have either full-row OR partial-write
4281 // data pending. A label may appear in only one of the two maps
4282 // (e.g., all updates on this label were partial-only).
4283 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4284 .keys()
4285 .chain(partial_by_label.keys())
4286 .chain(tombstones_by_label.keys())
4287 .copied()
4288 .collect();
4289 for label_id in all_label_ids {
4290 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4291 let label_name = schema
4292 .label_name_by_id(label_id)
4293 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4294
4295 let ds = self.storage.vertex_dataset(label_name)?;
4296
4297 // Collect inverted index updates before consuming vertices
4298 // Maps: cfg.property -> (added, removed)
4299 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4300 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4301
4302 for idx in &schema.indexes {
4303 if let IndexDefinition::Inverted(cfg) = idx
4304 && cfg.label == label_name
4305 {
4306 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4307 let mut removed: HashSet<Vid> = HashSet::new();
4308
4309 for (vid, _labels, props, deleted, _version) in &vertices {
4310 if *deleted {
4311 removed.insert(*vid);
4312 } else if let Some(prop_value) = props.get(&cfg.property) {
4313 // Extract terms from the property value (List<String>)
4314 if let Some(arr) = prop_value.as_array() {
4315 let terms: Vec<String> = arr
4316 .iter()
4317 .filter_map(|v| v.as_str().map(ToString::to_string))
4318 .collect();
4319 if !terms.is_empty() {
4320 added.insert(*vid, terms);
4321 }
4322 }
4323 }
4324 }
4325 // Round-12 §B: tombstones no longer in `vertices`;
4326 // pull them from `tombstones_by_label` for inverted
4327 // index removal.
4328 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4329 for (vid, _) in tomb_rows {
4330 removed.insert(*vid);
4331 }
4332 }
4333
4334 if !added.is_empty() || !removed.is_empty() {
4335 inverted_updates.insert(cfg.property.clone(), (added, removed));
4336 }
4337 }
4338 }
4339
4340 let mut v_data = Vec::new();
4341 let mut d_data = Vec::new();
4342 let mut ver_data = Vec::new();
4343 for (vid, labels, props, deleted, version) in vertices {
4344 v_data.push((vid, labels, props));
4345 d_data.push(deleted);
4346 ver_data.push(version);
4347 }
4348
4349 let backend = self.storage.backend();
4350
4351 // Skip the full-row Append entirely if this label only has
4352 // partial-write rows pending.
4353 if !v_data.is_empty() {
4354 let batch = ds.build_record_batch_with_timestamps(
4355 &v_data,
4356 &d_data,
4357 &ver_data,
4358 &schema,
4359 Some(&vertex_created_at),
4360 Some(&vertex_updated_at),
4361 )?;
4362 ds.write_batch(backend, batch, &schema).await?;
4363 }
4364
4365 // Partial-column batch (Lance MergeInsert path). The flag
4366 // gates whether the routing classified any VIDs as partial;
4367 // outside the flag this collection is always empty so the
4368 // call below is a cheap no-op.
4369 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4370 && !partial_rows.is_empty()
4371 {
4372 let touched_union: std::collections::HashSet<String> = partial_rows
4373 .iter()
4374 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4375 .collect();
4376 let pairs: Vec<(Vid, Properties)> = partial_rows
4377 .iter()
4378 .map(|(vid, props, _, _)| (*vid, props.clone()))
4379 .collect();
4380 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4381 let partial_batch = ds.build_partial_record_batch(
4382 &pairs,
4383 &versions,
4384 &touched_union,
4385 &schema,
4386 Some(&vertex_updated_at),
4387 )?;
4388 if partial_batch.num_rows() > 0 {
4389 ds.merge_insert_batch(backend, partial_batch).await?;
4390 }
4391 }
4392
4393 // Tombstone batch (Round-12 §B): always MergeInsert with
4394 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4395 // No partial_lance_writes gating — tombstones never carry
4396 // useful property payload to write. Captured tombstone vids
4397 // also drive `remove_from_vid_labels_index` below.
4398 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4399 if !tombstone_rows.is_empty() {
4400 let tomb_batch =
4401 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4402 if tomb_batch.num_rows() > 0 {
4403 ds.merge_insert_batch(backend, tomb_batch).await?;
4404 }
4405 }
4406
4407 ds.ensure_default_indexes(backend).await?;
4408
4409 // VidLabelsIndex maintenance is centralized at the main-vertex
4410 // flush above (it sees both schema'd and schemaless vertices).
4411
4412 // Update Manifest
4413 let current_snap =
4414 manifest
4415 .vertices
4416 .entry(label_name.to_string())
4417 .or_insert(LabelSnapshot {
4418 version: 0,
4419 count: 0,
4420 lance_version: 0,
4421 });
4422 current_snap.version += 1;
4423 current_snap.count += v_data.len() as u64;
4424 // LanceDB tables don't expose Lance version directly
4425 current_snap.lance_version = 0;
4426
4427 // Invalidate table cache to ensure next read picks up new version
4428 self.storage.invalidate_table_cache(label_name);
4429
4430 // Apply inverted index updates incrementally
4431 #[cfg(feature = "lance-backend")]
4432 for idx in &schema.indexes {
4433 if let IndexDefinition::Inverted(cfg) = idx
4434 && cfg.label == label_name
4435 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4436 {
4437 self.storage
4438 .index_manager()
4439 .update_inverted_index_incremental(cfg, added, removed)
4440 .await?;
4441 }
4442 }
4443
4444 // Update UID index with new vertex mappings
4445 // Collect (UniId, Vid) mappings from non-deleted vertices
4446 #[cfg(feature = "lance-backend")]
4447 {
4448 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4449 for (vid, _labels, props) in &v_data {
4450 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4451 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4452 label_name, ext_id, props,
4453 );
4454 uid_mappings.push((uid, *vid));
4455 }
4456
4457 if !uid_mappings.is_empty()
4458 && let Ok(uid_index) = self.storage.uid_index(label_name)
4459 {
4460 // Stamp mappings with this flush's MVCC version so a later
4461 // re-create of the same UID deterministically outranks the
4462 // stale mapping (review C3).
4463 uid_index
4464 .write_mapping_versioned(&uid_mappings, current_version)
4465 .await?;
4466 }
4467 }
4468 }
4469 Ok(FlushOutcome {
4470 manifest,
4471 snapshot_id,
4472 })
4473 }
4474
4475 /// Composition entry that assumes the caller already holds `flush_lock`.
4476 /// Runs rotate + stream + finalize_locked in sequence. Used by
4477 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4478 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4479 /// holds the lock from the commit critical section).
4480 #[instrument(
4481 skip(self),
4482 fields(snapshot_id, mutations_count, size_bytes),
4483 level = "info"
4484 )]
4485 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4486 let start = std::time::Instant::now();
4487
4488 let (initial_size, initial_count) = {
4489 let l0_arc = self.l0_manager.get_current();
4490 let l0 = l0_arc.read();
4491 (l0.estimated_size, l0.mutation_count)
4492 };
4493 tracing::Span::current().record("size_bytes", initial_size);
4494 tracing::Span::current().record("mutations_count", initial_count);
4495
4496 debug!("Starting L0 flush to L1");
4497
4498 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4499 // FlushInProgressGuard lives on RotateOutput and stays alive for
4500 // the full flush — including the finalize_locked call below.
4501 let RotateOutput {
4502 old_l0_arc,
4503 wal_lsn,
4504 current_version,
4505 flush_in_progress_guard: _flush_guard,
4506 } = self.flush_l0_rotate().await?;
4507
4508 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4509 // G (Lance writes). Builds the manifest but does NOT publish it.
4510 let FlushOutcome {
4511 manifest,
4512 snapshot_id,
4513 } = self
4514 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4515 .await?;
4516
4517 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4518 // property cache clear, last_flush_time, metrics, l1_runs++,
4519 // compaction trigger, index-rebuild scheduling, fork tick.
4520 self.flush_finalize_locked(
4521 old_l0_arc,
4522 wal_lsn,
4523 manifest,
4524 snapshot_id,
4525 initial_size,
4526 initial_count,
4527 start,
4528 )
4529 .await
4530 }
4531
4532 /// Phases H..S of the flush: publish the manifest and run all
4533 /// post-publish bookkeeping. Assumes the caller already holds
4534 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4535 /// lock-acquiring variant used by the async finalize path.
4536 #[allow(clippy::too_many_arguments)]
4537 async fn flush_finalize_locked(
4538 &self,
4539 old_l0_arc: Arc<RwLock<L0Buffer>>,
4540 wal_lsn: u64,
4541 manifest: SnapshotManifest,
4542 snapshot_id: String,
4543 initial_size: usize,
4544 initial_count: usize,
4545 start: std::time::Instant,
4546 ) -> Result<String> {
4547 Self::flush_finalize_body(
4548 &self.shared_ctx(),
4549 old_l0_arc,
4550 wal_lsn,
4551 manifest,
4552 snapshot_id,
4553 initial_size,
4554 initial_count,
4555 start,
4556 )
4557 .await
4558 }
4559
4560 /// Phases H..S of the flush, lock-acquiring variant. Used by the
4561 /// async-flush finalizer task (running on a spawned tokio task),
4562 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4563 /// `flush_lock` to serialize the publish boundary, then runs the
4564 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4565 #[allow(clippy::too_many_arguments)]
4566 pub(crate) async fn flush_finalize_now(
4567 shared: SharedFlushCtx,
4568 old_l0_arc: Arc<RwLock<L0Buffer>>,
4569 wal_lsn: u64,
4570 manifest: SnapshotManifest,
4571 snapshot_id: String,
4572 initial_size: usize,
4573 initial_count: usize,
4574 start: std::time::Instant,
4575 ) -> Result<String> {
4576 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4577 Self::flush_finalize_body(
4578 &shared,
4579 old_l0_arc,
4580 wal_lsn,
4581 manifest,
4582 snapshot_id,
4583 initial_size,
4584 initial_count,
4585 start,
4586 )
4587 .await
4588 }
4589
/// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
/// Static over `SharedFlushCtx`; the caller is responsible for
/// holding `flush_lock`.
///
/// # Arguments
///
/// * `shared` - handles to storage, L0 manager, caches and counters.
/// * `old_l0_arc` - the rotated L0 buffer whose flush is being completed;
///   handed to `complete_flush` in step J.
/// * `wal_lsn` - upper bound used when computing the WAL truncation point.
/// * `manifest` - manifest built by the stream phase; its
///   `parent_snapshot` may be re-linked here before it is published.
/// * `snapshot_id` - logged on success and returned to the caller.
///   NOTE(review): presumably always equal to `manifest.snapshot_id`,
///   which is what actually gets published — confirm against callers.
/// * `initial_size` / `initial_count` - size and mutation count reported
///   in the completion log line and the flush metrics.
/// * `start` - instant from which `uni_flush_duration_seconds` is measured.
///
/// # Errors
///
/// Propagates failures from saving the snapshot, updating the latest
/// pointer, the fsync barrier, WAL truncation, and acquiring the
/// `compaction_status` mutex. Steps run strictly in order, so an error
/// after step H is returned even though the manifest has already been
/// published.
#[allow(clippy::too_many_arguments)]
async fn flush_finalize_body(
    shared: &SharedFlushCtx,
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    wal_lsn: u64,
    mut manifest: SnapshotManifest,
    snapshot_id: String,
    initial_size: usize,
    initial_count: usize,
    start: std::time::Instant,
) -> Result<String> {
    // Parent-snapshot fixup. The stream phase built `manifest` with
    // parent_snapshot set from cached_manifest at stream time. If
    // OTHER flushes (sync or async) have finalized since then,
    // cached_manifest has advanced. Re-link this manifest to the
    // current cached chain so we don't orphan their data when we
    // overwrite cached_manifest below.
    let current_parent_id = shared
        .cached_manifest
        .lock()
        .as_ref()
        .map(|m| m.snapshot_id.clone());
    // Only re-link when a cached manifest exists; with an empty cache the
    // parent chosen by the stream phase is left untouched.
    if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
        manifest.parent_snapshot = current_parent_id;
        metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
    }

    // H. Publish manifest (body first, then pointer — recovery is
    // idempotent if we crash between the two).
    // A fork writer must publish to a fork-scoped namespace, never the
    // global `catalog/latest` that the primary reopen reads (review C1).
    debug_assert_eq!(
        shared.fork_id.is_some(),
        shared.storage.snapshot_manager().is_fork_scoped(),
        "fork writer must publish to a fork-scoped snapshot namespace (review C1)"
    );
    shared
        .storage
        .snapshot_manager()
        .save_snapshot(&manifest)
        .await?;
    shared
        .storage
        .snapshot_manager()
        .set_latest_snapshot(&manifest.snapshot_id)
        .await?;

    // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
    // wrote the manifest body and the `catalog/latest` pointer through the
    // object store, which does NOT fsync. WAL truncation (K, below) removes
    // the only other durable copy of this flush's data, so a crash after K
    // but before the OS flushed those writes would lose the snapshot —
    // recovery could not resolve `latest`. Make them durable now (local-fs
    // only; remote stores provide their own durability on `put`).
    crate::snapshot::manager::fsync_snapshot_pointer(
        shared.storage.local_fs_root().as_deref(),
        shared.fork_id.as_ref(),
        &manifest.snapshot_id,
    )
    .map_err(|e| {
        anyhow!(
            "fsync snapshot {} before WAL truncate: {}",
            manifest.snapshot_id,
            e
        )
    })?;

    // I. Cache manifest for next flush to avoid re-reading from object store.
    *shared.cached_manifest.lock() = Some(manifest.clone());

    // L. Invalidate the property cache BEFORE removing the flushed buffer
    // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
    // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
    // first closes the non-monotonic-read window: once the buffer leaves
    // the L0 chain at J a freshly-written value would otherwise miss the
    // chain and fall through to a stale cache entry. By the time finalize
    // runs the streamed rows are already durable in L1, so a post-clear
    // read falls through to fresh storage instead. The finalizer holds
    // `flush_lock` throughout, so reordering L ahead of J is safe.
    if let Some(ref pm) = shared.property_manager {
        pm.clear_cache().await;
    }

    // J. Complete flush: remove old L0 from pending_flush. MUST happen
    // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
    shared.l0_manager.complete_flush(&old_l0_arc);

    // Test-only seam (no-op without the `failpoints` feature): pause AFTER
    // complete_flush removed the buffer from the L0 chain (J) but BEFORE
    // WAL truncation (K). The property cache is already cleared (L moved
    // ahead of J above), so a read in this window falls through to fresh
    // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
    // read after flush finalize).
    // NOTE(review): the fail-point name still says "before-cache-clear",
    // which no longer matches its position now that L runs ahead of J;
    // presumably kept because tests refer to it by name — confirm before
    // renaming.
    fail::fail_point!("flush::after-complete-before-cache-clear");

    // K. Truncate WAL up to the safe LSN.
    // The WAL handle is cloned out in a single statement so the L0 read
    // guard is released before the `.await` below.
    let wal_handle = shared.l0_manager.get_current().read().wal.clone();
    if let Some(w) = wal_handle {
        // Never truncate past the lowest LSN still needed by a pending
        // flush; with nothing pending, this flush's own LSN is the bound.
        let safe_lsn = shared
            .l0_manager
            .min_pending_wal_lsn()
            .map(|min_pending| min_pending.min(wal_lsn))
            .unwrap_or(wal_lsn);
        w.truncate_before(safe_lsn).await?;
    }

    // M. Reset last flush time for time-based auto-flush.
    *shared.last_flush_time.lock() = std::time::Instant::now();

    info!(
        snapshot_id,
        mutations_count = initial_count,
        size_bytes = initial_size,
        "L0 flush to L1 completed successfully"
    );
    metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
    metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
    metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);

    // P. Increment flush generation counter for write throttling.
    // Scoped block: the mutex guard is dropped before anything else runs.
    {
        let mut status = uni_common::sync::acquire_mutex(
            &shared.storage.compaction_status,
            "compaction_status",
        )?;
        status.l1_runs += 1;
    }

    // Q. Trigger CSR compaction if enough frozen segments have accumulated.
    let am = shared.adjacency_manager.clone();
    if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
        // At most one compaction task at a time: skip if the previously
        // spawned handle has not finished yet.
        let previous_still_running = {
            let guard = shared.compaction_handle.read();
            guard.as_ref().is_some_and(|h| !h.is_finished())
        };
        if previous_still_running {
            info!("Skipping compaction: previous compaction still in progress");
        } else {
            let handle = tokio::spawn(async move {
                am.compact();
            });
            *shared.compaction_handle.write() = Some(handle);
        }
    }

    // R. Post-flush: check if any indexes need rebuilding based on thresholds.
    if shared.auto_rebuild_enabled
        && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
    {
        Self::schedule_index_rebuilds_if_needed_static(
            &manifest,
            rebuild_mgr.clone(),
            shared.schema_manager.clone(),
            shared.index_rebuild_config.clone(),
        );
    }

    // S. Emit fork-fragment observability after a successful forked flush.
    Self::tick_fork_fragment_observability_static(
        shared.fork_id,
        shared.fork_flush_count.clone(),
        shared.fork_fragment_warn_fired.clone(),
        shared.fork_fragment_warn_threshold,
    );

    Ok(snapshot_id)
}
4760
4761 /// Increment fork-flush bookkeeping and fire the fragment warn
4762 /// once if the threshold is crossed.
4763 ///
4764 /// Each flush typically appends ~1 fragment per touched dataset on
4765 /// the fork's branches; without compaction (deferred to Phase 5)
4766 /// long-lived heavy-write forks degrade. The flush count is a
4767 /// proxy for actual fragment growth — reading
4768 /// `Dataset::manifest().fragments.len()` per dataset would add a
4769 /// per-flush object-store roundtrip on the hot commit path, which
4770 /// is too costly for a purely observational guard rail.
4771 ///
4772 /// No-op for primary writers (`fork_id == None`).
4773 #[allow(dead_code)] // called by tests; production path uses _static
4774 pub(crate) fn tick_fork_fragment_observability(&self) {
4775 Self::tick_fork_fragment_observability_static(
4776 self.fork_id,
4777 self.fork_flush_count.clone(),
4778 self.fork_fragment_warn_fired.clone(),
4779 self.config.fork_fragment_warn_threshold,
4780 );
4781 }
4782
4783 /// Static variant of [`Writer::tick_fork_fragment_observability`].
4784 /// Used by the async-flush finalize path, where we hold a
4785 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4786 pub(crate) fn tick_fork_fragment_observability_static(
4787 fork_id: Option<ForkId>,
4788 fork_flush_count: Arc<AtomicU64>,
4789 fork_fragment_warn_fired: Arc<AtomicBool>,
4790 warn_threshold: usize,
4791 ) {
4792 let Some(fork_id) = fork_id else { return };
4793 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4794 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4795 let fork_label = fork_id.to_string();
4796 metrics::gauge!(
4797 "uni_fork_l1_flushes",
4798 "fork" => fork_label.clone(),
4799 )
4800 .set(new_count as f64);
4801 let threshold = warn_threshold as u64;
4802 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4803 && threshold > 0
4804 && new_count >= threshold
4805 {
4806 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4807 tracing::warn!(
4808 fork = %fork_label,
4809 flush_count = new_count,
4810 threshold,
4811 "fork has exceeded the L1 flush-count threshold; \
4812 fork compaction is deferred to Phase 5 — consider \
4813 drop+recreate or promotion to bound fragment growth"
4814 );
4815 }
4816 }
4817
4818 /// Check rebuild thresholds and schedule background index rebuilds for
4819 /// labels that exceed growth or age limits. Marks affected indexes as
4820 /// `Stale` and spawns an async task to schedule the rebuild.
4821 #[allow(dead_code)] // production path uses _static; kept as the
4822 // documented instance entry point.
4823 fn schedule_index_rebuilds_if_needed(
4824 &self,
4825 manifest: &SnapshotManifest,
4826 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4827 ) {
4828 Self::schedule_index_rebuilds_if_needed_static(
4829 manifest,
4830 rebuild_mgr,
4831 self.schema_manager.clone(),
4832 self.config.index_rebuild.clone(),
4833 );
4834 }
4835
4836 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4837 /// Used by the async-flush finalize path, where we hold the
4838 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4839 pub(crate) fn schedule_index_rebuilds_if_needed_static(
4840 manifest: &SnapshotManifest,
4841 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4842 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4843 index_rebuild_config: uni_common::config::IndexRebuildConfig,
4844 ) {
4845 let checker =
4846 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4847 let schema = schema_manager.schema();
4848 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4849
4850 if labels.is_empty() {
4851 return;
4852 }
4853
4854 // Mark affected indexes as Stale
4855 for label in &labels {
4856 for idx in &schema.indexes {
4857 if idx.label() == label {
4858 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4859 m.status = uni_common::core::schema::IndexStatus::Stale;
4860 });
4861 }
4862 }
4863 }
4864
4865 tokio::spawn(async move {
4866 if let Err(e) = rebuild_mgr.schedule(labels).await {
4867 tracing::warn!("Failed to schedule index rebuild: {e}");
4868 }
4869 });
4870 }
4871}
4872
4873/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
4874/// its single-task finalizer loop. Unit struct on purpose: it must NOT
4875/// hold `Arc<Writer>` (that would create a reference cycle Writer ->
4876/// FlushCoordinator -> Arc<dyn FinalizeFn> -> Writer). All state needed
4877/// for finalize travels in via `SharedFlushCtx`.
4878pub(crate) struct WriterFinalizer;
4879
4880impl FinalizeFn for WriterFinalizer {
4881 fn finalize<'a>(
4882 &'a self,
4883 rotated: RotatedFlush,
4884 outcome: AsyncFlushOutcome,
4885 shared: SharedFlushCtx,
4886 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4887 Box::pin(async move {
4888 // Read initial_size / initial_count from the rotated L0 so
4889 // we don't have to plumb them through the coordinator
4890 // submission. The buffer is still alive in pending_flush
4891 // until `complete_flush` (J) below pops it.
4892 let (initial_size, initial_count) = {
4893 let l0 = rotated.old_l0_arc.read();
4894 (l0.estimated_size, l0.mutation_count)
4895 };
4896 let result = Writer::flush_finalize_now(
4897 shared,
4898 rotated.old_l0_arc.clone(),
4899 rotated.wal_lsn,
4900 outcome.new_manifest,
4901 outcome.snapshot_id,
4902 initial_size,
4903 initial_count,
4904 std::time::Instant::now(),
4905 )
4906 .await;
4907 // `rotated` (permit + flush_in_progress_guard) drops here.
4908 drop(rotated.permit);
4909 result
4910 })
4911 }
4912
4913 fn finalize_failure<'a>(
4914 &'a self,
4915 rotated: RotatedFlush,
4916 err: anyhow::Error,
4917 _shared: SharedFlushCtx,
4918 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4919 Box::pin(async move {
4920 tracing::warn!(
4921 error = %err,
4922 seq = rotated.seq,
4923 "async flush stream failed; old L0 remains in pending_flush, \
4924 WAL retains its data, recovery via WAL replay on restart"
4925 );
4926 metrics::counter!("uni_flush_failures_total").increment(1);
4927 // Permit + guard drop here so back-pressure releases even on
4928 // failure.
4929 drop(rotated.permit);
4930 err
4931 })
4932 }
4933}
4934
4935#[cfg(test)]
4936mod tests {
4937 use super::*;
4938 use tempfile::tempdir;
4939
4940 /// Test that commit_transaction writes mutations to WAL before merging to main L0.
4941 /// This verifies fix for issue #137 (transaction commit atomicity).
4942 #[tokio::test]
4943 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4944 use crate::runtime::wal::WriteAheadLog;
4945 use crate::storage::manager::StorageManager;
4946 use object_store::local::LocalFileSystem;
4947 use object_store::path::Path as ObjectStorePath;
4948 use uni_common::core::schema::SchemaManager;
4949
4950 let dir = tempdir()?;
4951 let path = dir.path().to_str().unwrap();
4952 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4953 let schema_path = ObjectStorePath::from("schema.json");
4954
4955 let schema_manager =
4956 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4957 let _label_id = schema_manager.add_label("Test")?;
4958 schema_manager.save().await?;
4959
4960 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4961
4962 // Create WAL for main L0
4963 let wal_path = ObjectStorePath::from("wal");
4964 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4965
4966 let writer = Writer::new_with_config(
4967 storage.clone(),
4968 schema_manager.clone(),
4969 1,
4970 UniConfig::default(),
4971 Some(wal),
4972 None,
4973 )
4974 .await?;
4975
4976 // Begin transaction — create a transaction L0
4977 let tx_l0 = writer.create_transaction_l0();
4978
4979 // Insert data in transaction
4980 let vid_a = writer.next_vid().await?;
4981 let vid_b = writer.next_vid().await?;
4982
4983 let mut props = std::collections::HashMap::new();
4984 props.insert("test".to_string(), Value::String("data".to_string()));
4985
4986 writer
4987 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4988 .await?;
4989 writer
4990 .insert_vertex_with_labels(
4991 vid_b,
4992 std::collections::HashMap::new(),
4993 &["Test".to_string()],
4994 Some(&tx_l0),
4995 )
4996 .await?;
4997
4998 let eid = writer.next_eid(1).await?;
4999 writer
5000 .insert_edge(
5001 vid_a,
5002 vid_b,
5003 1,
5004 eid,
5005 std::collections::HashMap::new(),
5006 None,
5007 Some(&tx_l0),
5008 )
5009 .await?;
5010
5011 // Get WAL before commit
5012 let l0 = writer.l0_manager.get_current();
5013 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
5014 let mutations_before = wal.replay().await?;
5015 let count_before = mutations_before.len();
5016
5017 // Commit transaction - this should write to WAL first
5018 let writer = Arc::new(writer);
5019 writer.commit_transaction_l0(tx_l0).await?;
5020
5021 // Verify WAL has the new mutations
5022 let mutations_after = wal.replay().await?;
5023 assert!(
5024 mutations_after.len() > count_before,
5025 "WAL should contain transaction mutations after commit"
5026 );
5027
5028 // Verify mutations are in correct order: vertices first, then edges
5029 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
5030
5031 let mut saw_vertex_a = false;
5032 let mut saw_vertex_b = false;
5033 let mut saw_edge = false;
5034
5035 for mutation in &new_mutations {
5036 match mutation {
5037 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
5038 if *vid == vid_a {
5039 saw_vertex_a = true;
5040 }
5041 if *vid == vid_b {
5042 saw_vertex_b = true;
5043 }
5044 // Vertices should come before edges
5045 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
5046 }
5047 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
5048 if *e == eid {
5049 saw_edge = true;
5050 }
5051 // Edges should come after vertices
5052 assert!(
5053 saw_vertex_a && saw_vertex_b,
5054 "Edge should be logged after both vertices"
5055 );
5056 }
5057 _ => {}
5058 }
5059 }
5060
5061 assert!(saw_vertex_a, "Vertex A should be in WAL");
5062 assert!(saw_vertex_b, "Vertex B should be in WAL");
5063 assert!(saw_edge, "Edge should be in WAL");
5064
5065 // Verify data is also in main L0
5066 let l0_read = l0.read();
5067 assert!(
5068 l0_read.vertex_properties.contains_key(&vid_a),
5069 "Vertex A should be in main L0"
5070 );
5071 assert!(
5072 l0_read.vertex_properties.contains_key(&vid_b),
5073 "Vertex B should be in main L0"
5074 );
5075 assert!(
5076 l0_read.edge_endpoints.contains_key(&eid),
5077 "Edge should be in main L0"
5078 );
5079
5080 Ok(())
5081 }
5082
5083 /// Test that failed WAL flush leaves transaction intact for retry or rollback.
5084 #[tokio::test]
5085 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
5086 use crate::runtime::wal::WriteAheadLog;
5087 use crate::storage::manager::StorageManager;
5088 use object_store::local::LocalFileSystem;
5089 use object_store::path::Path as ObjectStorePath;
5090 use uni_common::core::schema::SchemaManager;
5091
5092 let dir = tempdir()?;
5093 let path = dir.path().to_str().unwrap();
5094 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5095 let schema_path = ObjectStorePath::from("schema.json");
5096
5097 let schema_manager =
5098 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5099 let _label_id = schema_manager.add_label("Test")?;
5100 let _baseline_label_id = schema_manager.add_label("Baseline")?;
5101 let _txdata_label_id = schema_manager.add_label("TxData")?;
5102 schema_manager.save().await?;
5103
5104 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5105
5106 // Create WAL for main L0
5107 let wal_path = ObjectStorePath::from("wal");
5108 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5109
5110 let writer = Writer::new_with_config(
5111 storage.clone(),
5112 schema_manager.clone(),
5113 1,
5114 UniConfig::default(),
5115 Some(wal),
5116 None,
5117 )
5118 .await?;
5119
5120 // Insert baseline data (outside transaction)
5121 let baseline_vid = writer.next_vid().await?;
5122 writer
5123 .insert_vertex_with_labels(
5124 baseline_vid,
5125 [("baseline".to_string(), Value::Bool(true))]
5126 .into_iter()
5127 .collect(),
5128 &["Baseline".to_string()],
5129 None,
5130 )
5131 .await?;
5132
5133 // Begin transaction — create a transaction L0
5134 let tx_l0 = writer.create_transaction_l0();
5135
5136 // Insert data in transaction
5137 let tx_vid = writer.next_vid().await?;
5138 writer
5139 .insert_vertex_with_labels(
5140 tx_vid,
5141 [("tx_data".to_string(), Value::Bool(true))]
5142 .into_iter()
5143 .collect(),
5144 &["TxData".to_string()],
5145 Some(&tx_l0),
5146 )
5147 .await?;
5148
5149 // Capture main L0 state before rollback
5150 let l0 = writer.l0_manager.get_current();
5151 let vertex_count_before = l0.read().vertex_properties.len();
5152
5153 // Rollback transaction (simulating what would happen after WAL flush failure)
5154 drop(tx_l0);
5155
5156 // Verify main L0 is unchanged
5157 let vertex_count_after = l0.read().vertex_properties.len();
5158 assert_eq!(
5159 vertex_count_before, vertex_count_after,
5160 "Main L0 should not change after rollback"
5161 );
5162
5163 // Baseline should still be present
5164 assert!(
5165 l0.read().vertex_properties.contains_key(&baseline_vid),
5166 "Baseline data should remain"
5167 );
5168
5169 // Transaction data should NOT be in main L0
5170 assert!(
5171 !l0.read().vertex_properties.contains_key(&tx_vid),
5172 "Transaction data should not be in main L0 after rollback"
5173 );
5174
5175 Ok(())
5176 }
5177
5178 /// Test that batch insert with shared labels does not clone labels per vertex.
5179 /// This verifies fix for issue #161 (redundant label cloning).
5180 #[tokio::test]
5181 async fn test_batch_insert_shared_labels() -> Result<()> {
5182 use crate::storage::manager::StorageManager;
5183 use object_store::local::LocalFileSystem;
5184 use object_store::path::Path as ObjectStorePath;
5185 use uni_common::core::schema::SchemaManager;
5186
5187 let dir = tempdir()?;
5188 let path = dir.path().to_str().unwrap();
5189 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5190 let schema_path = ObjectStorePath::from("schema.json");
5191
5192 let schema_manager =
5193 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5194 let _label_id = schema_manager.add_label("Person")?;
5195 schema_manager.save().await?;
5196
5197 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5198
5199 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5200
5201 // Shared labels - should not be cloned per vertex
5202 let labels = &["Person".to_string()];
5203
5204 // Insert batch of vertices with same labels
5205 let mut vids = Vec::new();
5206 for i in 0..100 {
5207 let vid = writer.next_vid().await?;
5208 let mut props = std::collections::HashMap::new();
5209 props.insert("id".to_string(), Value::Int(i));
5210 writer
5211 .insert_vertex_with_labels(vid, props, labels, None)
5212 .await?;
5213 vids.push(vid);
5214 }
5215
5216 // Verify all vertices have the correct labels
5217 let l0 = writer.l0_manager.get_current();
5218 for vid in vids {
5219 let l0_guard = l0.read();
5220 let vertex_labels = l0_guard.vertex_labels.get(&vid);
5221 assert!(vertex_labels.is_some(), "Vertex should have labels");
5222 assert_eq!(
5223 vertex_labels.unwrap(),
5224 &vec!["Person".to_string()],
5225 "Labels should match"
5226 );
5227 }
5228
5229 Ok(())
5230 }
5231
5232 /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5233 /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5234 #[tokio::test]
5235 async fn test_estimated_size_tracks_mutations() -> Result<()> {
5236 use crate::storage::manager::StorageManager;
5237 use object_store::local::LocalFileSystem;
5238 use object_store::path::Path as ObjectStorePath;
5239 use uni_common::core::schema::SchemaManager;
5240
5241 let dir = tempdir()?;
5242 let path = dir.path().to_str().unwrap();
5243 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5244 let schema_path = ObjectStorePath::from("schema.json");
5245
5246 let schema_manager =
5247 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5248 let _label_id = schema_manager.add_label("Test")?;
5249 schema_manager.save().await?;
5250
5251 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5252
5253 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5254
5255 let l0 = writer.l0_manager.get_current();
5256
5257 // Initial state should be empty
5258 let initial_estimated = l0.read().estimated_size;
5259 let initial_actual = l0.read().size_bytes();
5260 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5261 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5262
5263 // Insert vertices with properties
5264 let mut vids = Vec::new();
5265 for i in 0..10 {
5266 let vid = writer.next_vid().await?;
5267 let mut props = std::collections::HashMap::new();
5268 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5269 props.insert("index".to_string(), Value::Int(i));
5270 writer
5271 .insert_vertex_with_labels(vid, props, &[], None)
5272 .await?;
5273 vids.push(vid);
5274 }
5275
5276 // Verify estimated_size grew
5277 let after_vertices_estimated = l0.read().estimated_size;
5278 let after_vertices_actual = l0.read().size_bytes();
5279 assert!(
5280 after_vertices_estimated > 0,
5281 "estimated_size should grow after insertions"
5282 );
5283
5284 // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5285 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5286 assert!(
5287 (0.5..=2.0).contains(&ratio),
5288 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5289 after_vertices_estimated,
5290 after_vertices_actual,
5291 ratio
5292 );
5293
5294 // Insert edges with a simple edge type
5295 let edge_type = 1u32;
5296 for i in 0..9 {
5297 let eid = writer.next_eid(edge_type).await?;
5298 writer
5299 .insert_edge(
5300 vids[i],
5301 vids[i + 1],
5302 edge_type,
5303 eid,
5304 std::collections::HashMap::new(),
5305 Some("NEXT".to_string()),
5306 None,
5307 )
5308 .await?;
5309 }
5310
5311 // Verify estimated_size grew further
5312 let after_edges_estimated = l0.read().estimated_size;
5313 let after_edges_actual = l0.read().size_bytes();
5314 assert!(
5315 after_edges_estimated > after_vertices_estimated,
5316 "estimated_size should grow after edge insertions"
5317 );
5318
5319 // Verify still within reasonable bounds
5320 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5321 assert!(
5322 (0.5..=2.0).contains(&ratio),
5323 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5324 after_edges_estimated,
5325 after_edges_actual,
5326 ratio
5327 );
5328
5329 Ok(())
5330 }
5331
5332 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5333 #[tokio::test]
5334 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5335 use crate::runtime::wal::WriteAheadLog;
5336 use crate::storage::manager::StorageManager;
5337 use object_store::local::LocalFileSystem;
5338 use object_store::path::Path as ObjectStorePath;
5339 use uni_common::core::schema::SchemaManager;
5340
5341 let dir = tempdir()?;
5342 let path = dir.path().to_str().unwrap();
5343 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5344 let schema_path = ObjectStorePath::from("schema.json");
5345
5346 let schema_manager =
5347 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5348 schema_manager.save().await?;
5349
5350 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5351
5352 let wal_path = ObjectStorePath::from("wal");
5353 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5354
5355 let writer = Writer::new_with_config(
5356 storage.clone(),
5357 schema_manager.clone(),
5358 1,
5359 UniConfig::default(),
5360 Some(wal.clone()),
5361 None,
5362 )
5363 .await?;
5364
5365 // Flush with no mutations — should succeed cleanly
5366 let lsn = writer.flush_wal().await?;
5367 // LSN should be 0 or 1 (no real mutations flushed)
5368 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5369
5370 Ok(())
5371 }
5372
5373 /// Test that transaction data does not leak into main L0 without commit.
5374 #[tokio::test]
5375 async fn test_transaction_isolation_without_commit() -> Result<()> {
5376 use crate::runtime::wal::WriteAheadLog;
5377 use crate::storage::manager::StorageManager;
5378 use object_store::local::LocalFileSystem;
5379 use object_store::path::Path as ObjectStorePath;
5380 use uni_common::core::schema::SchemaManager;
5381
5382 let dir = tempdir()?;
5383 let path = dir.path().to_str().unwrap();
5384 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5385 let schema_path = ObjectStorePath::from("schema.json");
5386
5387 let schema_manager =
5388 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5389 let _label_id = schema_manager.add_label("Person")?;
5390 schema_manager.save().await?;
5391
5392 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5393
5394 let wal_path = ObjectStorePath::from("wal");
5395 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5396
5397 let writer = Writer::new_with_config(
5398 storage.clone(),
5399 schema_manager.clone(),
5400 1,
5401 UniConfig::default(),
5402 Some(wal),
5403 None,
5404 )
5405 .await?;
5406
5407 // Create transaction L0
5408 let tx_l0 = writer.create_transaction_l0();
5409
5410 // Insert vertex into transaction L0
5411 let vid = writer.next_vid().await?;
5412 writer
5413 .insert_vertex_with_labels(
5414 vid,
5415 [("name".to_string(), Value::String("Ghost".to_string()))]
5416 .into_iter()
5417 .collect(),
5418 &["Person".to_string()],
5419 Some(&tx_l0),
5420 )
5421 .await?;
5422
5423 // Verify data is in transaction L0
5424 assert!(
5425 tx_l0.read().vertex_properties.contains_key(&vid),
5426 "Transaction L0 should contain the vertex"
5427 );
5428
5429 // Verify data is NOT in main L0
5430 let main_l0 = writer.l0_manager.get_current();
5431 assert!(
5432 !main_l0.read().vertex_properties.contains_key(&vid),
5433 "Main L0 should NOT contain uncommitted transaction data"
5434 );
5435
5436 // Drop transaction without committing — data should be lost
5437 drop(tx_l0);
5438
5439 // Main L0 still should not have it
5440 assert!(
5441 !main_l0.read().vertex_properties.contains_key(&vid),
5442 "Main L0 should remain clean after dropped transaction"
5443 );
5444
5445 Ok(())
5446 }
5447
5448 /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5449 /// the flush count crosses the configured threshold and stays
5450 /// silent on subsequent flushes for the lifetime of the writer.
5451 /// Primary writers (`fork_id == None`) never fire it.
5452 ///
5453 /// Tested directly against `tick_fork_fragment_observability` so
5454 /// the contract is locked in independently of the broader
5455 /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5456 /// on Day 10's on-the-fly schema overlay growth).
5457 #[tokio::test]
5458 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5459 use crate::storage::manager::StorageManager;
5460 use object_store::local::LocalFileSystem;
5461 use object_store::path::Path as ObjectStorePath;
5462 use uni_common::core::fork::ForkId;
5463 use uni_common::core::schema::SchemaManager;
5464
5465 let dir = tempdir()?;
5466 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5467 let schema_path = ObjectStorePath::from("schema.json");
5468 let schema_manager =
5469 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5470 let storage = Arc::new(
5471 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5472 );
5473
5474 let config = UniConfig {
5475 fork_fragment_warn_threshold: 3,
5476 ..Default::default()
5477 };
5478 let mut writer =
5479 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5480
5481 // Primary path: never fires.
5482 for _ in 0..10 {
5483 writer.tick_fork_fragment_observability();
5484 }
5485 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5486 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5487
5488 // Fork path: tag and tick. Below threshold → no fire.
5489 writer.fork_id = Some(ForkId::new());
5490 writer.tick_fork_fragment_observability();
5491 writer.tick_fork_fragment_observability();
5492 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5493 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5494
5495 // Crossing threshold → fires once.
5496 writer.tick_fork_fragment_observability();
5497 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5498 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5499
5500 // Subsequent ticks bump the gauge but do not re-fire.
5501 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5502 for _ in 0..5 {
5503 writer.tick_fork_fragment_observability();
5504 }
5505 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5506 assert_eq!(
5507 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5508 fired_after
5509 );
5510
5511 Ok(())
5512 }
5513
5514 /// The hot-path mutators must not write to any `Writer` struct field.
5515 /// Phase 2 of the refactor
5516 /// gave them `&self` receivers, which the compiler enforces against
5517 /// direct `self.x = y` assignment — but interior-mutable writes
5518 /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5519 /// every potentially-writable field, calls each hot-path mutator, and
5520 /// asserts no field changed.
5521 ///
5522 /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5523 /// `tick_fork_fragment_observability`) DO mutate fields by design and
5524 /// are intentionally out of scope here.
5525 #[tokio::test]
5526 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5527 use crate::storage::manager::StorageManager;
5528 use object_store::local::LocalFileSystem;
5529 use object_store::path::Path as ObjectStorePath;
5530 use uni_common::core::schema::SchemaManager;
5531
5532 let dir = tempdir()?;
5533 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5534 let schema_path = ObjectStorePath::from("schema.json");
5535 let schema_manager =
5536 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5537 schema_manager.add_label("Person")?;
5538 schema_manager.save().await?;
5539 let storage = Arc::new(
5540 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5541 );
5542
5543 let writer =
5544 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5545 .await?;
5546
5547 /// Captures every `Writer` field that *could* be written by a
5548 /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5549 /// construction field). Arc'd substructures (`l0_manager`,
5550 /// `storage`, etc.) are intentionally not checked — they are
5551 /// re-pointed only at construction.
5552 #[derive(Debug, PartialEq)]
5553 struct Snapshot {
5554 last_flush_time: std::time::Instant,
5555 cached_manifest_some: bool,
5556 fork_flush_count: u64,
5557 fork_fragment_warn_fired: bool,
5558 xervo_runtime_some: bool,
5559 index_rebuild_manager_some: bool,
5560 fork_id: Option<ForkId>,
5561 }
5562
5563 fn snap(w: &Writer) -> Snapshot {
5564 Snapshot {
5565 last_flush_time: *w.last_flush_time.lock(),
5566 cached_manifest_some: w.cached_manifest.lock().is_some(),
5567 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5568 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5569 xervo_runtime_some: w.xervo_runtime.get().is_some(),
5570 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5571 fork_id: w.fork_id,
5572 }
5573 }
5574
5575 // 1. insert_vertex_with_labels
5576 let before = snap(&writer);
5577 let vid = writer.next_vid().await?;
5578 writer
5579 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5580 .await?;
5581 assert_eq!(
5582 snap(&writer),
5583 before,
5584 "insert_vertex_with_labels mutated a Writer field"
5585 );
5586
5587 // 2. insert_vertices_batch
5588 let before = snap(&writer);
5589 let vids = writer.allocate_vids(2).await?;
5590 writer
5591 .insert_vertices_batch(
5592 vids,
5593 vec![Properties::new(), Properties::new()],
5594 vec!["Person".into()],
5595 None,
5596 )
5597 .await?;
5598 assert_eq!(
5599 snap(&writer),
5600 before,
5601 "insert_vertices_batch mutated a Writer field"
5602 );
5603
5604 // 3. delete_vertex
5605 let before = snap(&writer);
5606 writer.delete_vertex(vid, None, None).await?;
5607 assert_eq!(
5608 snap(&writer),
5609 before,
5610 "delete_vertex mutated a Writer field"
5611 );
5612
5613 // (insert_edge / delete_edge are skipped here: their fixture cost is
5614 // disproportionate to the audit's marginal value, and the same
5615 // structural argument plus the compiler-enforced `&self` covers them.)
5616
5617 Ok(())
5618 }
5619}