uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Writer-level tuning knobs.
///
/// Defaults come from the [`Default`] impl (`max_mutations = 10_000`,
/// `partial_lance_writes = false`).
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count limit.
    ///
    /// NOTE(review): presumably the number of buffered mutations after which
    /// a flush is requested — the consumer is not visible in this part of the
    /// file; confirm against the flush predicate.
    pub max_mutations: usize,
    /// Enable the partial-column MergeInsert path for SET-only flushes.
    ///
    /// When `true`, `Writer::insert_vertex_partial` records the touched
    /// property keys into `L0Buffer::vertex_partial_keys` and the flush
    /// routes those VIDs through Lance `MergeInsertBuilder` with a
    /// subset-of-schema source, skipping the read of (and write of)
    /// the unchanged columns — including wide ones like embeddings.
    ///
    /// When `false`, `insert_vertex_partial` falls back to the
    /// read-modify-write `insert_vertex_with_labels` path (preserving
    /// bit-for-bit equivalence with prior releases). Default `false`
    /// for the first release; flip to `true` after telemetry on the
    /// issue #72 ingest workload confirms the win.
    ///
    /// See the soundness probe at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}
57
58impl Default for WriterConfig {
59 fn default() -> Self {
60 Self {
61 max_mutations: 10_000,
62 partial_lance_writes: false,
63 }
64 }
65}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
/// captured WAL LSN, current_version, and the in-progress guard whose
/// lifetime spans the full flush (including the future async stream
/// phase that runs on a spawned task).
struct RotateOutput {
    /// The rotated-out L0 buffer that is about to be flushed.
    old_l0_arc: Arc<RwLock<L0Buffer>>,
    /// WAL LSN captured during rotation.
    wal_lsn: u64,
    /// `current_version` captured during rotation.
    current_version: u64,
    /// Flush-in-progress guard; carried in this struct so that it stays
    /// alive for the whole flush rather than only the rotate step.
    flush_in_progress_guard: FlushInProgressGuard,
}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93 let mut out = Properties::new();
94 for k in keys {
95 if let Some(v) = props.get(k) {
96 out.insert(k.clone(), v.clone());
97 }
98 }
99 out
100}
101
/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
/// published) snapshot manifest and its id. Finalize is responsible
/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
/// update.
struct FlushOutcome {
    /// The snapshot manifest that was built but has not been published yet.
    manifest: SnapshotManifest,
    /// Identifier of the snapshot described by `manifest`.
    snapshot_id: String,
}
110
/// Write-path runtime handle.
///
/// Bundles the L0 buffer manager, the storage and schema handles, the id
/// allocator, and the shared state used by the flush and commit paths.
/// Most mutable state sits behind `Arc` so the same handles can also be
/// handed to a `SharedFlushCtx` (see [`Writer::shared_ctx`]).
pub struct Writer {
    /// Manager for the current L0 buffer and the pending-flush chain.
    pub l0_manager: Arc<L0Manager>,
    /// Storage manager handle.
    pub storage: Arc<StorageManager>,
    /// Schema source; its constraints are read when the UNIQUE constraint
    /// index is rebuilt after WAL replay.
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Allocator for vertex ids and edge ids.
    pub allocator: Arc<IdAllocator>,
    /// Runtime configuration (e.g. `ssi_enabled`, `async_flush_enabled`,
    /// compaction and index-rebuild settings).
    pub config: UniConfig,
    /// Optional embedding runtime. `OnceLock` so the initializer can run
    /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
    /// (Phase 4 of concurrent_writer.md). Read through
    /// [`Writer::xervo_runtime`] — the field itself is private to keep
    /// callers oblivious to the OnceLock representation.
    xervo_runtime: OnceLock<Arc<ModelRuntime>>,
    /// Property manager for cache invalidation after flush
    pub property_manager: Option<Arc<PropertyManager>>,
    /// Adjacency manager for dual-write (edges survive flush).
    adjacency_manager: Arc<AdjacencyManager>,
    /// Timestamp of last flush or creation. Interior-mutable so that
    /// `&self` callers can update it; uncontended in practice because all
    /// writes happen inside the single-flusher critical section.
    /// Arc-wrapped so it can travel into the SharedFlushCtx that the
    /// async-flush coordinator passes to spawned stream/finalize tasks.
    last_flush_time: Arc<PlMutex<std::time::Instant>>,
    /// Background compaction task handle (prevents concurrent compaction races)
    compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
    /// `OnceLock` for the same reason as `xervo_runtime`.
    /// Wrapped in `Arc` so the async-flush finalize path can read it
    /// from a spawned task via `SharedFlushCtx`.
    index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Cached snapshot manifest from the last flush. Avoids re-reading from
    /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
    /// `&self` access; uncontended because all access is inside the
    /// single-flusher critical section.
    cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
    /// Identifier of the fork this writer serves, if any. `None` for
    /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
    /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
    /// the fragment-count guard rail (Phase 2 Day 12).
    pub fork_id: Option<ForkId>,
    /// Number of `flush_to_l1` calls since this writer was constructed.
    /// Used as a proxy for L1 fragment growth on the fork's branches:
    /// each flush typically appends ~1 fragment per touched dataset, so
    /// the count tracks the order of magnitude of fragment accumulation.
    /// Reading the actual `Dataset::manifest().fragments.len()` per
    /// flush would add a per-dataset object-store roundtrip on the hot
    /// commit path; the proxy keeps the guard rail purely observational
    /// (Phase 5 introduces fork compaction proper). Only meaningful when
    /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
    fork_flush_count: Arc<AtomicU64>,
    /// Whether the fork-fragment warning has already fired at the
    /// configured threshold. One-shot per writer lifetime. `Relaxed` is
    /// sufficient — observational only.
    fork_fragment_warn_fired: Arc<AtomicBool>,
    /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
    /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
    /// across its WAL-append + L0-merge window. Replaces the outer
    /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
    /// Arc-wrapped so async-flush coordinator's finalize path can
    /// re-acquire it from a spawned task via SharedFlushCtx.
    flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Coordinator for the async-flush pipeline. Owns the back-pressure
    /// semaphore, rotate-order sequence, single-finalizer task, and
    /// pending-flush counter.
    ///
    /// `None` when `async_flush_enabled = false`; the constructor only
    /// builds it when the feature is on. The coordinator's finalizer task
    /// captures `SharedFlushCtx` which includes `Arc<StorageManager>`; on
    /// a fork-scoped Writer that also pins the fork's `ForkScope` via
    /// `storage.fork_scope`, so the holder count never drops.
    /// Constructing it only when the feature is actually on avoids that
    /// side-effect for all existing sync-flush paths. When async-flush
    /// graduates from opt-in to default (Commit 12), `drop_fork`
    /// (Commit 8) handles the drain explicitly.
    #[allow(dead_code)] // first production use lands in Commit 6/7
    pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
    /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
    /// per successful commit under `flush_lock`; a transaction captures the
    /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    commit_sequence: Arc<AtomicU64>,
    /// Bounded log of recently-committed write-sets for OCC conflict detection.
    /// Read and updated only under `flush_lock`.
    ///
    /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
    committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
    /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
    /// canonical (label, key-props) bytes. A transaction holds the lock from
    /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
    /// on the same key (avoiding optimistic abort-retry on hot keys).
    ///
    /// Always allocated; populated only when `config.ssi_enabled` is `true`.
    for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
}
206
/// Number of recent commits retained for OCC conflict detection. Large enough
/// that under-run — and the resulting conservative abort — is rare in practice;
/// each entry is a small set of touched ids.
///
/// Passed to `CommitRegistry::new` when a [`Writer`] is constructed.
const OCC_REGISTRY_CAPACITY: usize = 4096;
211
212impl Writer {
213 pub async fn new(
214 storage: Arc<StorageManager>,
215 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
216 start_version: u64,
217 ) -> Result<Self> {
218 Self::new_with_config(
219 storage,
220 schema_manager,
221 start_version,
222 UniConfig::default(),
223 None,
224 None,
225 )
226 .await
227 }
228
229 pub async fn new_with_config(
230 storage: Arc<StorageManager>,
231 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232 start_version: u64,
233 config: UniConfig,
234 wal: Option<Arc<WriteAheadLog>>,
235 allocator: Option<Arc<IdAllocator>>,
236 ) -> Result<Self> {
237 let allocator = if let Some(a) = allocator {
238 a
239 } else {
240 let store = storage.store();
241 let path = object_store::path::Path::from("id_allocator.json");
242 Arc::new(IdAllocator::new(store, path, 1000).await?)
243 };
244
245 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247 let property_manager = Some(Arc::new(PropertyManager::new(
248 storage.clone(),
249 schema_manager.clone(),
250 1000,
251 )));
252
253 let adjacency_manager = storage.adjacency_manager();
254
255 // Hoist the Arc'd fields so we can both stash them on Writer and
256 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
257 // captures. Single-source-of-truth for each piece of mutable
258 // shared state.
259 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260 let cached_manifest = Arc::new(PlMutex::new(None));
261 let fork_flush_count = Arc::new(AtomicU64::new(0));
262 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264 let compaction_handle = Arc::new(RwLock::new(None));
265 let index_rebuild_manager: Arc<
266 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267 > = Arc::new(OnceLock::new());
268
269 let flush_coordinator = if config.async_flush_enabled {
270 let shared = SharedFlushCtx {
271 storage: storage.clone(),
272 l0_manager: l0_manager.clone(),
273 adjacency_manager: adjacency_manager.clone(),
274 property_manager: property_manager.clone(),
275 schema_manager: schema_manager.clone(),
276 cached_manifest: cached_manifest.clone(),
277 last_flush_time: last_flush_time.clone(),
278 fork_id: None,
279 fork_flush_count: fork_flush_count.clone(),
280 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282 flush_lock: flush_lock.clone(),
283 index_rebuild_manager: index_rebuild_manager.clone(),
284 compaction_handle: compaction_handle.clone(),
285 compaction_config: config.compaction.clone(),
286 index_rebuild_config: config.index_rebuild.clone(),
287 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288 };
289 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290 Some(Arc::new(FlushCoordinator::new(
291 config.max_pending_flushes,
292 shared,
293 finalize_fn,
294 )))
295 } else {
296 None
297 };
298
299 let commit_sequence = Arc::new(AtomicU64::new(0));
300 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301 OCC_REGISTRY_CAPACITY,
302 )));
303 let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305 Ok(Self {
306 l0_manager,
307 storage,
308 schema_manager,
309 allocator,
310 config,
311 xervo_runtime: OnceLock::new(),
312 property_manager,
313 adjacency_manager,
314 last_flush_time,
315 compaction_handle,
316 index_rebuild_manager,
317 cached_manifest,
318 fork_id: None,
319 fork_flush_count,
320 fork_fragment_warn_fired,
321 flush_lock,
322 flush_coordinator,
323 commit_sequence,
324 committed_writes,
325 for_update_locks,
326 })
327 }
328
329 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
330 /// creating it on first use. The caller `.lock_owned().await`s the returned
331 /// mutex and holds the guard for the transaction's lifetime.
332 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333 self.for_update_locks
334 .entry(key.to_vec())
335 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336 .clone()
337 }
338
339 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
340 /// holds anymore, so the map does not grow without bound across the keyspace.
341 ///
342 /// Called when a transaction ends, **after** its guards have been dropped.
343 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
344 /// the same lock `row_lock_handle` takes to clone an entry — so the check
345 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
346 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
347 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
348 /// after we remove). Either way no two transactions ever lock different
349 /// `Mutex` instances for the same key.
350 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351 for key in keys {
352 self.for_update_locks
353 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354 }
355 }
356
357 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
358 /// tests that the map does not leak entries across transactions (G5).
359 pub fn for_update_lock_count(&self) -> usize {
360 self.for_update_locks.len()
361 }
362
363 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
364 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
365 /// baseline advances to lock-acquisition time (read-latest under the lock).
366 pub fn current_commit_sequence(&self) -> u64 {
367 self.commit_sequence.load(Ordering::Relaxed)
368 }
369
370 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
371 /// Used by the async-flush stream/finalize paths to pass into spawned
372 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
373 /// with `flush_coordinator -> FinalizeFn -> Writer`).
374 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375 SharedFlushCtx {
376 storage: self.storage.clone(),
377 l0_manager: self.l0_manager.clone(),
378 adjacency_manager: self.adjacency_manager.clone(),
379 property_manager: self.property_manager.clone(),
380 schema_manager: self.schema_manager.clone(),
381 cached_manifest: self.cached_manifest.clone(),
382 last_flush_time: self.last_flush_time.clone(),
383 fork_id: self.fork_id,
384 fork_flush_count: self.fork_flush_count.clone(),
385 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387 flush_lock: self.flush_lock.clone(),
388 index_rebuild_manager: self.index_rebuild_manager.clone(),
389 compaction_handle: self.compaction_handle.clone(),
390 compaction_config: self.config.compaction.clone(),
391 index_rebuild_config: self.config.index_rebuild.clone(),
392 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393 }
394 }
395
396 /// Borrow the flush coordinator if async flush is enabled.
397 /// Returns `None` when `config.async_flush_enabled = false`.
398 /// External callers (`drop_fork`) use this to drain pending streams.
399 pub fn flush_coordinator(
400 &self,
401 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
402 self.flush_coordinator.as_ref()
403 }
404
405 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
406 ///
407 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
408 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
409 pub fn set_index_rebuild_manager(
410 &self,
411 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412 ) -> Result<()> {
413 self.index_rebuild_manager
414 .set(manager)
415 .map_err(|_| anyhow!("index_rebuild_manager already set"))
416 }
417
418 /// Replay WAL mutations into the current L0 buffer.
419 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420 let l0 = self.l0_manager.get_current();
421 let wal = l0.read().wal.clone();
422
423 if let Some(wal) = wal {
424 wal.initialize().await?;
425 let mutations = wal.replay_since(wal_high_water_mark).await?;
426 let count = mutations.len();
427
428 if count > 0 {
429 log::info!(
430 "Replaying {} mutations from WAL (LSN > {})",
431 count,
432 wal_high_water_mark
433 );
434 let mut l0_guard = l0.write();
435 l0_guard.replay_mutations(mutations)?;
436 // Rebuild the UNIQUE constraint index over the recovered rows
437 // (Bug #9 Mechanism B). `replay_mutations` restores
438 // vertices/properties/labels but never repopulates
439 // `constraint_index` (its only other caller is the live insert
440 // path). Without this, a unique key that lives only in the WAL
441 // (committed but not yet flushed to Lance) is invisible to
442 // `check_unique_constraint_multi` after recovery and a
443 // duplicate of it could be created.
444 self.rebuild_constraint_index(&mut l0_guard);
445 }
446
447 Ok(count)
448 } else {
449 Ok(0)
450 }
451 }
452
453 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
454 ///
455 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
456 /// constraint whose target label the vertex carries and whose member
457 /// properties are all present, inserts the same constraint key the live
458 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
459 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
460 /// write lock; the schema is already loaded on the `Writer`.
461 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
462 let schema = self.schema_manager.schema();
463 // Collect entries first to avoid borrowing `vertex_properties`
464 // immutably while mutating `constraint_index` through the same guard.
465 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
466 for (&vid, props) in &l0_guard.vertex_properties {
467 if l0_guard.vertex_tombstones.contains(&vid) {
468 continue;
469 }
470 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
471 continue;
472 };
473 for label in labels {
474 for constraint in &schema.constraints {
475 if !constraint.enabled {
476 continue;
477 }
478 let ConstraintTarget::Label(l) = &constraint.target else {
479 continue;
480 };
481 if l != label {
482 continue;
483 }
484 let ConstraintType::Unique {
485 properties: unique_props,
486 } = &constraint.constraint_type
487 else {
488 continue;
489 };
490 let mut key_values = Vec::new();
491 let mut all_present = true;
492 for prop in unique_props {
493 if let Some(val) = props.get(prop) {
494 key_values.push((prop.clone(), val.clone()));
495 } else {
496 all_present = false;
497 break;
498 }
499 }
500 if all_present {
501 keys.push((serialize_constraint_key(label, &key_values), vid));
502 }
503 }
504 }
505 }
506 for (key, vid) in keys {
507 l0_guard.insert_constraint_key(key, vid);
508 }
509 }
510
511 /// Allocates the next VID (pure auto-increment).
512 pub async fn next_vid(&self) -> Result<Vid> {
513 self.allocator.allocate_vid().await
514 }
515
516 /// Allocates multiple VIDs at once for bulk operations.
517 /// This is more efficient than calling next_vid() in a loop.
518 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
519 self.allocator.allocate_vids(count).await
520 }
521
522 /// Allocates the next EID (pure auto-increment).
523 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
524 self.allocator.allocate_eid().await
525 }
526
527 /// Allocates multiple EIDs at once for bulk operations.
528 /// This is more efficient than calling next_eid() in a loop.
529 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
530 self.allocator.allocate_eids(count).await
531 }
532
533 /// Install the embedding runtime exactly once. Receiver is `&self` so it
534 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
535 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
536 self.xervo_runtime
537 .set(runtime)
538 .map_err(|_| anyhow!("xervo_runtime already set"))
539 }
540
541 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
542 self.xervo_runtime.get().cloned()
543 }
544
545 /// Create a new empty L0 buffer for transaction-scoped mutations.
546 ///
547 /// Only reads the current version — no exclusive lock required on Writer.
548 /// The returned buffer has no WAL reference; mutations are logged at
549 /// commit time via [`Self::commit_transaction_l0`].
550 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
551 let current_version = self.l0_manager.get_current().read().current_version;
552 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
553 let buf = L0Buffer::new(current_version, None);
554 // SSI: stamp the OCC read sequence at begin so commit can detect any
555 // transaction that committed since. Gated on the runtime `ssi_enabled`
556 // toggle — when off, `occ_read_set` stays `None` and every downstream
557 // read-set recording / commit validation self-gates to a no-op.
558 let buf = if self.config.ssi_enabled {
559 let mut buf = buf;
560 buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
561 // The read path records observed ids here for SSI antidependency
562 // detection; commit consults it.
563 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
564 crate::runtime::l0::OccReadSet::default(),
565 )));
566 buf
567 } else {
568 buf
569 };
570 Arc::new(RwLock::new(buf))
571 }
572
573 /// Resolve the target L0 buffer for a mutation.
574 ///
575 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
576 /// When `None`, it targets the global L0 from the manager.
577 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
578 tx_l0
579 .cloned()
580 .unwrap_or_else(|| self.l0_manager.get_current())
581 }
582
583 fn update_metrics(&self) {
584 let l0 = self.l0_manager.get_current();
585 let size = l0.read().estimated_size;
586 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
587 }
588
589 /// Overlay-aware issue-#77 edge-endpoint validation.
590 ///
591 /// The current buffer alone does not hold all committed-but-unflushed
592 /// tombstones — a flush rotation moves them onto `pending_flush` until
593 /// the Lance write completes. A vertex is effectively deleted iff,
594 /// walking newest-first (tx → current → pending newest→oldest), the
595 /// first buffer that knows the vid says "tombstoned" (an insert clears
596 /// the tombstone within a buffer, so props/tombstone are mutually
597 /// exclusive per buffer).
598 ///
599 /// Must run under `flush_lock` so the overlay cannot change before the
600 /// merge, and BEFORE the durable WAL flush (see the call site).
601 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
602 let chain = self.l0_manager.get_pending_flush();
603 let current = self.l0_manager.get_current();
604 let effectively_deleted = |vid: &Vid| -> bool {
605 if tx_l0.vertex_properties.contains_key(vid) {
606 return false;
607 }
608 if tx_l0.vertex_tombstones.contains(vid) {
609 return true;
610 }
611 {
612 let cur = current.read();
613 if cur.vertex_properties.contains_key(vid) {
614 return false;
615 }
616 if cur.vertex_tombstones.contains(vid) {
617 return true;
618 }
619 }
620 for frozen in chain.iter().rev() {
621 let g = frozen.read();
622 if g.vertex_properties.contains_key(vid) {
623 return false;
624 }
625 if g.vertex_tombstones.contains(vid) {
626 return true;
627 }
628 }
629 false
630 };
631 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
632 if tx_l0.tombstones.contains_key(eid) {
633 continue; // a deletion, not an insertion — never resurrects a vertex
634 }
635 if effectively_deleted(src_vid) {
636 anyhow::bail!(
637 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
638 eid,
639 src_vid
640 );
641 }
642 if effectively_deleted(dst_vid) {
643 anyhow::bail!(
644 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
645 eid,
646 dst_vid
647 );
648 }
649 }
650 Ok(())
651 }
652
653 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
654 /// value for each CRDT property the transaction writes, so the commit
655 /// merge MERGES against the committed CRDT state instead of shadowing it
656 /// (the carve-out lets concurrent CRDT writers commit on the assumption
657 /// that the merge sees the committed value — true only while that value
658 /// lives in current, not mid-flush on `pending_flush`). No-op when
659 /// nothing is pending or the property already exists in current. Vertex
660 /// properties only, mirroring the carve-out itself.
661 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
662 let chain = self.l0_manager.get_pending_flush();
663 if chain.is_empty() {
664 return;
665 }
666 for (vid, props) in &tx_l0.vertex_properties {
667 Self::seed_crdt_props(&chain, *vid, props, main_l0);
668 }
669 }
670
671 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
672 /// Also used by the non-transactional vertex write path, which CRDT-merges
673 /// into the current buffer directly and has the same shadowing hazard
674 /// during a flush window.
675 fn seed_crdt_props(
676 chain: &[Arc<RwLock<L0Buffer>>],
677 vid: Vid,
678 props: &Properties,
679 target: &mut L0Buffer,
680 ) {
681 for (key, value) in props {
682 if crate::runtime::l0::try_as_crdt(value).is_none() {
683 continue;
684 }
685 if target
686 .vertex_properties
687 .get(&vid)
688 .is_some_and(|p| p.contains_key(key))
689 {
690 continue;
691 }
692 // Newest generation first: the first hit is the live state.
693 for frozen in chain.iter().rev() {
694 let g = frozen.read();
695 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
696 if crate::runtime::l0::try_as_crdt(v).is_some() {
697 target
698 .vertex_properties
699 .entry(vid)
700 .or_default()
701 .insert(key.clone(), v.clone());
702 }
703 break;
704 }
705 }
706 }
707 }
708
/// Commit a transaction's private (externally-owned) L0 buffer into main L0.
///
/// Writes mutations to WAL, flushes, merges into main L0, and replays
/// edges into the AdjacencyManager.
///
/// Returns `(wal_lsn, flush_pending)`. `wal_lsn` is the WAL LSN of the
/// commit (0 when no WAL is configured). When `flush_pending == true`, the
/// post-commit `should_flush()` predicate fired but no flush ran — the
/// caller is expected to spawn a background `flush_to_l1`. This is the
/// shape used when `UniConfig::async_flush_enabled` is set, so commits
/// don't block on L1-streaming I/O.
721 pub async fn commit_transaction_l0(
722 self: &Arc<Self>,
723 tx_l0_arc: Arc<RwLock<L0Buffer>>,
724 ) -> Result<(u64, bool)> {
725 // Hold `flush_lock` across WAL append + flush + main-L0 merge.
726 // Two concurrent commits serialize here; in Phase 3 the outer
727 // `Arc<RwLock<Writer>>` already provides this exclusion, so the
728 // acquisition is uncontended. Phase 4 drops the outer lock and
729 // this becomes the load-bearing serialization point.
730 let _flush_lock_guard = self.flush_lock.lock().await;
731
732 // Crash-recovery seam: simulate process death immediately after winning
733 // the commit serialization point but before any durable work. No-op
734 // unless built with `--features failpoints`. (See ssi_resilience tests.)
735 fail::fail_point!("commit::after-flush-lock");
736
737 // SSI: optimistic conflict detection. This MUST run before any WAL
738 // write — `flush_wal()` below is the durable commit point and the WAL
739 // has no abort marker, so aborting after it would resurrect this
740 // transaction on crash recovery. The write-set is reused for
741 // registration after a successful merge.
742 // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
743 // and `occ_write_set` is `None`, so the post-merge registration below
744 // is skipped — reproducing last-writer-wins exactly.
745 let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
746 let tx_l0 = tx_l0_arc.read();
747 let read_seq = tx_l0.occ_read_seq;
748 let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
749 if !write_set.is_empty() {
750 // Telemetry: one validation per non-empty (writing) commit. The
751 // ratio of conflicts to validations is the headline abort rate.
752 metrics::counter!("uni_ssi_commit_validations_total").increment(1);
753 // Read-set is consulted only for writing transactions, so a
754 // read-only commit (empty write-set) runs at snapshot isolation.
755 let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
756 if let Some(conflict) =
757 self.committed_writes
758 .lock()
759 .check(read_seq, &write_set, read_guard.as_deref())
760 {
761 use crate::runtime::occ::Conflict;
762 match &conflict {
763 Conflict::WriteWrite { .. } => metrics::counter!(
764 "uni_ssi_serialization_conflicts_total",
765 "kind" => "write_write",
766 )
767 .increment(1),
768 Conflict::ReadWrite { .. } => metrics::counter!(
769 "uni_ssi_serialization_conflicts_total",
770 "kind" => "read_write",
771 )
772 .increment(1),
773 Conflict::HistoryTruncated { .. } => {
774 metrics::counter!("uni_ssi_history_truncated_total").increment(1)
775 }
776 }
777 return Err(anyhow::Error::new(
778 uni_common::UniError::SerializationConflict {
779 message: conflict.to_string(),
780 },
781 ));
782 }
783 }
784
785 // Validate against the committed-but-unflushed overlay under
786 // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
787 // soundness. The current buffer alone does not hold all committed
788 // state — a flush rotation moves it onto `pending_flush` until the
789 // Lance write completes (the Bug #9A window, here at the
790 // commit-time layer) — so every check walks [current, pending…].
791 {
792 let pending = self.l0_manager.get_pending_flush();
793 let main_l0 = self.l0_manager.get_current();
794 let overlay: Vec<Arc<RwLock<L0Buffer>>> =
795 std::iter::once(main_l0).chain(pending).collect();
796
797 // SSI / serializable MERGE: abort if a concurrent transaction has
798 // already committed a row with one of this transaction's unique
799 // keys. Commits serialize here, so this closes the race window
800 // left by the per-insert check. (Empty index → no iterations.)
801 for (key, vid) in &tx_l0.constraint_index {
802 if overlay
803 .iter()
804 .any(|b| b.read().has_constraint_key(key, *vid))
805 {
806 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
807 return Err(anyhow::Error::new(
808 uni_common::UniError::ConstraintConflict {
809 message: "unique key already committed by a concurrent \
810 transaction"
811 .to_string(),
812 },
813 ));
814 }
815 }
816
817 // Implicit MERGE phantom guard: a `MERGE` that *created* a node
818 // registered its (label, key-props) here even with no declared
819 // UNIQUE constraint. If a concurrent transaction already committed
820 // the same MERGE key, abort retriably so the two converge to one
821 // node on retry (the loser's MATCH then finds the committed row).
822 // Only MERGE-creates register keys, so a plain CREATE of the same
823 // properties never lands here. (Empty index → no iterations.)
824 for (key, vid) in &tx_l0.merge_guard_index {
825 if overlay
826 .iter()
827 .any(|b| b.read().has_merge_guard_key(key, *vid))
828 {
829 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
830 return Err(anyhow::Error::new(
831 uni_common::UniError::ConstraintConflict {
832 message: "MERGE key already committed by a concurrent \
833 transaction"
834 .to_string(),
835 },
836 ));
837 }
838 }
839
840 // Same race window for global ext_id uniqueness: the per-insert
841 // check ran against an older main L0; re-probe the committed
842 // index here, where commits serialize.
843 for (ext_id, vid) in &tx_l0.extid_index {
844 let taken = overlay.iter().any(|b| {
845 matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
846 });
847 if taken {
848 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
849 return Err(anyhow::Error::new(
850 uni_common::UniError::ConstraintConflict {
851 message: format!(
852 "ext_id '{ext_id}' already committed by a concurrent \
853 transaction"
854 ),
855 },
856 ));
857 }
858 }
859
860 // CRDT carve-out soundness: a pure-CRDT write was dropped from the
861 // write-set assuming its merge commutes. If the overlay holds a
862 // *different* CRDT variant for the same property, the merge would
863 // silently overwrite it — abort instead of losing the update.
864 // (Checked against every overlay buffer: conservative if an old
865 // generation held a different variant that a newer commit already
866 // replaced, but an abort+retry is always sound.)
867 for buf in &overlay {
868 if let Some(conflict) =
869 crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
870 {
871 metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
872 return Err(anyhow::Error::new(
873 uni_common::UniError::SerializationConflict {
874 message: conflict.to_string(),
875 },
876 ));
877 }
878 }
879 }
880 Some(write_set)
881 } else {
882 None
883 };
884
885 // Issue #77: an edge whose endpoint is effectively deleted makes the
886 // merge below bail. That bail MUST happen before the durable WAL flush —
887 // after it the transaction is committed-but-unmerged (a ghost commit),
888 // and WAL replay re-hits the same bail, making the database unopenable.
889 // SSI validation was deliberately placed before the flush for exactly
890 // this reason; the endpoint check belongs here too. Runs unconditionally
891 // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
892 // tombstone state cannot change between here and the merge. A
893 // tombstone may live in a flush-rotated pending buffer rather than
894 // the current buffer, so the check walks the overlay newest-first.
895 {
896 let tx_l0 = tx_l0_arc.read();
897 self.validate_edge_endpoints_overlay(&tx_l0)?;
898 }
899
900 // Crash-recovery seam: SSI validation has passed; the transaction is
901 // about to become durable. A crash here must leave NO trace (validation
902 // happens before the WAL is touched). No-op unless `failpoints`.
903 fail::fail_point!("commit::after-validate");
904
905 // 1. Write transaction mutations to WAL BEFORE merging into main L0
906 // This ensures durability before visibility.
907 {
908 let tx_l0 = tx_l0_arc.read();
909 let main_l0_arc = self.l0_manager.get_current();
910 let main_l0 = main_l0_arc.read();
911
912 // If WAL exists, write mutations to it for durability
913 if let Some(wal) = main_l0.wal.as_ref() {
914 // Order: vertices first, then edges (to ensure src/dst exist on replay)
915
916 // Vertex insertions
917 for (vid, properties) in &tx_l0.vertex_properties {
918 if !tx_l0.vertex_tombstones.contains(vid) {
919 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
920 wal.append(crate::runtime::wal::Mutation::InsertVertex {
921 vid: *vid,
922 properties: properties.clone(),
923 labels,
924 })?;
925 }
926 }
927
928 // Vertex deletions
929 for vid in &tx_l0.vertex_tombstones {
930 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
931 wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
932 }
933
934 // Label-only mutations (SET n:Label / REMOVE n:Label). After
935 // vertex inserts (so the vertex exists on replay), before edges,
936 // and skipping vertices deleted in this same commit.
937 for vid in &tx_l0.vertex_label_overwrites {
938 if tx_l0.vertex_tombstones.contains(vid) {
939 continue;
940 }
941 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
942 wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
943 vid: *vid,
944 labels,
945 })?;
946 }
947
948 // Crash-recovery seam: vertices appended, edges not yet. Tests
949 // assert that a crash here (before `flush_wal`) recovers NOTHING
950 // — the durable commit point is the flush below, not append.
951 fail::fail_point!("commit::mid-wal");
952
953 // Edge insertions and deletions from edge_endpoints
954 for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
955 if tx_l0.tombstones.contains_key(eid) {
956 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
957 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
958 eid: *eid,
959 src_vid: *src_vid,
960 dst_vid: *dst_vid,
961 edge_type: *edge_type,
962 version,
963 })?;
964 } else {
965 let properties =
966 tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
967 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
968 let edge_type_name = tx_l0.edge_types.get(eid).cloned();
969 wal.append(crate::runtime::wal::Mutation::InsertEdge {
970 src_vid: *src_vid,
971 dst_vid: *dst_vid,
972 edge_type: *edge_type,
973 eid: *eid,
974 version,
975 properties,
976 edge_type_name,
977 })?;
978 }
979 }
980
981 // Tombstones for edges that only exist in the global L0 (not in
982 // this transaction's edge_endpoints). Without this, deletes of
983 // pre-existing edges would be silently lost.
984 for (eid, tombstone) in &tx_l0.tombstones {
985 if !tx_l0.edge_endpoints.contains_key(eid) {
986 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
987 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
988 eid: *eid,
989 src_vid: tombstone.src_vid,
990 dst_vid: tombstone.dst_vid,
991 edge_type: tombstone.edge_type,
992 version,
993 })?;
994 }
995 }
996 }
997 }
998
999 // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1000 let wal_lsn = self.flush_wal().await?;
1001
1002 // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1003 // A crash here must RECOVER the transaction on replay (it is committed),
1004 // even though it was never made visible in-process. No-op unless `failpoints`.
1005 fail::fail_point!("commit::after-wal-flush");
1006
1007 // Component C1: if an outstanding snapshot pins the current generation,
1008 // clone it aside (lazy copy-on-write) before merging, so the pinning
1009 // transaction's reads stay isolated from this commit. No-op — and zero
1010 // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1011 // so this cannot race a flush rotate or another commit's merge; the merge
1012 // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1013 // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1014 // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1015 // always false when SSI is off and this is a zero-cost no-op.
1016 if self.l0_manager.is_current_pinned() {
1017 self.l0_manager.freeze_current_for_snapshot();
1018 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1019 }
1020
1021 // 3. Merge into main L0 and make visible
1022 {
1023 // Write-lock the tx buffer: `merge_take` moves its property maps
1024 // into main L0 instead of cloning them. The commit consumes the
1025 // transaction, so the drained maps are never observed afterwards;
1026 // everything read below (endpoints, versions, tombstones) is left
1027 // intact.
1028 let mut tx_l0 = tx_l0_arc.write();
1029 let main_l0_arc = self.l0_manager.get_current();
1030 let mut main_l0 = main_l0_arc.write();
1031 // A CRDT property's committed state may live only in a
1032 // flush-rotated pending buffer (the post-rotation current is
1033 // empty until the Lance write completes — the Bug #9A window).
1034 // `merge_crdt_properties` merges against the CURRENT buffer's
1035 // value — without seeding, the tx's CRDT state would SHADOW the
1036 // pending buffer's at read time (newest buffer wins per property)
1037 // and concurrent increments would be lost. Seed the newest
1038 // overlay value for each CRDT property the tx writes that current
1039 // lacks, so the merge below merges instead of replaces.
1040 self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1041 main_l0.merge_take(&mut tx_l0)?;
1042
1043 // Replay transaction edges into the AdjacencyManager overlay
1044 for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1045 let edge_version = tx_l0
1046 .edge_versions
1047 .get(eid)
1048 .copied()
1049 .unwrap_or(main_l0.current_version);
1050 if tx_l0.tombstones.contains_key(eid) {
1051 self.adjacency_manager
1052 .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1053 } else {
1054 self.adjacency_manager
1055 .insert_edge(*src, *dst, *eid, *etype, edge_version);
1056 }
1057 }
1058
1059 // Replay tombstones for edges that only exist in the global L0
1060 // (not in this transaction's edge_endpoints).
1061 for (eid, tombstone) in &tx_l0.tombstones {
1062 if !tx_l0.edge_endpoints.contains_key(eid) {
1063 let edge_version = tx_l0
1064 .edge_versions
1065 .get(eid)
1066 .copied()
1067 .unwrap_or(main_l0.current_version);
1068 self.adjacency_manager.add_tombstone(
1069 *eid,
1070 tombstone.src_vid,
1071 tombstone.dst_vid,
1072 tombstone.edge_type,
1073 edge_version,
1074 );
1075 }
1076 }
1077 }
1078
1079 // Crash-recovery seam: durable AND merged, but the in-memory commit
1080 // registry has not recorded this write-set yet. A crash here is
1081 // indistinguishable from one at `after-wal-flush` on reopen (the
1082 // registry is in-memory and rebuilt empty); the tx still recovers.
1083 fail::fail_point!("commit::after-merge");
1084
1085 // SSI: register this commit's write-set under a fresh commit sequence so
1086 // later transactions detect conflicts against it. Still under
1087 // `flush_lock`, before the async-flush branch can drop the guard.
1088 // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1089 if let Some(write_set) = occ_write_set
1090 && !write_set.is_empty()
1091 {
1092 let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
1093 self.committed_writes.lock().record(seq, write_set);
1094 }
1095
1096 self.update_metrics();
1097
1098 // 4. Best-effort post-commit auto-flush.
1099 //
1100 // Two paths:
1101 // - async_flush_enabled = false (default): inline under our
1102 // existing flush_lock guard via flush_inline_under_lock.
1103 // - async_flush_enabled = true: rotate inline, drop flush_lock,
1104 // then submit the stream phase to the coordinator. Gated on
1105 // `pending_flush_count() < max_pending_flushes` so we don't
1106 // stack up rotations beyond the configured pipeline depth.
1107 // `try_acquire_permit` is non-blocking: if we lose the race
1108 // for the last permit, we just skip this trigger (the next
1109 // commit retries).
1110 let mut flush_pending = false;
1111 if self.should_flush() {
1112 if self.config.async_flush_enabled
1113 && let Some(coord) = self.flush_coordinator.as_ref()
1114 && coord.pending_flush_count() < self.config.max_pending_flushes
1115 {
1116 match coord.try_acquire_permit() {
1117 Some(permit) => {
1118 match self.flush_l0_rotate().await {
1119 Ok(rotate_out) => {
1120 // Allocate the rotate seq and bump pending ONLY
1121 // after the rotate succeeds (Bug #3). A failed
1122 // rotate must consume neither: the finalizer
1123 // advances strictly in consecutive seq order and
1124 // only decrements pending on finalize, so a
1125 // leaked seq/pending from a failed rotate would
1126 // wedge the finalizer forever and climb pending
1127 // toward `max_pending_flushes`. The seq is still
1128 // allocated under `flush_lock` (immediately after
1129 // the rotate, before the guard drops below), so
1130 // concurrent rotates keep seq order == rotation
1131 // order, and the seq is not used until submit.
1132 let seq = coord.next_rotate_seq();
1133 coord.note_pending();
1134 // Release flush_lock BEFORE the spawn so concurrent
1135 // commits can proceed while the stream runs.
1136 drop(_flush_lock_guard);
1137 let parent_manifest = self.cached_manifest.lock().clone();
1138 let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1139 seq,
1140 old_l0_arc: rotate_out.old_l0_arc.clone(),
1141 wal_lsn: rotate_out.wal_lsn,
1142 current_version: rotate_out.current_version,
1143 name: None,
1144 parent_manifest,
1145 permit,
1146 flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1147 };
1148 let writer = self.clone();
1149 let _ticket = coord.submit_for_stream(
1150 rotated,
1151 move |old_l0, wal, ver, n| async move {
1152 let outcome =
1153 writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1154 Ok(crate::runtime::flush_coordinator::FlushOutcome {
1155 new_manifest: outcome.manifest,
1156 snapshot_id: outcome.snapshot_id,
1157 })
1158 },
1159 );
1160 flush_pending = true;
1161 // Early return — flush_lock already dropped.
1162 return Ok((wal_lsn, flush_pending));
1163 }
1164 Err(e) => {
1165 tracing::warn!("Async rotate failed (non-critical): {}", e);
1166 // No seq was allocated and pending was not
1167 // bumped (both moved into the Ok arm for Bug
1168 // #3), so the finalizer is not wedged. The
1169 // permit drops here, freeing the slot.
1170 }
1171 }
1172 }
1173 None => {
1174 // Race: someone else grabbed the last permit. Skip;
1175 // next commit will retry should_flush().
1176 metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1177 }
1178 }
1179 } else if let Err(e) = self.flush_inline_under_lock(None).await {
1180 tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1181 }
1182 }
1183
1184 Ok((wal_lsn, flush_pending))
1185 }
1186
1187 /// Flush the WAL buffer to durable storage.
1188 ///
1189 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1190 pub async fn flush_wal(&self) -> Result<u64> {
1191 let l0 = self.l0_manager.get_current();
1192 let wal = l0.read().wal.clone();
1193
1194 match wal {
1195 Some(wal) => Ok(wal.flush().await?),
1196 None => Ok(0),
1197 }
1198 }
1199
1200 /// Record property removals in the active L0 mutation stats.
1201 ///
1202 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1203 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1204 if count == 0 {
1205 return;
1206 }
1207 let l0 = self.resolve_l0(tx_l0);
1208 l0.write().mutation_stats.properties_removed += count;
1209 }
1210
1211 /// Validates vertex constraints for the given properties.
1212 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1213 async fn validate_vertex_constraints_for_label(
1214 &self,
1215 vid: Vid,
1216 properties: &Properties,
1217 label: &str,
1218 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1219 ) -> Result<()> {
1220 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1221 .await
1222 }
1223
1224 /// Partial-update sibling: validates only constraints touching keys
1225 /// present in `properties` (the touched set). NOT NULL is checked
1226 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1227 /// skipped when any referenced key is absent (the caller is
1228 /// expected to have routed to the full-row path in that case via
1229 /// `touched_needs_full_read`).
1230 async fn validate_vertex_constraints_for_label_partial(
1231 &self,
1232 vid: Vid,
1233 properties: &Properties,
1234 label: &str,
1235 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1236 ) -> Result<()> {
1237 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1238 .await
1239 }
1240
1241 async fn validate_vertex_constraints_for_label_impl(
1242 &self,
1243 vid: Vid,
1244 properties: &Properties,
1245 label: &str,
1246 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1247 partial: bool,
1248 ) -> Result<()> {
1249 let schema = self.schema_manager.schema();
1250
1251 {
1252 // 1. Check NOT NULL constraints (from Property definitions).
1253 // Under partial-update mode, skip properties NOT in
1254 // `properties` — they retain their previous (already-
1255 // validated) value.
1256 if let Some(props_meta) = schema.properties.get(label) {
1257 for (prop_name, meta) in props_meta {
1258 if !meta.nullable {
1259 let present = properties.get(prop_name);
1260 if partial && present.is_none() {
1261 continue;
1262 }
1263 if present.is_none_or(|v| v.is_null()) {
1264 log::warn!(
1265 "Constraint violation: Property '{}' cannot be null for label '{}'",
1266 prop_name,
1267 label
1268 );
1269 return Err(anyhow!(
1270 "Constraint violation: Property '{}' cannot be null",
1271 prop_name
1272 ));
1273 }
1274 }
1275 }
1276 }
1277
1278 // 2. Check Explicit Constraints (Unique, Check, etc.)
1279 for constraint in &schema.constraints {
1280 if !constraint.enabled {
1281 continue;
1282 }
1283 match &constraint.target {
1284 ConstraintTarget::Label(l) if l == label => {}
1285 _ => continue,
1286 }
1287
1288 match &constraint.constraint_type {
1289 ConstraintType::Unique {
1290 properties: unique_props,
1291 } => {
1292 // Support single and multi-property unique constraints
1293 if !unique_props.is_empty() {
1294 let mut key_values = Vec::new();
1295 let mut missing = false;
1296 for prop in unique_props {
1297 if let Some(val) = properties.get(prop) {
1298 key_values.push((prop.clone(), val.clone()));
1299 } else {
1300 missing = true; // Can't enforce if property missing (partial update?)
1301 // For INSERT, missing means null?
1302 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1303 // For now, only check if ALL keys are present
1304 }
1305 }
1306
1307 if !missing {
1308 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1309 .await?;
1310 }
1311 }
1312 }
1313 ConstraintType::Exists { property } => {
1314 if properties.get(property).is_none_or(|v| v.is_null()) {
1315 log::warn!(
1316 "Constraint violation: Property '{}' must exist for label '{}'",
1317 property,
1318 label
1319 );
1320 return Err(anyhow!(
1321 "Constraint violation: Property '{}' must exist",
1322 property
1323 ));
1324 }
1325 }
1326 ConstraintType::Check { expression } => {
1327 if !self.evaluate_check_constraint(expression, properties)? {
1328 return Err(anyhow!(
1329 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1330 constraint.name,
1331 expression
1332 ));
1333 }
1334 }
1335 _ => {
1336 return Err(anyhow!("Unsupported constraint type"));
1337 }
1338 }
1339 }
1340 }
1341 Ok(())
1342 }
1343
1344 /// Validates vertex constraints for a vertex with the given labels.
1345 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1346 /// Unknown labels (not in schema) are skipped.
1347 async fn validate_vertex_constraints(
1348 &self,
1349 vid: Vid,
1350 properties: &Properties,
1351 labels: &[String],
1352 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1353 ) -> Result<()> {
1354 let schema = self.schema_manager.schema();
1355
1356 // Validate constraints only for known labels
1357 for label in labels {
1358 // Skip unknown labels (schemaless support)
1359 if schema.get_label_case_insensitive(label).is_none() {
1360 continue;
1361 }
1362 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1363 .await?;
1364 }
1365
1366 // Check global ext_id uniqueness if ext_id is provided
1367 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1368 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1369 }
1370
1371 Ok(())
1372 }
1373
1374 /// Partial sibling of `validate_vertex_constraints` — validates only
1375 /// constraints touching keys present in `properties`. Used by
1376 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1377 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1378 async fn validate_vertex_constraints_partial(
1379 &self,
1380 vid: Vid,
1381 touched: &Properties,
1382 labels: &[String],
1383 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1384 ) -> Result<()> {
1385 let schema = self.schema_manager.schema();
1386 for label in labels {
1387 if schema.get_label_case_insensitive(label).is_none() {
1388 continue;
1389 }
1390 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1391 .await?;
1392 }
1393 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1394 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1395 }
1396 Ok(())
1397 }
1398
1399 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1400 ///
1401 /// Used to build a constraint key index from L0 buffers for batch validation.
1402 fn collect_constraint_keys_from_properties<'a>(
1403 properties_iter: impl Iterator<Item = &'a Properties>,
1404 label: &str,
1405 constraints: &[uni_common::core::schema::Constraint],
1406 existing_keys: &mut HashMap<String, HashSet<String>>,
1407 existing_extids: &mut HashSet<String>,
1408 ) {
1409 for props in properties_iter {
1410 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1411 existing_extids.insert(ext_id.to_string());
1412 }
1413
1414 for constraint in constraints {
1415 if !constraint.enabled {
1416 continue;
1417 }
1418 if let ConstraintTarget::Label(l) = &constraint.target {
1419 if l != label {
1420 continue;
1421 }
1422 } else {
1423 continue;
1424 }
1425
1426 if let ConstraintType::Unique {
1427 properties: unique_props,
1428 } = &constraint.constraint_type
1429 {
1430 let mut key_parts = Vec::new();
1431 let mut all_present = true;
1432 for prop in unique_props {
1433 if let Some(val) = props.get(prop) {
1434 key_parts.push(format!("{}:{}", prop, val));
1435 } else {
1436 all_present = false;
1437 break;
1438 }
1439 }
1440 if all_present {
1441 let key = key_parts.join("|");
1442 existing_keys
1443 .entry(constraint.name.clone())
1444 .or_default()
1445 .insert(key);
1446 }
1447 }
1448 }
1449 }
1450 }
1451
1452 /// Validates constraints for a batch of vertices efficiently.
1453 ///
1454 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1455 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1456 ///
1457 /// # Arguments
1458 /// * `vids` - VIDs of vertices being inserted
1459 /// * `properties_batch` - Properties for each vertex
1460 /// * `label` - Label for all vertices (assumes single label for now)
1461 ///
1462 /// # Performance
1463 /// For N vertices with unique constraints:
1464 /// - Old approach: O(N²) - scan L0 buffer N times
1465 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1466 async fn validate_vertex_batch_constraints(
1467 &self,
1468 vids: &[Vid],
1469 properties_batch: &[Properties],
1470 label: &str,
1471 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1472 ) -> Result<()> {
1473 if vids.len() != properties_batch.len() {
1474 return Err(anyhow!("VID/properties length mismatch"));
1475 }
1476
1477 let schema = self.schema_manager.schema();
1478
1479 // 1. Validate NOT NULL constraints for each vertex
1480 if let Some(props_meta) = schema.properties.get(label) {
1481 for (idx, properties) in properties_batch.iter().enumerate() {
1482 for (prop_name, meta) in props_meta {
1483 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1484 return Err(anyhow!(
1485 "Constraint violation at index {}: Property '{}' cannot be null",
1486 idx,
1487 prop_name
1488 ));
1489 }
1490 }
1491 }
1492 }
1493
1494 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1495 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1496 let mut existing_extids: HashSet<String> = HashSet::new();
1497
1498 // Scan current L0 buffer
1499 {
1500 let l0 = self.l0_manager.get_current();
1501 let l0_guard = l0.read();
1502 Self::collect_constraint_keys_from_properties(
1503 l0_guard.vertex_properties.values(),
1504 label,
1505 &schema.constraints,
1506 &mut existing_keys,
1507 &mut existing_extids,
1508 );
1509 }
1510
1511 // Scan transaction L0 if present
1512 if let Some(tx_l0) = tx_l0 {
1513 let tx_l0_guard = tx_l0.read();
1514 Self::collect_constraint_keys_from_properties(
1515 tx_l0_guard.vertex_properties.values(),
1516 label,
1517 &schema.constraints,
1518 &mut existing_keys,
1519 &mut existing_extids,
1520 );
1521 }
1522
1523 // 3. Check batch vertices against index AND check for duplicates within batch
1524 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1525 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1526
1527 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1528 // Check ext_id uniqueness
1529 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1530 if existing_extids.contains(ext_id) {
1531 return Err(anyhow!(
1532 "Constraint violation at index {}: ext_id '{}' already exists",
1533 idx,
1534 ext_id
1535 ));
1536 }
1537 if let Some(first_idx) = batch_extids.get(ext_id) {
1538 return Err(anyhow!(
1539 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1540 ext_id,
1541 first_idx,
1542 idx
1543 ));
1544 }
1545 batch_extids.insert(ext_id.to_string(), idx);
1546 }
1547
1548 // Check unique constraints
1549 for constraint in &schema.constraints {
1550 if !constraint.enabled {
1551 continue;
1552 }
1553 if let ConstraintTarget::Label(l) = &constraint.target {
1554 if l != label {
1555 continue;
1556 }
1557 } else {
1558 continue;
1559 }
1560
1561 match &constraint.constraint_type {
1562 ConstraintType::Unique {
1563 properties: unique_props,
1564 } => {
1565 let mut key_parts = Vec::new();
1566 let mut all_present = true;
1567 for prop in unique_props {
1568 if let Some(val) = properties.get(prop) {
1569 key_parts.push(format!("{}:{}", prop, val));
1570 } else {
1571 all_present = false;
1572 break;
1573 }
1574 }
1575
1576 if all_present {
1577 let key = key_parts.join("|");
1578
1579 // Check against existing L0 keys
1580 if let Some(keys) = existing_keys.get(&constraint.name)
1581 && keys.contains(&key)
1582 {
1583 return Err(anyhow!(
1584 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1585 idx,
1586 label,
1587 constraint.name
1588 ));
1589 }
1590
1591 // Check for duplicates within batch
1592 let batch_constraint_keys =
1593 batch_keys.entry(constraint.name.clone()).or_default();
1594 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1595 return Err(anyhow!(
1596 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1597 key,
1598 first_idx,
1599 idx
1600 ));
1601 }
1602 batch_constraint_keys.insert(key, idx);
1603 }
1604 }
1605 ConstraintType::Exists { property }
1606 if properties.get(property).is_none_or(|v| v.is_null()) =>
1607 {
1608 return Err(anyhow!(
1609 "Constraint violation at index {}: Property '{}' must exist",
1610 idx,
1611 property
1612 ));
1613 }
1614 ConstraintType::Check { expression }
1615 if !self.evaluate_check_constraint(expression, properties)? =>
1616 {
1617 return Err(anyhow!(
1618 "Constraint violation at index {}: CHECK constraint '{}' violated",
1619 idx,
1620 constraint.name
1621 ));
1622 }
1623 _ => {}
1624 }
1625 }
1626 }
1627
1628 // 4. Check storage for unique constraints (can batch this into a single query)
1629 for constraint in &schema.constraints {
1630 if !constraint.enabled {
1631 continue;
1632 }
1633 if let ConstraintTarget::Label(l) = &constraint.target {
1634 if l != label {
1635 continue;
1636 }
1637 } else {
1638 continue;
1639 }
1640
1641 if let ConstraintType::Unique {
1642 properties: unique_props,
1643 } = &constraint.constraint_type
1644 {
1645 // Build compound OR filter for all batch vertices
1646 let mut or_filters = Vec::new();
1647 for properties in properties_batch.iter() {
1648 let mut and_parts = Vec::new();
1649 let mut all_present = true;
1650 for prop in unique_props {
1651 if let Some(val) = properties.get(prop) {
1652 let val_str = match val {
1653 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1654 Value::Int(n) => n.to_string(),
1655 Value::Float(f) => f.to_string(),
1656 Value::Bool(b) => b.to_string(),
1657 _ => {
1658 all_present = false;
1659 break;
1660 }
1661 };
1662 and_parts.push(format!("{} = {}", prop, val_str));
1663 } else {
1664 all_present = false;
1665 break;
1666 }
1667 }
1668 if all_present {
1669 or_filters.push(format!("({})", and_parts.join(" AND ")));
1670 }
1671 }
1672
1673 #[cfg(feature = "lance-backend")]
1674 if !or_filters.is_empty() {
1675 let vid_list: Vec<String> =
1676 vids.iter().map(|v| v.as_u64().to_string()).collect();
1677 let filter = format!(
1678 "({}) AND _deleted = false AND _vid NOT IN ({})",
1679 or_filters.join(" OR "),
1680 vid_list.join(", ")
1681 );
1682
1683 if let Ok(ds) = self.storage.vertex_dataset(label)
1684 && let Ok(lance_ds) = ds.open_raw().await
1685 {
1686 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1687 if count > 0 {
1688 return Err(anyhow!(
1689 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1690 label,
1691 constraint.name
1692 ));
1693 }
1694 }
1695 }
1696 }
1697 }
1698
1699 Ok(())
1700 }
1701
1702 /// Checks that ext_id is globally unique across all vertices.
1703 ///
1704 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1705 /// to ensure no other vertex uses this ext_id.
1706 ///
1707 /// # Errors
1708 ///
1709 /// Returns error if another vertex with the same ext_id exists.
1710 async fn check_extid_globally_unique(
1711 &self,
1712 ext_id: &str,
1713 current_vid: Vid,
1714 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1715 ) -> Result<()> {
1716 // Check L0 buffers: current, transaction, and pending flush
1717 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1718 let mut buffers = vec![self.l0_manager.get_current()];
1719 if let Some(tx_l0) = tx_l0 {
1720 buffers.push(tx_l0.clone());
1721 }
1722 buffers.extend(self.l0_manager.get_pending_flush());
1723 buffers
1724 };
1725
1726 for l0 in &l0_buffers_to_check {
1727 // O(1) per buffer via the maintained `extid_index` (the previous
1728 // full `vertex_properties` scan made constrained ingest O(n²)).
1729 if let Some(&vid) = l0.read().extid_index.get(ext_id)
1730 && vid != current_vid
1731 {
1732 return Err(anyhow!(
1733 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1734 ext_id,
1735 vid
1736 ));
1737 }
1738 }
1739
1740 // Check main vertices table (if it exists)
1741 // Pass None for global uniqueness check (not snapshot-isolated)
1742 let backend = self.storage.backend();
1743 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1744 && found_vid != current_vid
1745 {
1746 return Err(anyhow!(
1747 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1748 ext_id,
1749 found_vid
1750 ));
1751 }
1752
1753 Ok(())
1754 }
1755
1756 /// Helper to get vertex labels from L0 buffer.
1757 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1758 let l0 = self.l0_manager.get_current();
1759 let l0_guard = l0.read();
1760 // Check if vertex is tombstoned (deleted) - if so, return None
1761 if l0_guard.vertex_tombstones.contains(&vid) {
1762 return None;
1763 }
1764 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1765 }
1766
1767 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1768 /// This is the proper way to read vertex labels after a flush, as it checks both
1769 /// in-memory buffers and persisted storage.
1770 pub async fn get_vertex_labels(
1771 &self,
1772 vid: Vid,
1773 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1774 ) -> Option<Vec<String>> {
1775 // 1. Check current L0
1776 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1777 return Some(labels);
1778 }
1779
1780 // 2. Check transaction L0 if present
1781 if let Some(tx_l0) = tx_l0 {
1782 let guard = tx_l0.read();
1783 if guard.vertex_tombstones.contains(&vid) {
1784 return None;
1785 }
1786 if let Some(labels) = guard.get_vertex_labels(vid) {
1787 return Some(labels.to_vec());
1788 }
1789 }
1790
1791 // 3. Check pending flush L0s
1792 for pending_l0 in self.l0_manager.get_pending_flush() {
1793 let guard = pending_l0.read();
1794 if guard.vertex_tombstones.contains(&vid) {
1795 return None;
1796 }
1797 if let Some(labels) = guard.get_vertex_labels(vid) {
1798 return Some(labels.to_vec());
1799 }
1800 }
1801
1802 // 4. Check storage
1803 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1804 }
1805
1806 /// Helper to get edge type from L0 buffer.
1807 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1808 let l0 = self.l0_manager.get_current();
1809 let l0_guard = l0.read();
1810 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1811 }
1812
1813 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1814 /// Falls back to the transaction L0 if available.
1815 pub fn get_edge_type_id_from_l0(
1816 &self,
1817 eid: Eid,
1818 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1819 ) -> Option<u32> {
1820 // Check transaction L0 first
1821 if let Some(tx_l0) = tx_l0 {
1822 let guard = tx_l0.read();
1823 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1824 return Some(etype);
1825 }
1826 }
1827 // Fall back to main L0
1828 let l0 = self.l0_manager.get_current();
1829 let l0_guard = l0.read();
1830 l0_guard
1831 .get_edge_endpoint_full(eid)
1832 .map(|(_, _, etype)| etype)
1833 }
1834
1835 /// Set the type name for an edge (used for schemaless edge types).
1836 /// This is called during CREATE for edge types not found in the schema.
1837 pub fn set_edge_type(
1838 &self,
1839 eid: Eid,
1840 type_name: String,
1841 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1842 ) {
1843 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1844 }
1845
1846 /// Evaluate a simple CHECK constraint expression.
1847 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1848 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1849 let parts: Vec<&str> = expression.split_whitespace().collect();
1850 if parts.len() != 3 {
1851 // For now, only support "prop op val"
1852 // Fallback to true if too complex to avoid breaking, but warn
1853 log::warn!(
1854 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1855 expression
1856 );
1857 return Ok(true);
1858 }
1859
1860 let prop_part = parts[0].trim_start_matches('(');
1861 // Handle "variable.property" format - take the part after the dot
1862 let prop_name = if let Some(idx) = prop_part.find('.') {
1863 &prop_part[idx + 1..]
1864 } else {
1865 prop_part
1866 };
1867
1868 let op = parts[1];
1869 let val_str = parts[2].trim_end_matches(')');
1870
1871 let prop_val = match properties.get(prop_name) {
1872 Some(v) => v,
1873 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1874 };
1875
1876 // Parse value string (handle quotes for strings)
1877 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1878 || (val_str.starts_with('"') && val_str.ends_with('"'))
1879 {
1880 Value::String(val_str[1..val_str.len() - 1].to_string())
1881 } else if let Ok(n) = val_str.parse::<i64>() {
1882 Value::Int(n)
1883 } else if let Ok(n) = val_str.parse::<f64>() {
1884 Value::Float(n)
1885 } else if let Ok(b) = val_str.parse::<bool>() {
1886 Value::Bool(b)
1887 } else {
1888 // Check for internal format wrappers if they somehow leaked through
1889 if val_str.starts_with("Number(") && val_str.ends_with(')') {
1890 let n_str = &val_str[7..val_str.len() - 1];
1891 if let Ok(n) = n_str.parse::<i64>() {
1892 Value::Int(n)
1893 } else if let Ok(n) = n_str.parse::<f64>() {
1894 Value::Float(n)
1895 } else {
1896 Value::String(val_str.to_string())
1897 }
1898 } else {
1899 Value::String(val_str.to_string())
1900 }
1901 };
1902
1903 match op {
1904 "=" | "==" => Ok(prop_val == &target_val),
1905 "!=" | "<>" => Ok(prop_val != &target_val),
1906 ">" => self
1907 .compare_values(prop_val, &target_val)
1908 .map(|o| o.is_gt()),
1909 "<" => self
1910 .compare_values(prop_val, &target_val)
1911 .map(|o| o.is_lt()),
1912 ">=" => self
1913 .compare_values(prop_val, &target_val)
1914 .map(|o| o.is_ge()),
1915 "<=" => self
1916 .compare_values(prop_val, &target_val)
1917 .map(|o| o.is_le()),
1918 _ => {
1919 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1920 Ok(true)
1921 }
1922 }
1923 }
1924
1925 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1926 use std::cmp::Ordering;
1927
1928 fn cmp_f64(x: f64, y: f64) -> Ordering {
1929 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1930 }
1931
1932 match (a, b) {
1933 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1934 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1935 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1936 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1937 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1938 _ => Err(anyhow!(
1939 "Cannot compare incompatible types: {:?} vs {:?}",
1940 a,
1941 b
1942 )),
1943 }
1944 }
1945
1946 async fn check_unique_constraint_multi(
1947 &self,
1948 label: &str,
1949 key_values: &[(String, Value)],
1950 current_vid: Vid,
1951 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1952 ) -> Result<()> {
1953 // Serialize constraint key once for O(1) lookups
1954 let key = serialize_constraint_key(label, key_values);
1955
1956 // 1. Check L0 (in-memory) using O(1) constraint index
1957 {
1958 let l0 = self.l0_manager.get_current();
1959 let l0_guard = l0.read();
1960 if l0_guard.has_constraint_key(&key, current_vid) {
1961 return Err(anyhow!(
1962 "Constraint violation: Duplicate composite key for label '{}'",
1963 label
1964 ));
1965 }
1966 }
1967
1968 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1969 // buffer onto `pending_flush` and installs a fresh empty current
1970 // buffer; until the rotated rows reach Lance the key is invisible to
1971 // both the current-buffer check above and the storage check below, so
1972 // a duplicate could slip through that flush window. Mirror the read
1973 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1974 // already consult `pending_flush`.
1975 for pending_l0 in self.l0_manager.get_pending_flush() {
1976 if pending_l0.read().has_constraint_key(&key, current_vid) {
1977 return Err(anyhow!(
1978 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1979 label
1980 ));
1981 }
1982 }
1983
1984 // Check Transaction L0
1985 if let Some(tx_l0) = tx_l0 {
1986 let tx_l0_guard = tx_l0.read();
1987 if tx_l0_guard.has_constraint_key(&key, current_vid) {
1988 return Err(anyhow!(
1989 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1990 label
1991 ));
1992 }
1993 }
1994
1995 // 2. Check Storage (L1/L2)
1996 let filters: Vec<String> = key_values
1997 .iter()
1998 .map(|(prop, val)| {
1999 let val_str = match val {
2000 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2001 Value::Int(n) => n.to_string(),
2002 Value::Float(f) => f.to_string(),
2003 Value::Bool(b) => b.to_string(),
2004 _ => "NULL".to_string(),
2005 };
2006 format!("{} = {}", prop, val_str)
2007 })
2008 .collect();
2009
2010 let mut filter = filters.join(" AND ");
2011 filter.push_str(&format!(
2012 " AND _deleted = false AND _vid != {}",
2013 current_vid.as_u64()
2014 ));
2015
2016 #[cfg(feature = "lance-backend")]
2017 if let Ok(ds) = self.storage.vertex_dataset(label)
2018 && let Ok(lance_ds) = ds.open_raw().await
2019 {
2020 let count = lance_ds.count_rows(Some(filter.clone())).await?;
2021 if count > 0 {
2022 return Err(anyhow!(
2023 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2024 label,
2025 filter
2026 ));
2027 }
2028 }
2029
2030 Ok(())
2031 }
2032
2033 async fn check_write_pressure(&self) -> Result<()> {
2034 let status = self
2035 .storage
2036 .compaction_status()
2037 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2038 let l1_runs = status.l1_runs;
2039 let throttle = &self.config.throttle;
2040
2041 if l1_runs >= throttle.hard_limit {
2042 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2043 // Simple polling for now
2044 while self
2045 .storage
2046 .compaction_status()
2047 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2048 .l1_runs
2049 >= throttle.hard_limit
2050 {
2051 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2052 }
2053 } else if l1_runs >= throttle.soft_limit {
2054 let excess = l1_runs - throttle.soft_limit;
2055 // Cap multiplier to avoid overflow
2056 let excess = std::cmp::min(excess, 31);
2057 let multiplier = 2_u32.pow(excess as u32);
2058 let delay = throttle.base_delay * multiplier;
2059 tokio::time::sleep(delay).await;
2060 }
2061 Ok(())
2062 }
2063
2064 /// Check transaction memory limit to prevent OOM.
2065 /// No-op when no transaction is active.
2066 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2067 if let Some(tx_l0) = tx_l0 {
2068 let size = tx_l0.read().estimated_size;
2069 if size > self.config.max_transaction_memory {
2070 return Err(anyhow!(
2071 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2072 Roll back or commit the current transaction.",
2073 size,
2074 self.config.max_transaction_memory
2075 ));
2076 }
2077 }
2078 Ok(())
2079 }
2080
2081 async fn get_query_context(
2082 &self,
2083 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2084 ) -> Option<QueryContext> {
2085 Some(QueryContext::new_with_pending(
2086 self.l0_manager.get_current(),
2087 tx_l0.cloned(),
2088 self.l0_manager.get_pending_flush(),
2089 ))
2090 }
2091
2092 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2093 /// write paths.
2094 ///
2095 /// Rejects a declared CRDT property written as a parsed CRDT value
2096 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2097 /// A mismatch would make the commit-time merge silently overwrite instead of
2098 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2099 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2100 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2101 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2102 /// are never carved out and stay conflictable.
2103 fn enforce_crdt_variants(
2104 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2105 properties: &Properties,
2106 ) -> Result<()> {
2107 for (key, value) in properties {
2108 let Some(meta) = props_meta.get(key) else {
2109 continue;
2110 };
2111 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2112 continue;
2113 };
2114 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2115 && crdt.type_name() != expected.type_name()
2116 {
2117 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2118 message: format!(
2119 "CRDT property '{key}' must be written as a {} value",
2120 expected.type_name()
2121 ),
2122 }));
2123 }
2124 }
2125 Ok(())
2126 }
2127
2128 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2129 ///
2130 /// When `label` is provided, uses it directly to look up property metadata.
2131 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2132 ///
2133 /// # Errors
2134 ///
2135 /// Returns an error if CRDT property merging fails.
2136 async fn prepare_vertex_upsert(
2137 &self,
2138 vid: Vid,
2139 properties: &mut Properties,
2140 label: Option<&str>,
2141 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2142 ) -> Result<()> {
2143 let Some(pm) = &self.property_manager else {
2144 return Ok(());
2145 };
2146
2147 let schema = self.schema_manager.schema();
2148
2149 // Resolve label: use provided label or discover from L0/storage
2150 let discovered_labels;
2151 let label_name = if let Some(l) = label {
2152 Some(l)
2153 } else {
2154 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2155 discovered_labels
2156 .as_ref()
2157 .and_then(|l| l.first().map(|s| s.as_str()))
2158 };
2159
2160 let Some(label_str) = label_name else {
2161 return Ok(());
2162 };
2163 let Some(props_meta) = schema.properties.get(label_str) else {
2164 return Ok(());
2165 };
2166
2167 // Identify CRDT properties in the insert data
2168 let crdt_keys: Vec<String> = properties
2169 .keys()
2170 .filter(|key| {
2171 props_meta.get(*key).is_some_and(|meta| {
2172 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2173 })
2174 })
2175 .cloned()
2176 .collect();
2177
2178 if crdt_keys.is_empty() {
2179 return Ok(());
2180 }
2181
2182 // Enforce that each declared CRDT property written as a parsed CRDT value
2183 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2184 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2185 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2186 // would hide that as a silent lost update — reject it at the source.
2187 //
2188 // Only the `Map` form is checked: it is exactly the form the carve-out
2189 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2190 // string (the Cypher form) or a non-CRDT value is never carved out — it
2191 // stays conflictable — so it poses no carve-out soundness risk and is left
2192 // to the existing merge/parse path. This is the declared-property half of
2193 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2194 Self::enforce_crdt_variants(props_meta, properties)?;
2195
2196 let ctx = self.get_query_context(tx_l0).await;
2197 for key in crdt_keys {
2198 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2199 if !existing.is_null()
2200 && let Some(val) = properties.get_mut(&key)
2201 {
2202 *val = pm.merge_crdt_values(&existing, val)?;
2203 }
2204 }
2205
2206 Ok(())
2207 }
2208
2209 async fn prepare_edge_upsert(
2210 &self,
2211 eid: Eid,
2212 properties: &mut Properties,
2213 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2214 ) -> Result<()> {
2215 if let Some(pm) = &self.property_manager {
2216 let schema = self.schema_manager.schema();
2217 // Get edge type from L0 buffer instead of from EID
2218 let type_name = self.get_edge_type_from_l0(eid);
2219
2220 if let Some(ref t_name) = type_name
2221 && let Some(props_meta) = schema.properties.get(t_name)
2222 {
2223 let mut crdt_keys = Vec::new();
2224 for (key, _) in properties.iter() {
2225 if let Some(meta) = props_meta.get(key)
2226 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2227 {
2228 crdt_keys.push(key.clone());
2229 }
2230 }
2231
2232 if !crdt_keys.is_empty() {
2233 let ctx = self.get_query_context(tx_l0).await;
2234 for key in crdt_keys {
2235 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2236
2237 if !existing.is_null()
2238 && let Some(val) = properties.get_mut(&key)
2239 {
2240 *val = pm.merge_crdt_values(&existing, val)?;
2241 }
2242 }
2243 }
2244 }
2245 }
2246 Ok(())
2247 }
2248
2249 #[instrument(skip(self, properties), level = "trace")]
2250 pub async fn insert_vertex(
2251 &self,
2252 vid: Vid,
2253 properties: Properties,
2254 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2255 ) -> Result<()> {
2256 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2257 .await?;
2258 Ok(())
2259 }
2260
2261 /// Component C1 (G4): before a non-transactional mutation merges into main
2262 /// L0, if an outstanding snapshot pins the current generation, freeze it
2263 /// aside so snapshots taken *before* this write stay isolated from it.
2264 ///
2265 /// `flush_lock` (acquired and released here) serializes the freeze against
2266 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2267 /// commit path gets. No-op for transactional writes (their freeze happens at
2268 /// commit) and — the common case — when nothing is pinned, where it costs one
2269 /// atomic load. Freezes at most once per pinned generation: the freeze
2270 /// installs a fresh unpinned `current`, so later writes in the same bulk
2271 /// import see no pin and merge in place, and the snapshot keeps reading the
2272 /// frozen pre-import buffer.
2273 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2274 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2275 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2276 // always false (one atomic load) when SSI is off.
2277 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2278 let _flush_lock_guard = self.flush_lock.lock().await;
2279 // Re-check under the lock: a concurrent commit may have frozen first.
2280 if self.l0_manager.is_current_pinned() {
2281 self.l0_manager.freeze_current_for_snapshot();
2282 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2283 }
2284 }
2285 }
2286
2287 #[instrument(skip(self, properties, labels), level = "trace")]
2288 pub async fn insert_vertex_with_labels(
2289 &self,
2290 vid: Vid,
2291 mut properties: Properties,
2292 labels: &[String],
2293 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2294 ) -> Result<Properties> {
2295 let start = std::time::Instant::now();
2296 self.check_write_pressure().await?;
2297 self.check_transaction_memory(tx_l0)?;
2298
2299 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2300 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2301 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2302 // taken before this write stay isolated from it.
2303 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2304
2305 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2306 self.process_embeddings_for_labels(labels, &mut properties)
2307 .await?;
2308 }
2309 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2310 .await?;
2311 self.prepare_vertex_upsert(
2312 vid,
2313 &mut properties,
2314 labels.first().map(|s| s.as_str()),
2315 tx_l0,
2316 )
2317 .await?;
2318
2319 // Clone properties and labels before moving into L0 to return them and populate constraint index
2320 let properties_copy = properties.clone();
2321 let labels_copy = labels.to_vec();
2322
2323 {
2324 // For a non-tx write, re-resolve the live `current` buffer and hold
2325 // `flush_lock` across the (synchronous) write so a concurrent flush
2326 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2327 // a fresh `current` between resolve and write, dropping our write
2328 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2329 // buffer, never the rotating `current`, so no `flush_lock` is needed
2330 // (and taking it would risk re-entrancy with the commit path).
2331 let _flush_lock_guard = if tx_l0.is_none() {
2332 Some(self.flush_lock.lock().await)
2333 } else {
2334 None
2335 };
2336 let l0 = self.resolve_l0(tx_l0);
2337 let mut l0_guard = l0.write();
2338 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2339 // possibly empty) current buffer must merge against the chained
2340 // committed state, not shadow it. No-op when the chain is empty
2341 // or this is a tx-private write.
2342 if tx_l0.is_none() {
2343 let pending = self.l0_manager.get_pending_flush();
2344 if !pending.is_empty() {
2345 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2346 }
2347 }
2348 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2349
2350 // Populate constraint index for O(1) duplicate detection
2351 let schema = self.schema_manager.schema();
2352 for label in &labels_copy {
2353 if schema.get_label_case_insensitive(label).is_none() {
2354 if self.config.strict_schema {
2355 return Err(anyhow::anyhow!(
2356 "Label '{}' is not defined in the schema \
2357 (strict_schema is enabled).",
2358 label
2359 ));
2360 }
2361 continue; // Schemaless: skip unknown labels.
2362 }
2363
2364 // For each unique constraint on this label, insert into constraint index
2365 for constraint in &schema.constraints {
2366 if !constraint.enabled {
2367 continue;
2368 }
2369 if let ConstraintTarget::Label(l) = &constraint.target {
2370 if l != label {
2371 continue;
2372 }
2373 } else {
2374 continue;
2375 }
2376
2377 if let ConstraintType::Unique {
2378 properties: unique_props,
2379 } = &constraint.constraint_type
2380 {
2381 let mut key_values = Vec::new();
2382 let mut all_present = true;
2383 for prop in unique_props {
2384 if let Some(val) = properties_copy.get(prop) {
2385 key_values.push((prop.clone(), val.clone()));
2386 } else {
2387 all_present = false;
2388 break;
2389 }
2390 }
2391
2392 if all_present {
2393 let key = serialize_constraint_key(label, &key_values);
2394 l0_guard.insert_constraint_key(key, vid);
2395 }
2396 }
2397 }
2398 }
2399 }
2400
2401 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2402 self.update_metrics();
2403
2404 if tx_l0.is_none() {
2405 self.check_flush().await?;
2406 }
2407 if start.elapsed().as_millis() > 100 {
2408 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2409 }
2410 Ok(properties_copy)
2411 }
2412
2413 /// True iff routing this partial write through MergeInsert would
2414 /// miss a constraint check. Specifically: a multi-key UNIQUE
2415 /// constraint where the touched-set doesn't cover all member keys
2416 /// requires the unchanged keys from the existing row to compute
2417 /// the composite. Conservative: also returns true if any touched
2418 /// key is `ext_id` (uniqueness checked globally — handled in the
2419 /// full-row path).
2420 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2421 if touched.contains_key("ext_id") {
2422 return true;
2423 }
2424 let schema = self.schema_manager.schema();
2425 for label in labels {
2426 if schema.get_label_case_insensitive(label).is_none() {
2427 continue;
2428 }
2429 for constraint in &schema.constraints {
2430 if !constraint.enabled {
2431 continue;
2432 }
2433 if let ConstraintTarget::Label(l) = &constraint.target {
2434 if !l.eq_ignore_ascii_case(label) {
2435 continue;
2436 }
2437 } else {
2438 continue;
2439 }
2440 if let ConstraintType::Unique {
2441 properties: unique_props,
2442 } = &constraint.constraint_type
2443 {
2444 if unique_props.len() < 2 {
2445 continue; // single-key UNIQUE — partial path sees the key
2446 }
2447 if unique_props.iter().any(|p| touched.contains_key(p)) {
2448 return true;
2449 }
2450 }
2451 }
2452 }
2453 false
2454 }
2455
2456 /// Insert a vertex's FULL property row plus a touched-keys hint so
2457 /// the flush emits ONLY those columns via Lance MergeInsert.
2458 ///
2459 /// Caller must have read the full row (via PropertyManager) and
2460 /// applied SET-touched values on top before calling — same input
2461 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2462 /// is the set of property keys this SET statement actually
2463 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2464 /// flush filters the MergeInsert source schema down to those keys.
2465 /// When `UniConfig::partial_lance_writes == false`, falls through
2466 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2467 /// equivalence with prior releases.
2468 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2469 pub async fn insert_vertex_partial_full(
2470 &self,
2471 vid: Vid,
2472 mut props: Properties,
2473 touched_keys: HashSet<String>,
2474 labels: &[String],
2475 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2476 ) -> Result<()> {
2477 if !self.config.partial_lance_writes
2478 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2479 {
2480 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2481 .await?;
2482 return Ok(());
2483 }
2484
2485 self.check_write_pressure().await?;
2486 self.check_transaction_memory(tx_l0)?;
2487 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2488 self.process_embeddings_for_labels(labels, &mut props)
2489 .await?;
2490 }
2491 // Full-row validation runs because we have the complete map;
2492 // no need for the partial-only validator.
2493 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2494 .await?;
2495 {
2496 let l0 = self.resolve_l0(tx_l0);
2497 let mut l0_guard = l0.write();
2498 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2499 }
2500 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2501 metrics::counter!("uni_partial_writes_total").increment(1);
2502 self.update_metrics();
2503 if tx_l0.is_none() {
2504 self.check_flush().await?;
2505 }
2506 Ok(())
2507 }
2508
2509 /// Insert a vertex's *partial* property set without first reading the
2510 /// full row.
2511 ///
2512 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2513 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2514 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2515 /// schema source — preserving untouched columns (e.g., embeddings)
2516 /// byte-equal in Lance with no read at the caller and no write of
2517 /// those columns.
2518 ///
2519 /// When the flag is `false`, this falls back to the existing
2520 /// `insert_vertex_with_labels` path after merging `touched` with
2521 /// the current properties from L0/storage. The caller can therefore
2522 /// use this entry point unconditionally; the optimization activates
2523 /// only when the flag is on.
2524 #[instrument(skip(self, touched, labels), level = "trace")]
2525 pub async fn insert_vertex_partial(
2526 &self,
2527 vid: Vid,
2528 touched: Properties,
2529 labels: &[String],
2530 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2531 ) -> Result<()> {
2532 let needs_full_read =
2533 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2534 if needs_full_read {
2535 // Flag-off fallback (or constraint-driven fallback): merge
2536 // `touched` with the current full property snapshot from
2537 // L0/storage and route through the existing path. Preserves
2538 // bit-for-bit equivalence with the pre-Round-11 release.
2539 let existing = if let Some(pm) = &self.property_manager {
2540 pm.get_all_vertex_props_with_ctx(vid, None)
2541 .await
2542 .unwrap_or_default()
2543 .unwrap_or_default()
2544 } else {
2545 Properties::new()
2546 };
2547 let mut merged = existing;
2548 for (k, v) in touched {
2549 merged.insert(k, v);
2550 }
2551 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2552 .await?;
2553 return Ok(());
2554 }
2555
2556 // Flag-on fast path: stage the partial update directly. Pressure
2557 // checks, embedding generation, constraint validation all still
2558 // run — but the validator is the partial-aware variant that
2559 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2560 // properties not present in `touched`. Multi-key UNIQUE that
2561 // overlaps the touched set forces a fallback above via
2562 // `touched_needs_full_read`.
2563 let mut touched = touched;
2564 self.check_write_pressure().await?;
2565 self.check_transaction_memory(tx_l0)?;
2566 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2567 self.process_embeddings_for_labels(labels, &mut touched)
2568 .await?;
2569 }
2570 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2571 .await?;
2572
2573 {
2574 let l0 = self.resolve_l0(tx_l0);
2575 let mut l0_guard = l0.write();
2576 l0_guard.insert_vertex_partial(vid, touched, labels);
2577 }
2578
2579 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2580 metrics::counter!("uni_partial_writes_total").increment(1);
2581 self.update_metrics();
2582 if tx_l0.is_none() {
2583 self.check_flush().await?;
2584 }
2585 Ok(())
2586 }
2587
2588 /// Insert multiple vertices with batched operations.
2589 ///
2590 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2591 /// for bulk inserts with unique constraints.
2592 ///
2593 /// # Performance Improvements
2594 /// - Batch VID allocation: 1 call instead of N calls
2595 /// - Batch constraint validation: O(N) instead of O(N²)
2596 /// - Batch embedding generation: 1 API call per config instead of N calls
2597 /// - Transaction wrapping: Automatic flush deferral, atomicity
2598 ///
2599 /// # Arguments
2600 /// * `vids` - Pre-allocated VIDs for the vertices
2601 /// * `properties_batch` - Properties for each vertex
2602 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2603 ///
2604 /// # Errors
2605 /// Returns error if:
2606 /// - VID/properties length mismatch
2607 /// - Constraint violation detected
2608 /// - Embedding generation fails
2609 /// - Transaction commit fails
2610 ///
2611 /// # Atomicity
2612 /// If this method fails, all changes are rolled back (if transaction was started here).
2613 pub async fn insert_vertices_batch(
2614 &self,
2615 vids: Vec<Vid>,
2616 mut properties_batch: Vec<Properties>,
2617 labels: Vec<String>,
2618 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2619 ) -> Result<Vec<Properties>> {
2620 let start = std::time::Instant::now();
2621
2622 // Validate inputs
2623 if vids.len() != properties_batch.len() {
2624 return Err(anyhow!(
2625 "VID/properties size mismatch: {} vids, {} properties",
2626 vids.len(),
2627 properties_batch.len()
2628 ));
2629 }
2630
2631 if vids.is_empty() {
2632 return Ok(Vec::new());
2633 }
2634
2635 // Batch operations — writes go directly to the resolved L0.
2636 // Atomicity is guaranteed by the caller holding the writer lock.
2637 let result = async {
2638 self.check_write_pressure().await?;
2639 self.check_transaction_memory(tx_l0)?;
2640
2641 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2642 // freeze the pinned generation aside before merging so snapshot
2643 // readers stay isolated. No-op when unpinned or transactional.
2644 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2645
2646 // Batch embedding generation (1 API call per config)
2647 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2648 .await?;
2649
2650 // Batch constraint validation (O(N) instead of O(N²))
2651 let label = labels
2652 .first()
2653 .ok_or_else(|| anyhow!("No labels provided"))?;
2654 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2655 .await?;
2656
2657 // Batch prepare (CRDT merging if needed)
2658 // Check schema once: skip entirely if no CRDT properties for this label.
2659 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2660 // values to merge, so the per-vertex lookup is unnecessary in that case.
2661 let has_crdt_fields = {
2662 let schema = self.schema_manager.schema();
2663 schema
2664 .properties
2665 .get(label.as_str())
2666 .is_some_and(|props_meta| {
2667 props_meta.values().any(|meta| {
2668 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2669 })
2670 })
2671 };
2672
2673 if has_crdt_fields {
2674 // Layer-1 variant enforcement (G3): the batch path must reject a
2675 // declared-CRDT variant mismatch exactly as the single-vertex
2676 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2677 // written via batch import slips past write-time validation and
2678 // the OCC carve-out then masks the overwrite as a lost update.
2679 {
2680 let schema = self.schema_manager.schema();
2681 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2682 for props in &properties_batch {
2683 Self::enforce_crdt_variants(props_meta, props)?;
2684 }
2685 }
2686 }
2687
2688 // Batch fetch existing CRDT values: collect VIDs that need merging,
2689 // then query once via PropertyManager instead of per-vertex lookups.
2690 let schema = self.schema_manager.schema();
2691 let crdt_keys: Vec<String> = schema
2692 .properties
2693 .get(label.as_str())
2694 .map(|props_meta| {
2695 props_meta
2696 .iter()
2697 .filter(|(_, meta)| {
2698 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2699 })
2700 .map(|(key, _)| key.clone())
2701 .collect()
2702 })
2703 .unwrap_or_default();
2704
2705 if let Some(pm) = &self.property_manager {
2706 let ctx = self.get_query_context(tx_l0).await;
2707 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2708 for key in &crdt_keys {
2709 if props.contains_key(key) {
2710 let existing =
2711 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2712 if !existing.is_null()
2713 && let Some(val) = props.get_mut(key)
2714 {
2715 *val = pm.merge_crdt_values(&existing, val)?;
2716 }
2717 }
2718 }
2719 }
2720 }
2721 }
2722
2723 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2724 let target_l0 = self.resolve_l0(tx_l0);
2725
2726 let properties_result = properties_batch.clone();
2727 {
2728 let mut l0_guard = target_l0.write();
2729 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2730 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2731 }
2732 }
2733
2734 // Update metrics (batch increment)
2735 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2736 self.update_metrics();
2737
2738 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2739 }
2740 .await;
2741
2742 let props = result?;
2743
2744 if start.elapsed().as_millis() > 100 {
2745 log::warn!(
2746 "Slow insert_vertices_batch ({} vertices): {}ms",
2747 vids.len(),
2748 start.elapsed().as_millis()
2749 );
2750 }
2751
2752 Ok(props)
2753 }
2754
/// Delete a vertex by VID by writing a tombstone into L0.
///
/// The tombstone needs the vertex's labels in the target L0 buffer so it can
/// be flushed to the correct label datasets. If the buffer does not already
/// hold them, they are taken from `labels` when provided, otherwise
/// discovered from the pending-flush L0 buffers and then from storage
/// (which can be slow for many vertices).
///
/// # Arguments
///
/// * `vid` - The vertex to delete.
/// * `labels` - Optional caller-supplied labels; skips label discovery.
/// * `tx_l0` - Transaction-private L0 buffer, or `None` for a
///   non-transactional delete (which writes under `flush_lock` and may
///   trigger an auto-flush afterwards).
///
/// # Errors
///
/// Returns an error if write pressure stalls, the transaction memory check
/// fails, label lookup fails, the L0 delete operation fails, or the
/// follow-up `check_flush` fails.
#[instrument(skip(self, labels), level = "trace")]
pub async fn delete_vertex(
    &self,
    vid: Vid,
    labels: Option<Vec<String>>,
    tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
) -> Result<()> {
    let start = std::time::Instant::now();
    self.check_write_pressure().await?;
    self.check_transaction_memory(tx_l0)?;
    self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)

    // Before deleting, ensure we have the vertex's labels stored in L0 so
    // the tombstone can be flushed to the correct label datasets. Discover
    // them up front (this may await storage) WITHOUT pinning the buffer we
    // will eventually mutate — for non-tx writes the live `current` buffer
    // is re-resolved below under `flush_lock`, so a concurrent rotate can't
    // drop our write (Bug #4). `resolve_l0` here is only used for cheap
    // reads that tolerate a racing rotate.
    //
    // NOTE(review): `has_labels` reflects the buffer resolved HERE. If a
    // rotate lands before the non-tx write below, the re-resolved buffer may
    // not carry these labels and no backfill is applied — confirm that
    // `L0Buffer::delete_vertex` / the flush path tolerates that.
    let has_labels = {
        let l0_guard = self.resolve_l0(tx_l0);
        let guard = l0_guard.read();
        guard.vertex_labels.contains_key(&vid)
    };

    // `None` means "nothing to backfill": either the buffer already had the
    // labels or none could be found anywhere.
    let backfill_labels = if has_labels {
        None
    } else if let Some(provided) = labels {
        // Caller provided labels — skip the lookup entirely
        Some(provided)
    } else {
        // Discover labels from pending flush L0s, then storage
        let mut found = None;
        for pending_l0 in self.l0_manager.get_pending_flush() {
            let pending_guard = pending_l0.read();
            if let Some(l) = pending_guard.get_vertex_labels(vid) {
                found = Some(l.to_vec());
                break;
            }
        }
        if found.is_none() {
            found = self.find_vertex_labels_in_storage(vid).await?;
        }
        found
    };

    // Test-only seam (no-op without the `failpoints` feature): pause a
    // non-transactional delete AFTER the awaited label discovery but BEFORE
    // it re-resolves the live buffer and writes the tombstone. A concurrent
    // flush can rotate+complete a buffer in this window; the fix re-resolves
    // `get_current()` and mutates it under `flush_lock`, so the tombstone
    // always lands in the live buffer (Bug #4 — silent lost delete across
    // L0 rotation).
    fail::fail_point!("nontx::after-capture");

    // Apply the label backfill and the tombstone together. For a non-tx
    // write, hold `flush_lock` across the (synchronous) re-resolve + write
    // so a concurrent flush rotate (which takes `flush_lock` via
    // `begin_flush`) cannot install a fresh `current` between our resolve
    // and our write. For a tx write `resolve_l0` returns the tx-private
    // buffer (never the rotating `current`), so no `flush_lock` is needed
    // — and taking it there would risk re-entrancy with the commit path.
    if tx_l0.is_none() {
        let _flush_lock_guard = self.flush_lock.lock().await;
        let l0 = self.l0_manager.get_current();
        let mut guard = l0.write();
        if let Some(found_labels) = backfill_labels {
            guard.vertex_labels.insert(vid, found_labels);
        }
        guard.delete_vertex(vid)?;
    } else {
        let l0 = self.resolve_l0(tx_l0);
        let mut guard = l0.write();
        if let Some(found_labels) = backfill_labels {
            guard.vertex_labels.insert(vid, found_labels);
        }
        guard.delete_vertex(vid)?;
    }
    metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
    self.update_metrics();

    // Auto-flush is only considered for non-transactional writes; both
    // locks taken above have been released by this point.
    if tx_l0.is_none() {
        self.check_flush().await?;
    }
    if start.elapsed().as_millis() > 100 {
        log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
    }
    Ok(())
}
2854
2855 /// Find vertex labels from storage by querying the main vertices table.
2856 /// Returns the labels from the latest non-deleted version of the vertex.
2857 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2858 use crate::backend::types::ScanRequest;
2859 use arrow_array::Array;
2860 use arrow_array::cast::AsArray;
2861
2862 let backend = self.storage.backend();
2863 let table_name = MainVertexDataset::table_name();
2864
2865 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2866 if !backend.table_exists(table_name).await? {
2867 return Ok(None);
2868 }
2869
2870 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2871 let filter = format!("_vid = {}", vid.as_u64());
2872 let batches = backend
2873 .scan(
2874 ScanRequest::all(table_name)
2875 .with_filter(filter)
2876 .with_columns(vec![
2877 "_vid".to_string(),
2878 "labels".to_string(),
2879 "_version".to_string(),
2880 "_deleted".to_string(),
2881 ]),
2882 )
2883 .await
2884 .unwrap_or_default();
2885
2886 // Find the row with the highest version number
2887 let mut max_version: Option<u64> = None;
2888 let mut labels: Option<Vec<String>> = None;
2889 let mut is_deleted = false;
2890
2891 for batch in batches {
2892 if batch.num_rows() == 0 {
2893 continue;
2894 }
2895
2896 let version_array = batch
2897 .column_by_name("_version")
2898 .unwrap()
2899 .as_primitive::<arrow_array::types::UInt64Type>();
2900
2901 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2902
2903 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2904
2905 for row_idx in 0..batch.num_rows() {
2906 let version = version_array.value(row_idx);
2907
2908 if max_version.is_none_or(|mv| version > mv) {
2909 is_deleted = deleted_array.value(row_idx);
2910
2911 let labels_list = labels_array.value(row_idx);
2912 let string_array = labels_list.as_string::<i32>();
2913 let vertex_labels: Vec<String> = (0..string_array.len())
2914 .filter(|&i| !string_array.is_null(i))
2915 .map(|i| string_array.value(i).to_string())
2916 .collect();
2917
2918 max_version = Some(version);
2919 labels = Some(vertex_labels);
2920 }
2921 }
2922 }
2923
2924 // If the latest version is deleted, return None
2925 if is_deleted { Ok(None) } else { Ok(labels) }
2926 }
2927
2928 #[expect(clippy::too_many_arguments)]
2929 #[instrument(skip(self, props, touched_keys), level = "trace")]
2930 pub async fn insert_edge_partial_full(
2931 &self,
2932 src_vid: Vid,
2933 dst_vid: Vid,
2934 edge_type: u32,
2935 eid: Eid,
2936 props: Properties,
2937 edge_type_name: Option<String>,
2938 touched_keys: HashSet<String>,
2939 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2940 ) -> Result<()> {
2941 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2942 if !self.config.partial_lance_writes {
2943 return self
2944 .insert_edge(
2945 src_vid,
2946 dst_vid,
2947 edge_type,
2948 eid,
2949 props,
2950 edge_type_name,
2951 tx_l0,
2952 )
2953 .await;
2954 }
2955
2956 let start = std::time::Instant::now();
2957 self.check_write_pressure().await?;
2958 self.check_transaction_memory(tx_l0)?;
2959 let mut props = props;
2960 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2961
2962 let l0 = self.resolve_l0(tx_l0);
2963 l0.write().insert_edge_partial_full(
2964 src_vid,
2965 dst_vid,
2966 edge_type,
2967 eid,
2968 props,
2969 edge_type_name,
2970 touched_keys,
2971 )?;
2972
2973 if tx_l0.is_none() {
2974 let version = l0.read().current_version;
2975 self.adjacency_manager
2976 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2977 }
2978
2979 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2980 metrics::counter!("uni_partial_writes_total").increment(1);
2981 self.update_metrics();
2982 if tx_l0.is_none() {
2983 self.check_flush().await?;
2984 }
2985 if start.elapsed().as_millis() > 100 {
2986 log::warn!(
2987 "Slow insert_edge_partial_full: {}ms",
2988 start.elapsed().as_millis()
2989 );
2990 }
2991 Ok(())
2992 }
2993
2994 #[expect(clippy::too_many_arguments)]
2995 pub async fn insert_edge(
2996 &self,
2997 src_vid: Vid,
2998 dst_vid: Vid,
2999 edge_type: u32,
3000 eid: Eid,
3001 mut properties: Properties,
3002 edge_type_name: Option<String>,
3003 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3004 ) -> Result<()> {
3005 let start = std::time::Instant::now();
3006 self.check_write_pressure().await?;
3007 self.check_transaction_memory(tx_l0)?;
3008 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3009 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3010 .await?;
3011
3012 let l0 = self.resolve_l0(tx_l0);
3013 l0.write()
3014 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3015
3016 // Dual-write to AdjacencyManager overlay (survives flush).
3017 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3018 if tx_l0.is_none() {
3019 let version = l0.read().current_version;
3020 self.adjacency_manager
3021 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3022 }
3023
3024 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3025 self.update_metrics();
3026
3027 if tx_l0.is_none() {
3028 self.check_flush().await?;
3029 }
3030 if start.elapsed().as_millis() > 100 {
3031 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3032 }
3033 Ok(())
3034 }
3035
3036 #[instrument(skip(self), level = "trace")]
3037 pub async fn delete_edge(
3038 &self,
3039 eid: Eid,
3040 src_vid: Vid,
3041 dst_vid: Vid,
3042 edge_type: u32,
3043 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3044 ) -> Result<()> {
3045 let start = std::time::Instant::now();
3046 self.check_write_pressure().await?;
3047 self.check_transaction_memory(tx_l0)?;
3048 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3049 let l0 = self.resolve_l0(tx_l0);
3050
3051 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3052
3053 // Dual-write tombstone to AdjacencyManager overlay.
3054 if tx_l0.is_none() {
3055 let version = l0.read().current_version;
3056 self.adjacency_manager
3057 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3058 }
3059 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3060 self.update_metrics();
3061
3062 if tx_l0.is_none() {
3063 self.check_flush().await?;
3064 }
3065 if start.elapsed().as_millis() > 100 {
3066 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3067 }
3068 Ok(())
3069 }
3070
3071 /// Decide whether a flush should be triggered based on mutation count
3072 /// or elapsed time since the last flush.
3073 ///
3074 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3075 /// reuse the decision while bypassing the lock-acquiring entry point
3076 /// (it already holds `flush_lock`).
3077 fn should_flush(&self) -> bool {
3078 let count = self.l0_manager.get_current().read().mutation_count;
3079 if count == 0 {
3080 return false;
3081 }
3082 if count >= self.config.auto_flush_threshold {
3083 return true;
3084 }
3085 if let Some(interval) = self.config.auto_flush_interval
3086 && self.last_flush_time.lock().elapsed() >= interval
3087 && count >= self.config.auto_flush_min_mutations
3088 {
3089 return true;
3090 }
3091 false
3092 }
3093
3094 /// Check if flush should be triggered based on mutation count or time elapsed.
3095 /// This method is called after each write operation and can also be called
3096 /// by a background task for time-based flushing.
3097 pub async fn check_flush(&self) -> Result<()> {
3098 if self.should_flush() {
3099 self.flush_to_l1(None).await?;
3100 }
3101 Ok(())
3102 }
3103
3104 /// Process embeddings for a vertex using labels passed directly.
3105 /// Use this when labels haven't been stored to L0 yet.
3106 async fn process_embeddings_for_labels(
3107 &self,
3108 labels: &[String],
3109 properties: &mut Properties,
3110 ) -> Result<()> {
3111 let label_name = labels.first().map(|s| s.as_str());
3112 self.process_embeddings_impl(label_name, properties).await
3113 }
3114
3115 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3116 /// vertex has an embedding config that hasn't been satisfied by the
3117 /// caller-provided properties, enqueue the VID in
3118 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3119 /// skips `process_embeddings_for_labels` and the embedding is computed
3120 /// in a single batched call at flush time via
3121 /// `drain_pending_embeddings`.
3122 ///
3123 /// Returns `false` (caller falls back to today's per-row eager embed)
3124 /// if any of:
3125 /// - the flag is off,
3126 /// - no label has an embedding config,
3127 /// - the user already provided the target property (matches the
3128 /// existing skip-if-present semantics at writer.rs:2727).
3129 ///
3130 /// Trade-off: when deferral is active, in-tx reads of the embedding
3131 /// column return only what was already in storage (or nothing for
3132 /// brand-new vertices). Existing tests that RETURN n.embedding in
3133 /// the same tx as a SET on the source column must run with the flag
3134 /// off; opt in only when no such reads happen between write and
3135 /// commit.
3136 fn try_defer_embedding(
3137 &self,
3138 labels: &[String],
3139 properties: &Properties,
3140 vid: Vid,
3141 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3142 ) -> bool {
3143 if !self.config.defer_embeddings {
3144 return false;
3145 }
3146 let Some(label) = labels.first() else {
3147 return false;
3148 };
3149
3150 let schema = self.schema_manager.schema();
3151 let mut has_unsatisfied_cfg = false;
3152 for idx in &schema.indexes {
3153 if let IndexDefinition::Vector(v_cfg) = idx
3154 && v_cfg.label == *label
3155 && v_cfg.embedding_config.is_some()
3156 && !properties.contains_key(&v_cfg.property)
3157 {
3158 has_unsatisfied_cfg = true;
3159 break;
3160 }
3161 }
3162 if !has_unsatisfied_cfg {
3163 return false;
3164 }
3165
3166 let l0 = self.resolve_l0(tx_l0);
3167 let mut guard = l0.write();
3168 guard.pending_embeddings.insert(vid, label.clone());
3169 true
3170 }
3171
3172 /// Drain `pending_embeddings` from the rotated old-L0 right before
3173 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3174 /// `process_embeddings_for_batch` call per label, and writes the
3175 /// resulting embedding vectors into each VID's `vertex_properties`
3176 /// map. After this returns, the flush proceeds against an L0 that
3177 /// looks no different from one whose embeddings were generated
3178 /// per-row at insert.
3179 ///
3180 /// Idempotent: a VID whose embedding was already materialized
3181 /// (e.g., by on-demand read paths in a future Phase B revision) is
3182 /// detected via `properties.contains_key(target_prop)` inside
3183 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3184 /// the drain is safe.
3185 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3186 let by_label: HashMap<String, Vec<Vid>> = {
3187 let guard = old_l0_arc.read();
3188 if guard.pending_embeddings.is_empty() {
3189 return Ok(());
3190 }
3191 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3192 for (vid, label) in &guard.pending_embeddings {
3193 m.entry(label.clone()).or_default().push(*vid);
3194 }
3195 m
3196 };
3197
3198 for (label, vids) in by_label {
3199 let mut properties_batch: Vec<Properties> = {
3200 let guard = old_l0_arc.read();
3201 vids.iter()
3202 .map(|vid| {
3203 guard
3204 .vertex_properties
3205 .get(vid)
3206 .cloned()
3207 .unwrap_or_default()
3208 })
3209 .collect()
3210 };
3211
3212 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3213 .await?;
3214
3215 let mut guard = old_l0_arc.write();
3216 for (vid, props) in vids.iter().zip(properties_batch) {
3217 let target = guard.vertex_properties.entry(*vid).or_default();
3218 for (k, v) in props {
3219 target.insert(k, v);
3220 }
3221 guard.pending_embeddings.remove(vid);
3222 }
3223 }
3224 Ok(())
3225 }
3226
3227 /// Process embeddings for a batch of vertices efficiently.
3228 ///
3229 /// Groups vertices by embedding config and makes batched API calls to the
3230 /// embedding service instead of calling once per vertex.
3231 ///
3232 /// # Performance
3233 /// For N vertices with embedding config:
3234 /// - Old approach: N API calls to embedding service
3235 /// - New approach: 1 API call per embedding config (usually 1 total)
3236 async fn process_embeddings_for_batch(
3237 &self,
3238 labels: &[String],
3239 properties_batch: &mut [Properties],
3240 ) -> Result<()> {
3241 let label_name = labels.first().map(|s| s.as_str());
3242 let schema = self.schema_manager.schema();
3243
3244 if let Some(label) = label_name {
3245 // Find vector indexes with embedding config for this label
3246 let mut configs = Vec::new();
3247 for idx in &schema.indexes {
3248 if let IndexDefinition::Vector(v_config) = idx
3249 && v_config.label == label
3250 && let Some(emb_config) = &v_config.embedding_config
3251 {
3252 configs.push((v_config.property.clone(), emb_config.clone()));
3253 }
3254 }
3255
3256 if configs.is_empty() {
3257 return Ok(());
3258 }
3259
3260 for (target_prop, emb_config) in configs {
3261 // Collect input texts from all vertices that need embeddings
3262 let mut input_texts: Vec<String> = Vec::new();
3263 let mut needs_embedding: Vec<usize> = Vec::new();
3264
3265 for (idx, properties) in properties_batch.iter().enumerate() {
3266 // Skip if target property already exists
3267 if properties.contains_key(&target_prop) {
3268 continue;
3269 }
3270
3271 // Check if source properties exist
3272 let mut inputs = Vec::new();
3273 for src_prop in &emb_config.source_properties {
3274 if let Some(val) = properties.get(src_prop)
3275 && let Some(s) = val.as_str()
3276 {
3277 inputs.push(s.to_string());
3278 }
3279 }
3280
3281 if !inputs.is_empty() {
3282 let input_text = inputs.join(" ");
3283 let input_text = match &emb_config.document_prefix {
3284 Some(prefix) => format!("{prefix}{input_text}"),
3285 None => input_text,
3286 };
3287 input_texts.push(input_text);
3288 needs_embedding.push(idx);
3289 }
3290 }
3291
3292 if input_texts.is_empty() {
3293 continue;
3294 }
3295
3296 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3297 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3298 })?;
3299 let embedder = runtime.embedding(&emb_config.alias).await?;
3300
3301 // Batch generate embeddings (single API call)
3302 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3303 let embeddings = embedder.embed(input_refs).await?;
3304
3305 // Distribute results back to properties
3306 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3307 if let Some(vec) = embeddings.get(embedding_idx) {
3308 let vals: Vec<Value> =
3309 vec.iter().map(|f| Value::Float(*f as f64)).collect();
3310 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3311 }
3312 }
3313 }
3314 }
3315
3316 Ok(())
3317 }
3318
3319 async fn process_embeddings_impl(
3320 &self,
3321 label_name: Option<&str>,
3322 properties: &mut Properties,
3323 ) -> Result<()> {
3324 let schema = self.schema_manager.schema();
3325
3326 if let Some(label) = label_name {
3327 // Find vector indexes with embedding config for this label
3328 let mut configs = Vec::new();
3329 for idx in &schema.indexes {
3330 if let IndexDefinition::Vector(v_config) = idx
3331 && v_config.label == label
3332 && let Some(emb_config) = &v_config.embedding_config
3333 {
3334 configs.push((v_config.property.clone(), emb_config.clone()));
3335 }
3336 }
3337
3338 if configs.is_empty() {
3339 log::info!("No embedding config found for label {}", label);
3340 }
3341
3342 for (target_prop, emb_config) in configs {
3343 // If target property already exists, skip (assume user provided it)
3344 if properties.contains_key(&target_prop) {
3345 continue;
3346 }
3347
3348 // Check if source properties exist
3349 let mut inputs = Vec::new();
3350 for src_prop in &emb_config.source_properties {
3351 if let Some(val) = properties.get(src_prop)
3352 && let Some(s) = val.as_str()
3353 {
3354 inputs.push(s.to_string());
3355 }
3356 }
3357
3358 if inputs.is_empty() {
3359 continue;
3360 }
3361
3362 let input_text = inputs.join(" ");
3363 let input_text = match &emb_config.document_prefix {
3364 Some(prefix) => format!("{prefix}{input_text}"),
3365 None => input_text,
3366 };
3367
3368 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3369 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3370 })?;
3371 let embedder = runtime.embedding(&emb_config.alias).await?;
3372
3373 // Generate
3374 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3375 if let Some(vec) = embeddings.first() {
3376 // Store as array of floats
3377 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3378 properties.insert(target_prop.clone(), Value::List(vals));
3379 }
3380 }
3381 }
3382 Ok(())
3383 }
3384
3385 /// Flushes the current in-memory L0 buffer to L1 storage.
3386 ///
3387 /// # Lock Ordering
3388 ///
3389 /// To prevent deadlocks, locks must be acquired in the following order:
3390 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3391 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3392 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3393 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3394 /// 5. `Index` / `Storage` locks (during actual flush)
3395 ///
3396 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3397 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3398 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3399 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3400 // Drain any in-flight async flushes first. `flush_to_l1` is a
3401 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3402 // setup, shutdown paths) rely on it as "all writes are now
3403 // durably in Lance". Without the drain, an async stream from
3404 // a recent commit might still be writing to Lance when
3405 // `flush_to_l1` returns, leaving a window where forks branch
3406 // off pre-write Lance state and lose data.
3407 if let Some(coord) = self.flush_coordinator.as_ref() {
3408 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3409 }
3410 let _flush_lock_guard = self.flush_lock.lock().await;
3411 self.flush_inline_under_lock(name).await
3412 }
3413
/// Async-flush entry point: rotate under `flush_lock`, release the
/// lock, then submit the stream phase to the [`FlushCoordinator`].
/// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
/// that resolves when finalize completes.
///
/// Order of operations: acquire a coordinator permit, rotate L0 under
/// `flush_lock` (allocating the rotate sequence number and bumping the
/// pending count only once the rotate has succeeded), snapshot
/// `cached_manifest` as the parent manifest, then hand the rotated buffer
/// to the coordinator together with a closure that runs `flush_stream_l1`.
///
/// # Errors
///
/// Errors if `config.async_flush_enabled = false` (the coordinator
/// is `None` in that case — see `flush_coordinator` field doc), if no
/// permit can be acquired, or if the rotate (`flush_l0_rotate`) fails.
pub async fn flush_to_l1_async(
    self: &Arc<Self>,
    name: Option<String>,
) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
    let coord = self
        .flush_coordinator
        .as_ref()
        .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
        .clone();
    // 1. Acquire permit FIRST (outside flush_lock) so we don't
    //    introduce a permit-while-holding-flush-lock convoy.
    let permit = coord.acquire_permit().await?;
    // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
    //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
    //    rotate (the `?` below) must consume neither: the finalizer
    //    advances in strictly consecutive seq order and only decrements
    //    pending on finalize, so a leaked seq/pending would wedge it
    //    forever. The seq is allocated under `flush_lock`, immediately
    //    after the rotate and before the guard drops, so concurrent rotates
    //    keep seq order == rotation order, and the seq is unused until
    //    submit. On the `?` error path the permit drops, freeing the slot.
    let (
        RotateOutput {
            old_l0_arc,
            wal_lsn,
            current_version,
            flush_in_progress_guard,
        },
        seq,
    ) = {
        // The guard lives only for this block: `flush_lock` is released
        // before the stream phase is submitted.
        let _flush_lock_guard = self.flush_lock.lock().await;
        let rotate_out = self.flush_l0_rotate().await?;
        let seq = coord.next_rotate_seq();
        coord.note_pending();
        (rotate_out, seq)
    };
    // 3. Build the coordinator's RotatedFlush. parent_manifest is the
    //    cached_manifest snapshot at this moment.
    let parent_manifest = self.cached_manifest.lock().clone();
    let rotated = RotatedFlush {
        seq,
        old_l0_arc: old_l0_arc.clone(),
        wal_lsn,
        current_version,
        name: name.clone(),
        parent_manifest,
        permit,
        flush_in_progress_guard,
    };
    // 4. Spawn the stream phase via the coordinator. The closure
    //    captures Arc<Writer> transiently — drops when stream
    //    completes (bounded, ~50-500 ms).
    let writer = self.clone();
    let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
        // Translate the writer-local flush outcome into the coordinator's
        // outcome type (manifest + snapshot id).
        let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
        Ok(crate::runtime::flush_coordinator::FlushOutcome {
            new_manifest: outcome.manifest,
            snapshot_id: outcome.snapshot_id,
        })
    });
    Ok(ticket)
}
3483
/// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
/// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
/// its place), and hand off the WAL to the new L0.
///
/// Runs in microseconds. Must be called under `flush_lock` (the caller
/// is responsible). The returned [`RotateOutput`] carries everything
/// the subsequent stream + finalize phases need; in particular the
/// [`FlushInProgressGuard`] is bound to the return value so it stays
/// alive for the full flush lifetime — including any future async
/// path where stream runs on a spawned task.
///
/// # Errors
///
/// Returns an error if the WAL flush fails (or the `flush::rotate-fail`
/// failpoint injects one). In that case L0 has not been rotated and the
/// in-progress guard created here is dropped on return.
async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
    // Acquire the in-progress counter BEFORE any heavy work. The
    // guard lives on RotateOutput; dropping RotateOutput drops the
    // guard, so the counter goes back to zero exactly when the flush
    // is fully done.
    let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);

    // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
    //    current L0 is still active and mutations are retained in
    //    memory until restart/retry.
    // Clone the WAL handle out so the L0 read lock is released before the
    // awaited WAL flush below.
    let wal_for_truncate = {
        let current_l0 = self.l0_manager.get_current();
        let l0_guard = current_l0.read();
        l0_guard.wal.clone()
    };
    // Test-only seam (no-op without the `failpoints` feature): inject a
    // WAL-flush failure here to drive the "failed async rotate wedges the
    // finalizer" regression (Bug #3). When configured to "return" it makes
    // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
    fail::fail_point!("flush::rotate-fail", |_| {
        Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
    });
    // With no WAL attached to the current L0, the recorded LSN is 0.
    let wal_lsn = if let Some(ref w) = wal_for_truncate {
        w.flush().await?
    } else {
        0
    };

    // B. Begin flush: rotate L0 and keep old L0 visible to reads via
    //    pending_flush until complete_flush is called by finalize.
    // NOTE(review): the meaning of the `(0, None)` arguments is not visible
    // here — confirm against `L0Manager::begin_flush`.
    let old_l0_arc = self.l0_manager.begin_flush(0, None);
    metrics::counter!("uni_l0_buffer_rotations_total").increment(1);

    // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
    //    and current_version to the new L0.
    let current_version;
    {
        // Both buffer write locks are held together for the handoff and
        // released at the end of this block.
        let mut old_l0_guard = old_l0_arc.write();
        current_version = old_l0_guard.current_version;
        old_l0_guard.wal_lsn_at_flush = wal_lsn;
        let wal = old_l0_guard.wal.take();
        let new_l0_arc = self.l0_manager.get_current();
        let mut new_l0_guard = new_l0_arc.write();
        new_l0_guard.wal = wal;
        new_l0_guard.current_version = current_version;
    }

    Ok(RotateOutput {
        old_l0_arc,
        wal_lsn,
        current_version,
        flush_in_progress_guard,
    })
}
3548
3549 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3550 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3551 /// pending_flush by Phase B); writes append-only Lance datasets; does
3552 /// NOT call save_snapshot / set_latest_snapshot — those are
3553 /// finalize's job, so the manifest doesn't get published until the
3554 /// next phase.
3555 ///
3556 /// Today takes `&self`; in a follow-up commit this becomes a
3557 /// static `Send + 'static` function over `SharedFlushCtx` so it can
3558 /// run on a spawned task while concurrent commits proceed.
3559 async fn flush_stream_l1(
3560 &self,
3561 old_l0_arc: Arc<RwLock<L0Buffer>>,
3562 wal_lsn: u64,
3563 current_version: u64,
3564 name: Option<String>,
3565 ) -> Result<FlushOutcome> {
3566 // Test-only seam (no-op without the `failpoints` feature): the rotate
3567 // (begin_flush) already moved the to-be-flushed buffer onto
3568 // pending_flush and installed a fresh empty current buffer, but the
3569 // rotated rows are NOT yet durable in Lance. Pausing here holds that
3570 // window open to drive the unique-constraint-hole regression
3571 // (Bug #9 Mechanism A).
3572 fail::fail_point!("flush::after-rotate-before-lance");
3573
3574 // Phase B: materialize any deferred embeddings before column
3575 // extraction. No-op when `defer_embeddings` is off (the set will
3576 // be empty). On-demand reads of the embedding column are a TODO
3577 // for a future revision (see UniConfig::defer_embeddings docs).
3578 self.drain_pending_embeddings(&old_l0_arc).await?;
3579
3580 let schema = self.schema_manager.schema();
3581 // 2. Acquire Read lock on Old L0 for flushing
3582 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3583 // (Vid, labels, properties, deleted, version)
3584 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3585 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3586 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3587 // (vid, full L0 properties map, version, set of keys to update).
3588 // Only the keys in the HashSet are emitted to the partial source;
3589 // the full props map is retained so the per-row column extractor
3590 // can read each touched key's value.
3591 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3592 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3593 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3594 // partial source with just `_vid`, `_deleted=true`, `_version`,
3595 // `_updated_at`. Skips the wide-row Append payload that adds
3596 // nothing on a soft-delete.
3597 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3598 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3599 // Collect vertex timestamps from L0 for flushing to storage
3600 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3601 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3602 // Track tombstones missing labels for storage query fallback
3603 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3604
3605 {
3606 let old_l0 = old_l0_arc.read();
3607
3608 // 1. Collect all edges and tombstones from L0
3609 for edge in old_l0.graph.edges() {
3610 let properties = old_l0
3611 .edge_properties
3612 .get(&edge.eid)
3613 .cloned()
3614 .unwrap_or_default();
3615 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3616
3617 // Get timestamps from L0 buffer (populated during insert)
3618 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3619 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3620
3621 entries_by_type
3622 .entry(edge.edge_type)
3623 .or_default()
3624 .push(L1Entry {
3625 src_vid: edge.src_vid,
3626 dst_vid: edge.dst_vid,
3627 eid: edge.eid,
3628 op: Op::Insert,
3629 version,
3630 properties,
3631 created_at,
3632 updated_at,
3633 });
3634 }
3635
3636 // From tombstones
3637 for tombstone in old_l0.tombstones.values() {
3638 let version = old_l0
3639 .edge_versions
3640 .get(&tombstone.eid)
3641 .copied()
3642 .unwrap_or(0);
3643 // Get timestamps - for deletes, updated_at reflects deletion time
3644 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3645 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3646
3647 entries_by_type
3648 .entry(tombstone.edge_type)
3649 .or_default()
3650 .push(L1Entry {
3651 src_vid: tombstone.src_vid,
3652 dst_vid: tombstone.dst_vid,
3653 eid: tombstone.eid,
3654 op: Op::Delete,
3655 version,
3656 properties: HashMap::new(),
3657 created_at,
3658 updated_at,
3659 });
3660 }
3661
3662 // 1b. Collect vertices by label (using vertex_labels from L0)
3663 //
3664 // Helper: fan-out a single vertex entry into per-label buckets.
3665 // Each per-label table row carries the full label set so multi-label
3666 // info is preserved after flush.
3667 let push_vertex_to_labels =
3668 |vid: Vid,
3669 all_labels: &[String],
3670 props: Properties,
3671 deleted: bool,
3672 version: u64,
3673 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3674 for label in all_labels {
3675 if let Some(label_id) = schema.label_id_by_name(label) {
3676 out.entry(label_id).or_default().push((
3677 vid,
3678 all_labels.to_vec(),
3679 props.clone(),
3680 deleted,
3681 version,
3682 ));
3683 }
3684 }
3685 };
3686
3687 for (vid, props) in &old_l0.vertex_properties {
3688 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3689 // Collect timestamps for this vertex
3690 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3691 vertex_created_at.insert(*vid, ts);
3692 }
3693 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3694 vertex_updated_at.insert(*vid, ts);
3695 }
3696 if let Some(labels) = old_l0.vertex_labels.get(vid) {
3697 // Partial-write routing: when this VID was last
3698 // touched via `insert_vertex_partial` AND the
3699 // partial_lance_writes flag is on, send only the
3700 // touched columns to a MergeInsert batch. Otherwise
3701 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3702 // — or flag off) use the existing full-row Append.
3703 let is_partial = self.config.partial_lance_writes
3704 && old_l0.vertex_partial_keys.contains_key(vid);
3705 if is_partial {
3706 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3707 for label in labels {
3708 if let Some(label_id) = schema.label_id_by_name(label) {
3709 partial_by_label.entry(label_id).or_default().push((
3710 *vid,
3711 props.clone(),
3712 version,
3713 touched.clone(),
3714 ));
3715 }
3716 }
3717 }
3718 } else {
3719 push_vertex_to_labels(
3720 *vid,
3721 labels,
3722 props.clone(),
3723 false,
3724 version,
3725 &mut vertices_by_label,
3726 );
3727 }
3728 }
3729 }
3730 for &vid in &old_l0.vertex_tombstones {
3731 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3732 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3733 vertex_updated_at.insert(vid, ts);
3734 }
3735 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3736 // Round-12 §B: tombstones flush via Lance MergeInsert
3737 // (just `_vid`, `_deleted=true`, `_version`,
3738 // `_updated_at`) — skipping the wide-row Append.
3739 // Unconditional (no `partial_lance_writes` gating);
3740 // tombstone Append carries no useful payload.
3741 for label in labels {
3742 if let Some(label_id) = schema.label_id_by_name(label) {
3743 tombstones_by_label
3744 .entry(label_id)
3745 .or_default()
3746 .push((vid, version));
3747 }
3748 }
3749 } else {
3750 // Tombstone missing labels (old WAL format) - collect for storage query fallback
3751 orphaned_tombstones.push((vid, version));
3752 }
3753 }
3754 } // Drop read lock
3755
3756 // Resolve orphaned tombstones (missing labels) from storage
3757 if !orphaned_tombstones.is_empty() {
3758 tracing::warn!(
3759 count = orphaned_tombstones.len(),
3760 "Tombstones missing labels in L0, querying storage as fallback"
3761 );
3762 for (vid, version) in orphaned_tombstones {
3763 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3764 && !labels.is_empty()
3765 {
3766 for label in &labels {
3767 if let Some(label_id) = schema.label_id_by_name(label) {
3768 // Round-12 §B: route through partial tombstone too.
3769 tombstones_by_label
3770 .entry(label_id)
3771 .or_default()
3772 .push((vid, version));
3773 }
3774 }
3775 }
3776 }
3777 }
3778
3779 // 1. Load previous snapshot from cache, or fall back to storage.
3780 //
3781 // Use clone() not take(): for the async path, multiple
3782 // concurrent streams may run; if we take() here, a sibling
3783 // stream sees cached_manifest = None and seeds from
3784 // load_latest_snapshot (stale), losing the chain. clone()
3785 // preserves the parent. Finalize writes back the new manifest
3786 // unconditionally.
3787 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3788 cached
3789 } else {
3790 self.storage
3791 .snapshot_manager()
3792 .load_latest_snapshot()
3793 .await?
3794 .unwrap_or_else(|| {
3795 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3796 })
3797 };
3798
3799 // Update snapshot metadata
3800 // Save parent snapshot ID before generating new one (for lineage tracking)
3801 let parent_id = manifest.snapshot_id.clone();
3802 manifest.parent_snapshot = Some(parent_id);
3803 manifest.snapshot_id = Uuid::new_v4().to_string();
3804 manifest.name = name;
3805 manifest.created_at = Utc::now();
3806 manifest.version_high_water_mark = current_version;
3807 manifest.wal_high_water_mark = wal_lsn;
3808 let snapshot_id = manifest.snapshot_id.clone();
3809
3810 tracing::Span::current().record("snapshot_id", &snapshot_id);
3811
3812 // 2. Write main unified tables FIRST (before deltas).
3813 // Ensures the dual-write invariant: by the time an EID appears in a
3814 // delta table, it already exists in main_edges. This prevents the
3815 // compaction debug_assert from firing when compaction interleaves
3816 // with flush at async yield points.
3817 //
3818 // 2.1 Main edges table
3819 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3820 let _old_l0 = old_l0_arc.read();
3821 let mut main_edges: Vec<(
3822 uni_common::core::id::Eid,
3823 Vid,
3824 Vid,
3825 String,
3826 Properties,
3827 bool,
3828 u64,
3829 )> = Vec::new();
3830 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3831 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3832
3833 for (&edge_type_id, entries) in entries_by_type.iter() {
3834 for entry in entries {
3835 let edge_type_name = self
3836 .storage
3837 .schema_manager()
3838 .edge_type_name_by_id_unified(edge_type_id)
3839 .unwrap_or_else(|| "unknown".to_string());
3840
3841 let deleted = matches!(entry.op, Op::Delete);
3842 main_edges.push((
3843 entry.eid,
3844 entry.src_vid,
3845 entry.dst_vid,
3846 edge_type_name,
3847 entry.properties.clone(),
3848 deleted,
3849 entry.version,
3850 ));
3851
3852 if let Some(ts) = entry.created_at {
3853 edge_created_at_map.insert(entry.eid, ts);
3854 }
3855 if let Some(ts) = entry.updated_at {
3856 edge_updated_at_map.insert(entry.eid, ts);
3857 }
3858 }
3859 }
3860
3861 (main_edges, edge_created_at_map, edge_updated_at_map)
3862 };
3863
3864 if !main_edges.is_empty() {
3865 let main_edge_batch = MainEdgeDataset::build_record_batch(
3866 &main_edges,
3867 Some(&edge_created_at_map),
3868 Some(&edge_updated_at_map),
3869 )?;
3870 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3871 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3872 }
3873
3874 // 2.2 Main vertices table
3875 let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3876 let old_l0 = old_l0_arc.read();
3877 let mut vertices = Vec::new();
3878
3879 // Live vertices: full-row Append on the main table (the
3880 // props_json blob is required for global ID lookups). For
3881 // partial-row VIDs (vertex_partial_keys non-empty), the
3882 // main table still needs the full props for the
3883 // ext_id-uniqueness path; we keep the Append here. The
3884 // per-label Lance write IS partial via MergeInsert.
3885 for (vid, props) in &old_l0.vertex_properties {
3886 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3887 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3888 vertices.push((*vid, labels, props.clone(), false, version));
3889 }
3890
3891 // Tombstones: collected into `main_vertex_tombstones` for
3892 // the MergeInsert path below; skipping the wide-row Append.
3893 for &vid in &old_l0.vertex_tombstones {
3894 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3895 main_vertex_tombstones.push((vid, version));
3896 }
3897
3898 vertices
3899 };
3900
3901 if !main_vertices.is_empty() {
3902 let main_vertex_batch = MainVertexDataset::build_record_batch(
3903 &main_vertices,
3904 Some(&vertex_created_at),
3905 Some(&vertex_updated_at),
3906 )?;
3907 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3908 }
3909 // Round-12 §B: tombstones via MergeInsert on the main vertices
3910 // table. Independent of `vertex_properties` length.
3911 if !main_vertex_tombstones.is_empty() {
3912 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3913 &main_vertex_tombstones,
3914 Some(&vertex_updated_at),
3915 )?;
3916 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3917 .await?;
3918 }
3919 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3920 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3921 }
3922
3923 // 3. For each edge type, write FWD and BWD delta runs
3924 for (&edge_type_id, entries) in entries_by_type.iter() {
3925 // Get edge type name from unified lookup (handles both schema'd and schemaless)
3926 let edge_type_name = self
3927 .storage
3928 .schema_manager()
3929 .edge_type_name_by_id_unified(edge_type_id)
3930 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3931
3932 // FWD Run (sorted by src_vid)
3933 // Round-12 §A: split entries into full-row Append and
3934 // partial MergeInsert routes based on `edge_partial_keys`.
3935 // Edges in `edge_partial_keys` were last written via
3936 // `insert_edge_partial_full`; the per-edge-type delta
3937 // tables receive only the touched schema columns plus
3938 // (when any overflow key was touched) the regenerated
3939 // `overflow_json` blob. Untouched columns retain their
3940 // previous-version value via Lance MergeInsert.
3941 let partial_eids: std::collections::HashSet<Eid> = {
3942 let old_l0 = old_l0_arc.read();
3943 entries
3944 .iter()
3945 .filter(|e| {
3946 self.config.partial_lance_writes
3947 && old_l0.edge_partial_keys.contains_key(&e.eid)
3948 })
3949 .map(|e| e.eid)
3950 .collect()
3951 };
3952 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3953 let old_l0 = old_l0_arc.read();
3954 partial_eids
3955 .iter()
3956 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3957 .collect()
3958 };
3959 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3960 .clone()
3961 .into_iter()
3962 .partition(|e| !partial_eids.contains(&e.eid));
3963
3964 let backend = self.storage.backend();
3965
3966 // FWD run (sorted by src_vid)
3967 let mut fwd_full = full_entries.clone();
3968 fwd_full.sort_by_key(|e| e.src_vid);
3969 let mut fwd_partial = partial_entries.clone();
3970 fwd_partial.sort_by_key(|e| e.src_vid);
3971 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3972 if !fwd_full.is_empty() {
3973 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3974 fwd_ds.write_run(backend, fwd_batch).await?;
3975 }
3976 if !fwd_partial.is_empty() {
3977 let touched_union: std::collections::HashSet<String> = fwd_partial
3978 .iter()
3979 .flat_map(|e| {
3980 touched_union_by_eid
3981 .get(&e.eid)
3982 .cloned()
3983 .unwrap_or_default()
3984 .into_iter()
3985 })
3986 .collect();
3987 let fwd_partial_batch =
3988 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3989 fwd_ds
3990 .merge_insert_partial_run(backend, fwd_partial_batch)
3991 .await?;
3992 }
3993 fwd_ds.ensure_eid_index(backend).await?;
3994
3995 // BWD Run (sorted by dst_vid)
3996 let mut bwd_full = full_entries.clone();
3997 bwd_full.sort_by_key(|e| e.dst_vid);
3998 let mut bwd_partial = partial_entries.clone();
3999 bwd_partial.sort_by_key(|e| e.dst_vid);
4000 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4001 if !bwd_full.is_empty() {
4002 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4003 bwd_ds.write_run(backend, bwd_batch).await?;
4004 }
4005 if !bwd_partial.is_empty() {
4006 let touched_union: std::collections::HashSet<String> = bwd_partial
4007 .iter()
4008 .flat_map(|e| {
4009 touched_union_by_eid
4010 .get(&e.eid)
4011 .cloned()
4012 .unwrap_or_default()
4013 .into_iter()
4014 })
4015 .collect();
4016 let bwd_partial_batch =
4017 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4018 bwd_ds
4019 .merge_insert_partial_run(backend, bwd_partial_batch)
4020 .await?;
4021 }
4022 bwd_ds.ensure_eid_index(backend).await?;
4023
4024 // Update Manifest
4025 let current_snap =
4026 manifest
4027 .edges
4028 .entry(edge_type_name.to_string())
4029 .or_insert(EdgeSnapshot {
4030 version: 0,
4031 count: 0,
4032 lance_version: 0,
4033 });
4034 current_snap.version += 1;
4035 current_snap.count += entries.len() as u64;
4036 // LanceDB tables don't expose Lance version directly
4037 current_snap.lance_version = 0;
4038
4039 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4040 // already has these edges via dual-write in insert_edge/delete_edge.
4041 }
4042
4043 // 4. Per-label vertex table writes
4044 // Iterate all labels that have either full-row OR partial-write
4045 // data pending. A label may appear in only one of the two maps
4046 // (e.g., all updates on this label were partial-only).
4047 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4048 .keys()
4049 .chain(partial_by_label.keys())
4050 .chain(tombstones_by_label.keys())
4051 .copied()
4052 .collect();
4053 for label_id in all_label_ids {
4054 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4055 let label_name = schema
4056 .label_name_by_id(label_id)
4057 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4058
4059 let ds = self.storage.vertex_dataset(label_name)?;
4060
4061 // Collect inverted index updates before consuming vertices
4062 // Maps: cfg.property -> (added, removed)
4063 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4064 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4065
4066 for idx in &schema.indexes {
4067 if let IndexDefinition::Inverted(cfg) = idx
4068 && cfg.label == label_name
4069 {
4070 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4071 let mut removed: HashSet<Vid> = HashSet::new();
4072
4073 for (vid, _labels, props, deleted, _version) in &vertices {
4074 if *deleted {
4075 removed.insert(*vid);
4076 } else if let Some(prop_value) = props.get(&cfg.property) {
4077 // Extract terms from the property value (List<String>)
4078 if let Some(arr) = prop_value.as_array() {
4079 let terms: Vec<String> = arr
4080 .iter()
4081 .filter_map(|v| v.as_str().map(ToString::to_string))
4082 .collect();
4083 if !terms.is_empty() {
4084 added.insert(*vid, terms);
4085 }
4086 }
4087 }
4088 }
4089 // Round-12 §B: tombstones no longer in `vertices`;
4090 // pull them from `tombstones_by_label` for inverted
4091 // index removal.
4092 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4093 for (vid, _) in tomb_rows {
4094 removed.insert(*vid);
4095 }
4096 }
4097
4098 if !added.is_empty() || !removed.is_empty() {
4099 inverted_updates.insert(cfg.property.clone(), (added, removed));
4100 }
4101 }
4102 }
4103
4104 let mut v_data = Vec::new();
4105 let mut d_data = Vec::new();
4106 let mut ver_data = Vec::new();
4107 for (vid, labels, props, deleted, version) in vertices {
4108 v_data.push((vid, labels, props));
4109 d_data.push(deleted);
4110 ver_data.push(version);
4111 }
4112
4113 let backend = self.storage.backend();
4114
4115 // Skip the full-row Append entirely if this label only has
4116 // partial-write rows pending.
4117 if !v_data.is_empty() {
4118 let batch = ds.build_record_batch_with_timestamps(
4119 &v_data,
4120 &d_data,
4121 &ver_data,
4122 &schema,
4123 Some(&vertex_created_at),
4124 Some(&vertex_updated_at),
4125 )?;
4126 ds.write_batch(backend, batch, &schema).await?;
4127 }
4128
4129 // Partial-column batch (Lance MergeInsert path). The flag
4130 // gates whether the routing classified any VIDs as partial;
4131 // outside the flag this collection is always empty so the
4132 // call below is a cheap no-op.
4133 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4134 && !partial_rows.is_empty()
4135 {
4136 let touched_union: std::collections::HashSet<String> = partial_rows
4137 .iter()
4138 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4139 .collect();
4140 let pairs: Vec<(Vid, Properties)> = partial_rows
4141 .iter()
4142 .map(|(vid, props, _, _)| (*vid, props.clone()))
4143 .collect();
4144 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4145 let partial_batch = ds.build_partial_record_batch(
4146 &pairs,
4147 &versions,
4148 &touched_union,
4149 &schema,
4150 Some(&vertex_updated_at),
4151 )?;
4152 if partial_batch.num_rows() > 0 {
4153 ds.merge_insert_batch(backend, partial_batch).await?;
4154 }
4155 }
4156
4157 // Tombstone batch (Round-12 §B): always MergeInsert with
4158 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4159 // No partial_lance_writes gating — tombstones never carry
4160 // useful property payload to write. Captured tombstone vids
4161 // also drive `remove_from_vid_labels_index` below.
4162 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4163 if !tombstone_rows.is_empty() {
4164 let tomb_batch =
4165 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4166 if tomb_batch.num_rows() > 0 {
4167 ds.merge_insert_batch(backend, tomb_batch).await?;
4168 }
4169 }
4170
4171 ds.ensure_default_indexes(backend).await?;
4172
4173 // Update VidLabelsIndex (if enabled). v_data carries live
4174 // vertices; tombstone removals come from the
4175 // `tombstone_rows` captured above the MergeInsert call.
4176 for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4177 if deleted {
4178 self.storage.remove_from_vid_labels_index(*vid);
4179 } else {
4180 self.storage.update_vid_labels_index(*vid, labels.clone());
4181 }
4182 }
4183 for (vid, _) in &tombstone_rows {
4184 self.storage.remove_from_vid_labels_index(*vid);
4185 }
4186
4187 // Update Manifest
4188 let current_snap =
4189 manifest
4190 .vertices
4191 .entry(label_name.to_string())
4192 .or_insert(LabelSnapshot {
4193 version: 0,
4194 count: 0,
4195 lance_version: 0,
4196 });
4197 current_snap.version += 1;
4198 current_snap.count += v_data.len() as u64;
4199 // LanceDB tables don't expose Lance version directly
4200 current_snap.lance_version = 0;
4201
4202 // Invalidate table cache to ensure next read picks up new version
4203 self.storage.invalidate_table_cache(label_name);
4204
4205 // Apply inverted index updates incrementally
4206 #[cfg(feature = "lance-backend")]
4207 for idx in &schema.indexes {
4208 if let IndexDefinition::Inverted(cfg) = idx
4209 && cfg.label == label_name
4210 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4211 {
4212 self.storage
4213 .index_manager()
4214 .update_inverted_index_incremental(cfg, added, removed)
4215 .await?;
4216 }
4217 }
4218
4219 // Update UID index with new vertex mappings
4220 // Collect (UniId, Vid) mappings from non-deleted vertices
4221 #[cfg(feature = "lance-backend")]
4222 {
4223 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4224 for (vid, _labels, props) in &v_data {
4225 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4226 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4227 label_name, ext_id, props,
4228 );
4229 uid_mappings.push((uid, *vid));
4230 }
4231
4232 if !uid_mappings.is_empty()
4233 && let Ok(uid_index) = self.storage.uid_index(label_name)
4234 {
4235 // Stamp mappings with this flush's MVCC version so a later
4236 // re-create of the same UID deterministically outranks the
4237 // stale mapping (review C3).
4238 uid_index
4239 .write_mapping_versioned(&uid_mappings, current_version)
4240 .await?;
4241 }
4242 }
4243 }
4244 Ok(FlushOutcome {
4245 manifest,
4246 snapshot_id,
4247 })
4248 }
4249
4250 /// Composition entry that assumes the caller already holds `flush_lock`.
4251 /// Runs rotate + stream + finalize_locked in sequence. Used by
4252 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4253 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4254 /// holds the lock from the commit critical section).
4255 #[instrument(
4256 skip(self),
4257 fields(snapshot_id, mutations_count, size_bytes),
4258 level = "info"
4259 )]
4260 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4261 let start = std::time::Instant::now();
4262
4263 let (initial_size, initial_count) = {
4264 let l0_arc = self.l0_manager.get_current();
4265 let l0 = l0_arc.read();
4266 (l0.estimated_size, l0.mutation_count)
4267 };
4268 tracing::Span::current().record("size_bytes", initial_size);
4269 tracing::Span::current().record("mutations_count", initial_count);
4270
4271 debug!("Starting L0 flush to L1");
4272
4273 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4274 // FlushInProgressGuard lives on RotateOutput and stays alive for
4275 // the full flush — including the finalize_locked call below.
4276 let RotateOutput {
4277 old_l0_arc,
4278 wal_lsn,
4279 current_version,
4280 flush_in_progress_guard: _flush_guard,
4281 } = self.flush_l0_rotate().await?;
4282
4283 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4284 // G (Lance writes). Builds the manifest but does NOT publish it.
4285 let FlushOutcome {
4286 manifest,
4287 snapshot_id,
4288 } = self
4289 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4290 .await?;
4291
4292 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4293 // property cache clear, last_flush_time, metrics, l1_runs++,
4294 // compaction trigger, index-rebuild scheduling, fork tick.
4295 self.flush_finalize_locked(
4296 old_l0_arc,
4297 wal_lsn,
4298 manifest,
4299 snapshot_id,
4300 initial_size,
4301 initial_count,
4302 start,
4303 )
4304 .await
4305 }
4306
4307 /// Phases H..S of the flush: publish the manifest and run all
4308 /// post-publish bookkeeping. Assumes the caller already holds
4309 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4310 /// lock-acquiring variant used by the async finalize path.
4311 #[allow(clippy::too_many_arguments)]
4312 async fn flush_finalize_locked(
4313 &self,
4314 old_l0_arc: Arc<RwLock<L0Buffer>>,
4315 wal_lsn: u64,
4316 manifest: SnapshotManifest,
4317 snapshot_id: String,
4318 initial_size: usize,
4319 initial_count: usize,
4320 start: std::time::Instant,
4321 ) -> Result<String> {
4322 Self::flush_finalize_body(
4323 &self.shared_ctx(),
4324 old_l0_arc,
4325 wal_lsn,
4326 manifest,
4327 snapshot_id,
4328 initial_size,
4329 initial_count,
4330 start,
4331 )
4332 .await
4333 }
4334
4335 /// Phases H..S of the flush, lock-acquiring variant. Used by the
4336 /// async-flush finalizer task (running on a spawned tokio task),
4337 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4338 /// `flush_lock` to serialize the publish boundary, then runs the
4339 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4340 #[allow(clippy::too_many_arguments)]
4341 pub(crate) async fn flush_finalize_now(
4342 shared: SharedFlushCtx,
4343 old_l0_arc: Arc<RwLock<L0Buffer>>,
4344 wal_lsn: u64,
4345 manifest: SnapshotManifest,
4346 snapshot_id: String,
4347 initial_size: usize,
4348 initial_count: usize,
4349 start: std::time::Instant,
4350 ) -> Result<String> {
4351 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4352 Self::flush_finalize_body(
4353 &shared,
4354 old_l0_arc,
4355 wal_lsn,
4356 manifest,
4357 snapshot_id,
4358 initial_size,
4359 initial_count,
4360 start,
4361 )
4362 .await
4363 }
4364
4365 /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4366 /// Static over `SharedFlushCtx`; the caller is responsible for
4367 /// holding `flush_lock`.
4368 #[allow(clippy::too_many_arguments)]
4369 async fn flush_finalize_body(
4370 shared: &SharedFlushCtx,
4371 old_l0_arc: Arc<RwLock<L0Buffer>>,
4372 wal_lsn: u64,
4373 mut manifest: SnapshotManifest,
4374 snapshot_id: String,
4375 initial_size: usize,
4376 initial_count: usize,
4377 start: std::time::Instant,
4378 ) -> Result<String> {
4379 // Parent-snapshot fixup. The stream phase built `manifest` with
4380 // parent_snapshot set from cached_manifest at stream time. If
4381 // OTHER flushes (sync or async) have finalized since then,
4382 // cached_manifest has advanced. Re-link this manifest to the
4383 // current cached chain so we don't orphan their data when we
4384 // overwrite cached_manifest below.
4385 let current_parent_id = shared
4386 .cached_manifest
4387 .lock()
4388 .as_ref()
4389 .map(|m| m.snapshot_id.clone());
4390 if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4391 manifest.parent_snapshot = current_parent_id;
4392 metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4393 }
4394
4395 // H. Publish manifest (body first, then pointer — recovery is
4396 // idempotent if we crash between the two).
4397 shared
4398 .storage
4399 .snapshot_manager()
4400 .save_snapshot(&manifest)
4401 .await?;
4402 shared
4403 .storage
4404 .snapshot_manager()
4405 .set_latest_snapshot(&manifest.snapshot_id)
4406 .await?;
4407
4408 // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
4409 // wrote the manifest body and the `catalog/latest` pointer through the
4410 // object store, which does NOT fsync. WAL truncation (K, below) removes
4411 // the only other durable copy of this flush's data, so a crash after K
4412 // but before the OS flushed those writes would lose the snapshot —
4413 // recovery could not resolve `latest`. Make them durable now (local-fs
4414 // only; remote stores provide their own durability on `put`).
4415 crate::snapshot::manager::fsync_snapshot_pointer(
4416 shared.storage.local_fs_root().as_deref(),
4417 &manifest.snapshot_id,
4418 )
4419 .map_err(|e| {
4420 anyhow!(
4421 "fsync snapshot {} before WAL truncate: {}",
4422 manifest.snapshot_id,
4423 e
4424 )
4425 })?;
4426
4427 // I. Cache manifest for next flush to avoid re-reading from object store.
4428 *shared.cached_manifest.lock() = Some(manifest.clone());
4429
4430 // L. Invalidate the property cache BEFORE removing the flushed buffer
4431 // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4432 // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4433 // first closes the non-monotonic-read window: once the buffer leaves
4434 // the L0 chain at J a freshly-written value would otherwise miss the
4435 // chain and fall through to a stale cache entry. By the time finalize
4436 // runs the streamed rows are already durable in L1, so a post-clear
4437 // read falls through to fresh storage instead. The finalizer holds
4438 // `flush_lock` throughout, so reordering L ahead of J is safe.
4439 if let Some(ref pm) = shared.property_manager {
4440 pm.clear_cache().await;
4441 }
4442
4443 // J. Complete flush: remove old L0 from pending_flush. MUST happen
4444 // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4445 shared.l0_manager.complete_flush(&old_l0_arc);
4446
4447 // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4448 // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4449 // WAL truncation (K). The property cache is already cleared (L moved
4450 // ahead of J above), so a read in this window falls through to fresh
4451 // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4452 // read after flush finalize).
4453 fail::fail_point!("flush::after-complete-before-cache-clear");
4454
4455 // K. Truncate WAL up to the safe LSN.
4456 let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4457 if let Some(w) = wal_handle {
4458 let safe_lsn = shared
4459 .l0_manager
4460 .min_pending_wal_lsn()
4461 .map(|min_pending| min_pending.min(wal_lsn))
4462 .unwrap_or(wal_lsn);
4463 w.truncate_before(safe_lsn).await?;
4464 }
4465
4466 // M. Reset last flush time for time-based auto-flush.
4467 *shared.last_flush_time.lock() = std::time::Instant::now();
4468
4469 info!(
4470 snapshot_id,
4471 mutations_count = initial_count,
4472 size_bytes = initial_size,
4473 "L0 flush to L1 completed successfully"
4474 );
4475 metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4476 metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4477 metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4478
4479 // P. Increment flush generation counter for write throttling.
4480 {
4481 let mut status = uni_common::sync::acquire_mutex(
4482 &shared.storage.compaction_status,
4483 "compaction_status",
4484 )?;
4485 status.l1_runs += 1;
4486 }
4487
4488 // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4489 let am = shared.adjacency_manager.clone();
4490 if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4491 let previous_still_running = {
4492 let guard = shared.compaction_handle.read();
4493 guard.as_ref().is_some_and(|h| !h.is_finished())
4494 };
4495 if previous_still_running {
4496 info!("Skipping compaction: previous compaction still in progress");
4497 } else {
4498 let handle = tokio::spawn(async move {
4499 am.compact();
4500 });
4501 *shared.compaction_handle.write() = Some(handle);
4502 }
4503 }
4504
4505 // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4506 if shared.auto_rebuild_enabled
4507 && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4508 {
4509 Self::schedule_index_rebuilds_if_needed_static(
4510 &manifest,
4511 rebuild_mgr.clone(),
4512 shared.schema_manager.clone(),
4513 shared.index_rebuild_config.clone(),
4514 );
4515 }
4516
4517 // S. Emit fork-fragment observability after a successful forked flush.
4518 Self::tick_fork_fragment_observability_static(
4519 shared.fork_id,
4520 shared.fork_flush_count.clone(),
4521 shared.fork_fragment_warn_fired.clone(),
4522 shared.fork_fragment_warn_threshold,
4523 );
4524
4525 Ok(snapshot_id)
4526 }
4527
4528 /// Increment fork-flush bookkeeping and fire the fragment warn
4529 /// once if the threshold is crossed.
4530 ///
4531 /// Each flush typically appends ~1 fragment per touched dataset on
4532 /// the fork's branches; without compaction (deferred to Phase 5)
4533 /// long-lived heavy-write forks degrade. The flush count is a
4534 /// proxy for actual fragment growth — reading
4535 /// `Dataset::manifest().fragments.len()` per dataset would add a
4536 /// per-flush object-store roundtrip on the hot commit path, which
4537 /// is too costly for a purely observational guard rail.
4538 ///
4539 /// No-op for primary writers (`fork_id == None`).
4540 #[allow(dead_code)] // called by tests; production path uses _static
4541 pub(crate) fn tick_fork_fragment_observability(&self) {
4542 Self::tick_fork_fragment_observability_static(
4543 self.fork_id,
4544 self.fork_flush_count.clone(),
4545 self.fork_fragment_warn_fired.clone(),
4546 self.config.fork_fragment_warn_threshold,
4547 );
4548 }
4549
4550 /// Static variant of [`Writer::tick_fork_fragment_observability`].
4551 /// Used by the async-flush finalize path, where we hold a
4552 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4553 pub(crate) fn tick_fork_fragment_observability_static(
4554 fork_id: Option<ForkId>,
4555 fork_flush_count: Arc<AtomicU64>,
4556 fork_fragment_warn_fired: Arc<AtomicBool>,
4557 warn_threshold: usize,
4558 ) {
4559 let Some(fork_id) = fork_id else { return };
4560 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4561 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4562 let fork_label = fork_id.to_string();
4563 metrics::gauge!(
4564 "uni_fork_l1_flushes",
4565 "fork" => fork_label.clone(),
4566 )
4567 .set(new_count as f64);
4568 let threshold = warn_threshold as u64;
4569 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4570 && threshold > 0
4571 && new_count >= threshold
4572 {
4573 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4574 tracing::warn!(
4575 fork = %fork_label,
4576 flush_count = new_count,
4577 threshold,
4578 "fork has exceeded the L1 flush-count threshold; \
4579 fork compaction is deferred to Phase 5 — consider \
4580 drop+recreate or promotion to bound fragment growth"
4581 );
4582 }
4583 }
4584
4585 /// Check rebuild thresholds and schedule background index rebuilds for
4586 /// labels that exceed growth or age limits. Marks affected indexes as
4587 /// `Stale` and spawns an async task to schedule the rebuild.
4588 #[allow(dead_code)] // production path uses _static; kept as the
4589 // documented instance entry point.
4590 fn schedule_index_rebuilds_if_needed(
4591 &self,
4592 manifest: &SnapshotManifest,
4593 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4594 ) {
4595 Self::schedule_index_rebuilds_if_needed_static(
4596 manifest,
4597 rebuild_mgr,
4598 self.schema_manager.clone(),
4599 self.config.index_rebuild.clone(),
4600 );
4601 }
4602
4603 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4604 /// Used by the async-flush finalize path, where we hold the
4605 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4606 pub(crate) fn schedule_index_rebuilds_if_needed_static(
4607 manifest: &SnapshotManifest,
4608 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4609 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4610 index_rebuild_config: uni_common::config::IndexRebuildConfig,
4611 ) {
4612 let checker =
4613 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4614 let schema = schema_manager.schema();
4615 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4616
4617 if labels.is_empty() {
4618 return;
4619 }
4620
4621 // Mark affected indexes as Stale
4622 for label in &labels {
4623 for idx in &schema.indexes {
4624 if idx.label() == label {
4625 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4626 m.status = uni_common::core::schema::IndexStatus::Stale;
4627 });
4628 }
4629 }
4630 }
4631
4632 tokio::spawn(async move {
4633 if let Err(e) = rebuild_mgr.schedule(labels).await {
4634 tracing::warn!("Failed to schedule index rebuild: {e}");
4635 }
4636 });
4637 }
4638}
4639
/// The [`FinalizeFn`] that the `FlushCoordinator` calls from its
/// single-task finalizer loop.
///
/// Deliberately a field-less unit struct. Storing an `Arc<Writer>` here
/// would close a reference cycle (Writer -> FlushCoordinator ->
/// Arc<dyn FinalizeFn> -> Writer), so everything finalize needs is
/// passed in per call through `SharedFlushCtx` instead.
pub(crate) struct WriterFinalizer;
4646
4647impl FinalizeFn for WriterFinalizer {
4648 fn finalize<'a>(
4649 &'a self,
4650 rotated: RotatedFlush,
4651 outcome: AsyncFlushOutcome,
4652 shared: SharedFlushCtx,
4653 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4654 Box::pin(async move {
4655 // Read initial_size / initial_count from the rotated L0 so
4656 // we don't have to plumb them through the coordinator
4657 // submission. The buffer is still alive in pending_flush
4658 // until `complete_flush` (J) below pops it.
4659 let (initial_size, initial_count) = {
4660 let l0 = rotated.old_l0_arc.read();
4661 (l0.estimated_size, l0.mutation_count)
4662 };
4663 let result = Writer::flush_finalize_now(
4664 shared,
4665 rotated.old_l0_arc.clone(),
4666 rotated.wal_lsn,
4667 outcome.new_manifest,
4668 outcome.snapshot_id,
4669 initial_size,
4670 initial_count,
4671 std::time::Instant::now(),
4672 )
4673 .await;
4674 // `rotated` (permit + flush_in_progress_guard) drops here.
4675 drop(rotated.permit);
4676 result
4677 })
4678 }
4679
4680 fn finalize_failure<'a>(
4681 &'a self,
4682 rotated: RotatedFlush,
4683 err: anyhow::Error,
4684 _shared: SharedFlushCtx,
4685 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4686 Box::pin(async move {
4687 tracing::warn!(
4688 error = %err,
4689 seq = rotated.seq,
4690 "async flush stream failed; old L0 remains in pending_flush, \
4691 WAL retains its data, recovery via WAL replay on restart"
4692 );
4693 metrics::counter!("uni_flush_failures_total").increment(1);
4694 // Permit + guard drop here so back-pressure releases even on
4695 // failure.
4696 drop(rotated.permit);
4697 err
4698 })
4699 }
4700}
4701
4702#[cfg(test)]
4703mod tests {
4704 use super::*;
4705 use tempfile::tempdir;
4706
4707 /// Test that commit_transaction writes mutations to WAL before merging to main L0.
4708 /// This verifies fix for issue #137 (transaction commit atomicity).
4709 #[tokio::test]
4710 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4711 use crate::runtime::wal::WriteAheadLog;
4712 use crate::storage::manager::StorageManager;
4713 use object_store::local::LocalFileSystem;
4714 use object_store::path::Path as ObjectStorePath;
4715 use uni_common::core::schema::SchemaManager;
4716
4717 let dir = tempdir()?;
4718 let path = dir.path().to_str().unwrap();
4719 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4720 let schema_path = ObjectStorePath::from("schema.json");
4721
4722 let schema_manager =
4723 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4724 let _label_id = schema_manager.add_label("Test")?;
4725 schema_manager.save().await?;
4726
4727 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4728
4729 // Create WAL for main L0
4730 let wal_path = ObjectStorePath::from("wal");
4731 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4732
4733 let writer = Writer::new_with_config(
4734 storage.clone(),
4735 schema_manager.clone(),
4736 1,
4737 UniConfig::default(),
4738 Some(wal),
4739 None,
4740 )
4741 .await?;
4742
4743 // Begin transaction — create a transaction L0
4744 let tx_l0 = writer.create_transaction_l0();
4745
4746 // Insert data in transaction
4747 let vid_a = writer.next_vid().await?;
4748 let vid_b = writer.next_vid().await?;
4749
4750 let mut props = std::collections::HashMap::new();
4751 props.insert("test".to_string(), Value::String("data".to_string()));
4752
4753 writer
4754 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4755 .await?;
4756 writer
4757 .insert_vertex_with_labels(
4758 vid_b,
4759 std::collections::HashMap::new(),
4760 &["Test".to_string()],
4761 Some(&tx_l0),
4762 )
4763 .await?;
4764
4765 let eid = writer.next_eid(1).await?;
4766 writer
4767 .insert_edge(
4768 vid_a,
4769 vid_b,
4770 1,
4771 eid,
4772 std::collections::HashMap::new(),
4773 None,
4774 Some(&tx_l0),
4775 )
4776 .await?;
4777
4778 // Get WAL before commit
4779 let l0 = writer.l0_manager.get_current();
4780 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
4781 let mutations_before = wal.replay().await?;
4782 let count_before = mutations_before.len();
4783
4784 // Commit transaction - this should write to WAL first
4785 let writer = Arc::new(writer);
4786 writer.commit_transaction_l0(tx_l0).await?;
4787
4788 // Verify WAL has the new mutations
4789 let mutations_after = wal.replay().await?;
4790 assert!(
4791 mutations_after.len() > count_before,
4792 "WAL should contain transaction mutations after commit"
4793 );
4794
4795 // Verify mutations are in correct order: vertices first, then edges
4796 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
4797
4798 let mut saw_vertex_a = false;
4799 let mut saw_vertex_b = false;
4800 let mut saw_edge = false;
4801
4802 for mutation in &new_mutations {
4803 match mutation {
4804 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
4805 if *vid == vid_a {
4806 saw_vertex_a = true;
4807 }
4808 if *vid == vid_b {
4809 saw_vertex_b = true;
4810 }
4811 // Vertices should come before edges
4812 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
4813 }
4814 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
4815 if *e == eid {
4816 saw_edge = true;
4817 }
4818 // Edges should come after vertices
4819 assert!(
4820 saw_vertex_a && saw_vertex_b,
4821 "Edge should be logged after both vertices"
4822 );
4823 }
4824 _ => {}
4825 }
4826 }
4827
4828 assert!(saw_vertex_a, "Vertex A should be in WAL");
4829 assert!(saw_vertex_b, "Vertex B should be in WAL");
4830 assert!(saw_edge, "Edge should be in WAL");
4831
4832 // Verify data is also in main L0
4833 let l0_read = l0.read();
4834 assert!(
4835 l0_read.vertex_properties.contains_key(&vid_a),
4836 "Vertex A should be in main L0"
4837 );
4838 assert!(
4839 l0_read.vertex_properties.contains_key(&vid_b),
4840 "Vertex B should be in main L0"
4841 );
4842 assert!(
4843 l0_read.edge_endpoints.contains_key(&eid),
4844 "Edge should be in main L0"
4845 );
4846
4847 Ok(())
4848 }
4849
4850 /// Test that failed WAL flush leaves transaction intact for retry or rollback.
4851 #[tokio::test]
4852 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4853 use crate::runtime::wal::WriteAheadLog;
4854 use crate::storage::manager::StorageManager;
4855 use object_store::local::LocalFileSystem;
4856 use object_store::path::Path as ObjectStorePath;
4857 use uni_common::core::schema::SchemaManager;
4858
4859 let dir = tempdir()?;
4860 let path = dir.path().to_str().unwrap();
4861 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4862 let schema_path = ObjectStorePath::from("schema.json");
4863
4864 let schema_manager =
4865 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4866 let _label_id = schema_manager.add_label("Test")?;
4867 let _baseline_label_id = schema_manager.add_label("Baseline")?;
4868 let _txdata_label_id = schema_manager.add_label("TxData")?;
4869 schema_manager.save().await?;
4870
4871 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4872
4873 // Create WAL for main L0
4874 let wal_path = ObjectStorePath::from("wal");
4875 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4876
4877 let writer = Writer::new_with_config(
4878 storage.clone(),
4879 schema_manager.clone(),
4880 1,
4881 UniConfig::default(),
4882 Some(wal),
4883 None,
4884 )
4885 .await?;
4886
4887 // Insert baseline data (outside transaction)
4888 let baseline_vid = writer.next_vid().await?;
4889 writer
4890 .insert_vertex_with_labels(
4891 baseline_vid,
4892 [("baseline".to_string(), Value::Bool(true))]
4893 .into_iter()
4894 .collect(),
4895 &["Baseline".to_string()],
4896 None,
4897 )
4898 .await?;
4899
4900 // Begin transaction — create a transaction L0
4901 let tx_l0 = writer.create_transaction_l0();
4902
4903 // Insert data in transaction
4904 let tx_vid = writer.next_vid().await?;
4905 writer
4906 .insert_vertex_with_labels(
4907 tx_vid,
4908 [("tx_data".to_string(), Value::Bool(true))]
4909 .into_iter()
4910 .collect(),
4911 &["TxData".to_string()],
4912 Some(&tx_l0),
4913 )
4914 .await?;
4915
4916 // Capture main L0 state before rollback
4917 let l0 = writer.l0_manager.get_current();
4918 let vertex_count_before = l0.read().vertex_properties.len();
4919
4920 // Rollback transaction (simulating what would happen after WAL flush failure)
4921 drop(tx_l0);
4922
4923 // Verify main L0 is unchanged
4924 let vertex_count_after = l0.read().vertex_properties.len();
4925 assert_eq!(
4926 vertex_count_before, vertex_count_after,
4927 "Main L0 should not change after rollback"
4928 );
4929
4930 // Baseline should still be present
4931 assert!(
4932 l0.read().vertex_properties.contains_key(&baseline_vid),
4933 "Baseline data should remain"
4934 );
4935
4936 // Transaction data should NOT be in main L0
4937 assert!(
4938 !l0.read().vertex_properties.contains_key(&tx_vid),
4939 "Transaction data should not be in main L0 after rollback"
4940 );
4941
4942 Ok(())
4943 }
4944
4945 /// Test that batch insert with shared labels does not clone labels per vertex.
4946 /// This verifies fix for issue #161 (redundant label cloning).
4947 #[tokio::test]
4948 async fn test_batch_insert_shared_labels() -> Result<()> {
4949 use crate::storage::manager::StorageManager;
4950 use object_store::local::LocalFileSystem;
4951 use object_store::path::Path as ObjectStorePath;
4952 use uni_common::core::schema::SchemaManager;
4953
4954 let dir = tempdir()?;
4955 let path = dir.path().to_str().unwrap();
4956 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4957 let schema_path = ObjectStorePath::from("schema.json");
4958
4959 let schema_manager =
4960 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4961 let _label_id = schema_manager.add_label("Person")?;
4962 schema_manager.save().await?;
4963
4964 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4965
4966 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4967
4968 // Shared labels - should not be cloned per vertex
4969 let labels = &["Person".to_string()];
4970
4971 // Insert batch of vertices with same labels
4972 let mut vids = Vec::new();
4973 for i in 0..100 {
4974 let vid = writer.next_vid().await?;
4975 let mut props = std::collections::HashMap::new();
4976 props.insert("id".to_string(), Value::Int(i));
4977 writer
4978 .insert_vertex_with_labels(vid, props, labels, None)
4979 .await?;
4980 vids.push(vid);
4981 }
4982
4983 // Verify all vertices have the correct labels
4984 let l0 = writer.l0_manager.get_current();
4985 for vid in vids {
4986 let l0_guard = l0.read();
4987 let vertex_labels = l0_guard.vertex_labels.get(&vid);
4988 assert!(vertex_labels.is_some(), "Vertex should have labels");
4989 assert_eq!(
4990 vertex_labels.unwrap(),
4991 &vec!["Person".to_string()],
4992 "Labels should match"
4993 );
4994 }
4995
4996 Ok(())
4997 }
4998
4999 /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
5000 /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
5001 #[tokio::test]
5002 async fn test_estimated_size_tracks_mutations() -> Result<()> {
5003 use crate::storage::manager::StorageManager;
5004 use object_store::local::LocalFileSystem;
5005 use object_store::path::Path as ObjectStorePath;
5006 use uni_common::core::schema::SchemaManager;
5007
5008 let dir = tempdir()?;
5009 let path = dir.path().to_str().unwrap();
5010 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5011 let schema_path = ObjectStorePath::from("schema.json");
5012
5013 let schema_manager =
5014 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5015 let _label_id = schema_manager.add_label("Test")?;
5016 schema_manager.save().await?;
5017
5018 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5019
5020 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
5021
5022 let l0 = writer.l0_manager.get_current();
5023
5024 // Initial state should be empty
5025 let initial_estimated = l0.read().estimated_size;
5026 let initial_actual = l0.read().size_bytes();
5027 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
5028 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
5029
5030 // Insert vertices with properties
5031 let mut vids = Vec::new();
5032 for i in 0..10 {
5033 let vid = writer.next_vid().await?;
5034 let mut props = std::collections::HashMap::new();
5035 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
5036 props.insert("index".to_string(), Value::Int(i));
5037 writer
5038 .insert_vertex_with_labels(vid, props, &[], None)
5039 .await?;
5040 vids.push(vid);
5041 }
5042
5043 // Verify estimated_size grew
5044 let after_vertices_estimated = l0.read().estimated_size;
5045 let after_vertices_actual = l0.read().size_bytes();
5046 assert!(
5047 after_vertices_estimated > 0,
5048 "estimated_size should grow after insertions"
5049 );
5050
5051 // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5052 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5053 assert!(
5054 (0.5..=2.0).contains(&ratio),
5055 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5056 after_vertices_estimated,
5057 after_vertices_actual,
5058 ratio
5059 );
5060
5061 // Insert edges with a simple edge type
5062 let edge_type = 1u32;
5063 for i in 0..9 {
5064 let eid = writer.next_eid(edge_type).await?;
5065 writer
5066 .insert_edge(
5067 vids[i],
5068 vids[i + 1],
5069 edge_type,
5070 eid,
5071 std::collections::HashMap::new(),
5072 Some("NEXT".to_string()),
5073 None,
5074 )
5075 .await?;
5076 }
5077
5078 // Verify estimated_size grew further
5079 let after_edges_estimated = l0.read().estimated_size;
5080 let after_edges_actual = l0.read().size_bytes();
5081 assert!(
5082 after_edges_estimated > after_vertices_estimated,
5083 "estimated_size should grow after edge insertions"
5084 );
5085
5086 // Verify still within reasonable bounds
5087 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5088 assert!(
5089 (0.5..=2.0).contains(&ratio),
5090 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5091 after_edges_estimated,
5092 after_edges_actual,
5093 ratio
5094 );
5095
5096 Ok(())
5097 }
5098
5099 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5100 #[tokio::test]
5101 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5102 use crate::runtime::wal::WriteAheadLog;
5103 use crate::storage::manager::StorageManager;
5104 use object_store::local::LocalFileSystem;
5105 use object_store::path::Path as ObjectStorePath;
5106 use uni_common::core::schema::SchemaManager;
5107
5108 let dir = tempdir()?;
5109 let path = dir.path().to_str().unwrap();
5110 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5111 let schema_path = ObjectStorePath::from("schema.json");
5112
5113 let schema_manager =
5114 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5115 schema_manager.save().await?;
5116
5117 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5118
5119 let wal_path = ObjectStorePath::from("wal");
5120 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5121
5122 let writer = Writer::new_with_config(
5123 storage.clone(),
5124 schema_manager.clone(),
5125 1,
5126 UniConfig::default(),
5127 Some(wal.clone()),
5128 None,
5129 )
5130 .await?;
5131
5132 // Flush with no mutations — should succeed cleanly
5133 let lsn = writer.flush_wal().await?;
5134 // LSN should be 0 or 1 (no real mutations flushed)
5135 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5136
5137 Ok(())
5138 }
5139
5140 /// Test that transaction data does not leak into main L0 without commit.
5141 #[tokio::test]
5142 async fn test_transaction_isolation_without_commit() -> Result<()> {
5143 use crate::runtime::wal::WriteAheadLog;
5144 use crate::storage::manager::StorageManager;
5145 use object_store::local::LocalFileSystem;
5146 use object_store::path::Path as ObjectStorePath;
5147 use uni_common::core::schema::SchemaManager;
5148
5149 let dir = tempdir()?;
5150 let path = dir.path().to_str().unwrap();
5151 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5152 let schema_path = ObjectStorePath::from("schema.json");
5153
5154 let schema_manager =
5155 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5156 let _label_id = schema_manager.add_label("Person")?;
5157 schema_manager.save().await?;
5158
5159 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5160
5161 let wal_path = ObjectStorePath::from("wal");
5162 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5163
5164 let writer = Writer::new_with_config(
5165 storage.clone(),
5166 schema_manager.clone(),
5167 1,
5168 UniConfig::default(),
5169 Some(wal),
5170 None,
5171 )
5172 .await?;
5173
5174 // Create transaction L0
5175 let tx_l0 = writer.create_transaction_l0();
5176
5177 // Insert vertex into transaction L0
5178 let vid = writer.next_vid().await?;
5179 writer
5180 .insert_vertex_with_labels(
5181 vid,
5182 [("name".to_string(), Value::String("Ghost".to_string()))]
5183 .into_iter()
5184 .collect(),
5185 &["Person".to_string()],
5186 Some(&tx_l0),
5187 )
5188 .await?;
5189
5190 // Verify data is in transaction L0
5191 assert!(
5192 tx_l0.read().vertex_properties.contains_key(&vid),
5193 "Transaction L0 should contain the vertex"
5194 );
5195
5196 // Verify data is NOT in main L0
5197 let main_l0 = writer.l0_manager.get_current();
5198 assert!(
5199 !main_l0.read().vertex_properties.contains_key(&vid),
5200 "Main L0 should NOT contain uncommitted transaction data"
5201 );
5202
5203 // Drop transaction without committing — data should be lost
5204 drop(tx_l0);
5205
5206 // Main L0 still should not have it
5207 assert!(
5208 !main_l0.read().vertex_properties.contains_key(&vid),
5209 "Main L0 should remain clean after dropped transaction"
5210 );
5211
5212 Ok(())
5213 }
5214
5215 /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5216 /// the flush count crosses the configured threshold and stays
5217 /// silent on subsequent flushes for the lifetime of the writer.
5218 /// Primary writers (`fork_id == None`) never fire it.
5219 ///
5220 /// Tested directly against `tick_fork_fragment_observability` so
5221 /// the contract is locked in independently of the broader
5222 /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5223 /// on Day 10's on-the-fly schema overlay growth).
5224 #[tokio::test]
5225 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5226 use crate::storage::manager::StorageManager;
5227 use object_store::local::LocalFileSystem;
5228 use object_store::path::Path as ObjectStorePath;
5229 use uni_common::core::fork::ForkId;
5230 use uni_common::core::schema::SchemaManager;
5231
5232 let dir = tempdir()?;
5233 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5234 let schema_path = ObjectStorePath::from("schema.json");
5235 let schema_manager =
5236 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5237 let storage = Arc::new(
5238 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5239 );
5240
5241 let config = UniConfig {
5242 fork_fragment_warn_threshold: 3,
5243 ..Default::default()
5244 };
5245 let mut writer =
5246 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5247
5248 // Primary path: never fires.
5249 for _ in 0..10 {
5250 writer.tick_fork_fragment_observability();
5251 }
5252 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5253 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5254
5255 // Fork path: tag and tick. Below threshold → no fire.
5256 writer.fork_id = Some(ForkId::new());
5257 writer.tick_fork_fragment_observability();
5258 writer.tick_fork_fragment_observability();
5259 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5260 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5261
5262 // Crossing threshold → fires once.
5263 writer.tick_fork_fragment_observability();
5264 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5265 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5266
5267 // Subsequent ticks bump the gauge but do not re-fire.
5268 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5269 for _ in 0..5 {
5270 writer.tick_fork_fragment_observability();
5271 }
5272 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5273 assert_eq!(
5274 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5275 fired_after
5276 );
5277
5278 Ok(())
5279 }
5280
5281 /// The hot-path mutators must not write to any `Writer` struct field.
5282 /// Phase 2 of the refactor
5283 /// gave them `&self` receivers, which the compiler enforces against
5284 /// direct `self.x = y` assignment — but interior-mutable writes
5285 /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5286 /// every potentially-writable field, calls each hot-path mutator, and
5287 /// asserts no field changed.
5288 ///
5289 /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5290 /// `tick_fork_fragment_observability`) DO mutate fields by design and
5291 /// are intentionally out of scope here.
5292 #[tokio::test]
5293 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5294 use crate::storage::manager::StorageManager;
5295 use object_store::local::LocalFileSystem;
5296 use object_store::path::Path as ObjectStorePath;
5297 use uni_common::core::schema::SchemaManager;
5298
5299 let dir = tempdir()?;
5300 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5301 let schema_path = ObjectStorePath::from("schema.json");
5302 let schema_manager =
5303 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5304 schema_manager.add_label("Person")?;
5305 schema_manager.save().await?;
5306 let storage = Arc::new(
5307 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5308 );
5309
5310 let writer =
5311 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5312 .await?;
5313
5314 /// Captures every `Writer` field that *could* be written by a
5315 /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5316 /// construction field). Arc'd substructures (`l0_manager`,
5317 /// `storage`, etc.) are intentionally not checked — they are
5318 /// re-pointed only at construction.
5319 #[derive(Debug, PartialEq)]
5320 struct Snapshot {
5321 last_flush_time: std::time::Instant,
5322 cached_manifest_some: bool,
5323 fork_flush_count: u64,
5324 fork_fragment_warn_fired: bool,
5325 xervo_runtime_some: bool,
5326 index_rebuild_manager_some: bool,
5327 fork_id: Option<ForkId>,
5328 }
5329
5330 fn snap(w: &Writer) -> Snapshot {
5331 Snapshot {
5332 last_flush_time: *w.last_flush_time.lock(),
5333 cached_manifest_some: w.cached_manifest.lock().is_some(),
5334 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5335 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5336 xervo_runtime_some: w.xervo_runtime.get().is_some(),
5337 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5338 fork_id: w.fork_id,
5339 }
5340 }
5341
5342 // 1. insert_vertex_with_labels
5343 let before = snap(&writer);
5344 let vid = writer.next_vid().await?;
5345 writer
5346 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5347 .await?;
5348 assert_eq!(
5349 snap(&writer),
5350 before,
5351 "insert_vertex_with_labels mutated a Writer field"
5352 );
5353
5354 // 2. insert_vertices_batch
5355 let before = snap(&writer);
5356 let vids = writer.allocate_vids(2).await?;
5357 writer
5358 .insert_vertices_batch(
5359 vids,
5360 vec![Properties::new(), Properties::new()],
5361 vec!["Person".into()],
5362 None,
5363 )
5364 .await?;
5365 assert_eq!(
5366 snap(&writer),
5367 before,
5368 "insert_vertices_batch mutated a Writer field"
5369 );
5370
5371 // 3. delete_vertex
5372 let before = snap(&writer);
5373 writer.delete_vertex(vid, None, None).await?;
5374 assert_eq!(
5375 snap(&writer),
5376 before,
5377 "delete_vertex mutated a Writer field"
5378 );
5379
5380 // (insert_edge / delete_edge are skipped here: their fixture cost is
5381 // disproportionate to the audit's marginal value, and the same
5382 // structural argument plus the compiler-enforced `&self` covers them.)
5383
5384 Ok(())
5385 }
5386}