uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tuning knobs consumed by the writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Mutation-count budget (defaults to 10,000). How it is enforced is
    /// decided by the flush logic, not by this struct.
    pub max_mutations: usize,
    /// Opt in to the partial-column MergeInsert path for flushes that only
    /// SET properties.
    ///
    /// Enabled: `Writer::insert_vertex_partial` records which property keys
    /// were touched in `L0Buffer::vertex_partial_keys`. The flush then sends
    /// those VIDs through Lance's `MergeInsertBuilder` with a source whose
    /// schema is only that subset. Untouched columns, including wide ones
    /// such as embeddings, are neither read nor rewritten.
    ///
    /// Disabled: `insert_vertex_partial` falls back to the read-modify-write
    /// `insert_vertex_with_labels` path, which is bit-for-bit equivalent to
    /// earlier releases.
    ///
    /// Off by default for the first release. Turn it on once telemetry from
    /// the issue #72 ingest workload confirms the improvement. The soundness
    /// probe lives at
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// 10,000-mutation budget with partial Lance writes switched off.
    fn default() -> Self {
        let max_mutations = 10_000;
        // Stays opt-in until ingest telemetry justifies flipping it.
        let partial_lance_writes = false;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
78/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
79/// captured WAL LSN, current_version, and the in-progress guard whose
80/// lifetime spans the full flush (including the future async stream
81/// phase that runs on a spawned task).
82struct RotateOutput {
83 old_l0_arc: Arc<RwLock<L0Buffer>>,
84 wal_lsn: u64,
85 current_version: u64,
86 flush_in_progress_guard: FlushInProgressGuard,
87}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93 let mut out = Properties::new();
94 for k in keys {
95 if let Some(v) = props.get(k) {
96 out.insert(k.clone(), v.clone());
97 }
98 }
99 out
100}
101
102/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
103/// published) snapshot manifest and its id. Finalize is responsible
104/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
105/// update.
106struct FlushOutcome {
107 manifest: SnapshotManifest,
108 snapshot_id: String,
109}
110
111pub struct Writer {
112 pub l0_manager: Arc<L0Manager>,
113 pub storage: Arc<StorageManager>,
114 pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
115 pub allocator: Arc<IdAllocator>,
116 pub config: UniConfig,
117 /// Optional embedding runtime. `OnceLock` so the initializer can run
118 /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
119 /// (Phase 4 of concurrent_writer.md). Read through
120 /// [`Writer::xervo_runtime`] — the field itself is private to keep
121 /// callers oblivious to the OnceLock representation.
122 xervo_runtime: OnceLock<Arc<ModelRuntime>>,
123 /// Property manager for cache invalidation after flush
124 pub property_manager: Option<Arc<PropertyManager>>,
125 /// Adjacency manager for dual-write (edges survive flush).
126 adjacency_manager: Arc<AdjacencyManager>,
127 /// Timestamp of last flush or creation. Interior-mutable so that
128 /// `&self` callers can update it; uncontended in practice because all
129 /// writes happen inside the single-flusher critical section.
130 /// Arc-wrapped so it can travel into the SharedFlushCtx that the
131 /// async-flush coordinator passes to spawned stream/finalize tasks.
132 last_flush_time: Arc<PlMutex<std::time::Instant>>,
133 /// Background compaction task handle (prevents concurrent compaction races)
134 compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
135 /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
136 /// `OnceLock` for the same reason as `xervo_runtime`.
137 /// Wrapped in `Arc` so the async-flush finalize path can read it
138 /// from a spawned task via `SharedFlushCtx`.
139 index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
140 /// Cached snapshot manifest from the last flush. Avoids re-reading from
141 /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
142 /// `&self` access; uncontended because all access is inside the
143 /// single-flusher critical section.
144 cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
145 /// Identifier of the fork this writer serves, if any. `None` for
146 /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
147 /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
148 /// the fragment-count guard rail (Phase 2 Day 12).
149 pub fork_id: Option<ForkId>,
150 /// Number of `flush_to_l1` calls since this writer was constructed.
151 /// Used as a proxy for L1 fragment growth on the fork's branches:
152 /// each flush typically appends ~1 fragment per touched dataset, so
153 /// the count tracks the order of magnitude of fragment accumulation.
154 /// Reading the actual `Dataset::manifest().fragments.len()` per
155 /// flush would add a per-dataset object-store roundtrip on the hot
156 /// commit path; the proxy keeps the guard rail purely observational
157 /// (Phase 5 introduces fork compaction proper). Only meaningful when
158 /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
159 fork_flush_count: Arc<AtomicU64>,
160 /// Whether the fork-fragment warning has already fired at the
161 /// configured threshold. One-shot per writer lifetime. `Relaxed` is
162 /// sufficient — observational only.
163 fork_fragment_warn_fired: Arc<AtomicBool>,
164 /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
165 /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
166 /// across its WAL-append + L0-merge window. Replaces the outer
167 /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
168 /// Arc-wrapped so async-flush coordinator's finalize path can
169 /// re-acquire it from a spawned task via SharedFlushCtx.
170 flush_lock: Arc<tokio::sync::Mutex<()>>,
171 /// Coordinator for async-flush pipeline. Owns the back-pressure
172 /// semaphore, rotate-order sequence, single-finalizer task, and
173 /// pending-flush counter. Always present even when async flush is
174 /// disabled — the sync `flush_to_l1` path uses it for the future
175 /// `FlushInProgressGuard`/permit ownership model.
176 /// Coordinator is `None` when `async_flush_enabled = false`. The
177 /// coordinator's finalizer task captures `SharedFlushCtx` which
178 /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
179 /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
180 /// the holder count never drops. Constructing it only when the
181 /// feature is actually on avoids that side-effect for all
182 /// existing sync-flush paths. When async-flush graduates from
183 /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
184 /// the drain explicitly.
185 #[allow(dead_code)] // first production use lands in Commit 6/7
186 pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
187 /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
188 /// per successful commit under `flush_lock`; a transaction captures the
189 /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
190 ///
191 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
192 ///
193 /// Typed through the [`crate::runtime::sync`] shim so the OCC commit core can
194 /// be model-checked under loom/shuttle; aliases to `std::AtomicU64` normally.
195 commit_sequence: Arc<crate::runtime::sync::AtomicU64>,
196 /// Bounded log of recently-committed write-sets for OCC conflict detection.
197 /// Read and updated only under `flush_lock`.
198 ///
199 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
200 committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
201 /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
202 /// canonical (label, key-props) bytes. A transaction holds the lock from
203 /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
204 /// on the same key (avoiding optimistic abort-retry on hot keys).
205 ///
206 /// Always allocated; populated only when `config.ssi_enabled` is `true`.
207 for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
208}
209
/// Capacity of the OCC commit registry: how many of the most recent commits'
/// write-sets are kept for conflict detection.
///
/// Sized so that running off the end of the retained history, which forces a
/// conservative abort, is uncommon. Each retained entry is only a small set
/// of touched ids.
const OCC_REGISTRY_CAPACITY: usize = 4_096;
214
215impl Writer {
216 pub async fn new(
217 storage: Arc<StorageManager>,
218 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
219 start_version: u64,
220 ) -> Result<Self> {
221 Self::new_with_config(
222 storage,
223 schema_manager,
224 start_version,
225 UniConfig::default(),
226 None,
227 None,
228 )
229 .await
230 }
231
232 pub async fn new_with_config(
233 storage: Arc<StorageManager>,
234 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
235 start_version: u64,
236 config: UniConfig,
237 wal: Option<Arc<WriteAheadLog>>,
238 allocator: Option<Arc<IdAllocator>>,
239 ) -> Result<Self> {
240 let allocator = if let Some(a) = allocator {
241 a
242 } else {
243 let store = storage.store();
244 let path = object_store::path::Path::from("id_allocator.json");
245 Arc::new(IdAllocator::new(store, path, 1000).await?)
246 };
247
248 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
249
250 let property_manager = Some(Arc::new(PropertyManager::new(
251 storage.clone(),
252 schema_manager.clone(),
253 1000,
254 )));
255
256 let adjacency_manager = storage.adjacency_manager();
257
258 // Hoist the Arc'd fields so we can both stash them on Writer and
259 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
260 // captures. Single-source-of-truth for each piece of mutable
261 // shared state.
262 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
263 let cached_manifest = Arc::new(PlMutex::new(None));
264 let fork_flush_count = Arc::new(AtomicU64::new(0));
265 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
266 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
267 let compaction_handle = Arc::new(RwLock::new(None));
268 let index_rebuild_manager: Arc<
269 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
270 > = Arc::new(OnceLock::new());
271
272 let flush_coordinator = if config.async_flush_enabled {
273 let shared = SharedFlushCtx {
274 storage: storage.clone(),
275 l0_manager: l0_manager.clone(),
276 adjacency_manager: adjacency_manager.clone(),
277 property_manager: property_manager.clone(),
278 schema_manager: schema_manager.clone(),
279 cached_manifest: cached_manifest.clone(),
280 last_flush_time: last_flush_time.clone(),
281 fork_id: None,
282 fork_flush_count: fork_flush_count.clone(),
283 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
284 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
285 flush_lock: flush_lock.clone(),
286 index_rebuild_manager: index_rebuild_manager.clone(),
287 compaction_handle: compaction_handle.clone(),
288 compaction_config: config.compaction.clone(),
289 index_rebuild_config: config.index_rebuild.clone(),
290 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
291 };
292 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
293 Some(Arc::new(FlushCoordinator::new(
294 config.max_pending_flushes,
295 shared,
296 finalize_fn,
297 )))
298 } else {
299 None
300 };
301
302 let commit_sequence = Arc::new(crate::runtime::sync::AtomicU64::new(0));
303 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
304 OCC_REGISTRY_CAPACITY,
305 )));
306 let for_update_locks = Arc::new(dashmap::DashMap::new());
307
308 Ok(Self {
309 l0_manager,
310 storage,
311 schema_manager,
312 allocator,
313 config,
314 xervo_runtime: OnceLock::new(),
315 property_manager,
316 adjacency_manager,
317 last_flush_time,
318 compaction_handle,
319 index_rebuild_manager,
320 cached_manifest,
321 fork_id: None,
322 fork_flush_count,
323 fork_fragment_warn_fired,
324 flush_lock,
325 flush_coordinator,
326 commit_sequence,
327 committed_writes,
328 for_update_locks,
329 })
330 }
331
332 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
333 /// creating it on first use. The caller `.lock_owned().await`s the returned
334 /// mutex and holds the guard for the transaction's lifetime.
335 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
336 self.for_update_locks
337 .entry(key.to_vec())
338 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
339 .clone()
340 }
341
342 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
343 /// holds anymore, so the map does not grow without bound across the keyspace.
344 ///
345 /// Called when a transaction ends, **after** its guards have been dropped.
346 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
347 /// the same lock `row_lock_handle` takes to clone an entry — so the check
348 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
349 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
350 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
351 /// after we remove). Either way no two transactions ever lock different
352 /// `Mutex` instances for the same key.
353 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
354 for key in keys {
355 self.for_update_locks
356 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
357 }
358 }
359
360 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
361 /// tests that the map does not leak entries across transactions (G5).
362 pub fn for_update_lock_count(&self) -> usize {
363 self.for_update_locks.len()
364 }
365
366 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
367 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
368 /// baseline advances to lock-acquisition time (read-latest under the lock).
369 pub fn current_commit_sequence(&self) -> u64 {
370 self.commit_sequence
371 .load(crate::runtime::sync::Ordering::Relaxed)
372 }
373
374 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
375 /// Used by the async-flush stream/finalize paths to pass into spawned
376 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
377 /// with `flush_coordinator -> FinalizeFn -> Writer`).
378 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
379 SharedFlushCtx {
380 storage: self.storage.clone(),
381 l0_manager: self.l0_manager.clone(),
382 adjacency_manager: self.adjacency_manager.clone(),
383 property_manager: self.property_manager.clone(),
384 schema_manager: self.schema_manager.clone(),
385 cached_manifest: self.cached_manifest.clone(),
386 last_flush_time: self.last_flush_time.clone(),
387 fork_id: self.fork_id,
388 fork_flush_count: self.fork_flush_count.clone(),
389 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
390 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
391 flush_lock: self.flush_lock.clone(),
392 index_rebuild_manager: self.index_rebuild_manager.clone(),
393 compaction_handle: self.compaction_handle.clone(),
394 compaction_config: self.config.compaction.clone(),
395 index_rebuild_config: self.config.index_rebuild.clone(),
396 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
397 }
398 }
399
400 /// Borrow the flush coordinator if async flush is enabled.
401 /// Returns `None` when `config.async_flush_enabled = false`.
402 /// External callers (`drop_fork`) use this to drain pending streams.
403 pub fn flush_coordinator(
404 &self,
405 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
406 self.flush_coordinator.as_ref()
407 }
408
409 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
410 ///
411 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
412 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
413 pub fn set_index_rebuild_manager(
414 &self,
415 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
416 ) -> Result<()> {
417 self.index_rebuild_manager
418 .set(manager)
419 .map_err(|_| anyhow!("index_rebuild_manager already set"))
420 }
421
422 /// Replay WAL mutations into the current L0 buffer.
423 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
424 let l0 = self.l0_manager.get_current();
425 let wal = l0.read().wal.clone();
426
427 if let Some(wal) = wal {
428 wal.initialize().await?;
429 let mutations = wal.replay_since(wal_high_water_mark).await?;
430 let count = mutations.len();
431
432 if count > 0 {
433 log::info!(
434 "Replaying {} mutations from WAL (LSN > {})",
435 count,
436 wal_high_water_mark
437 );
438 let mut l0_guard = l0.write();
439 l0_guard.replay_mutations(mutations)?;
440 // Rebuild the UNIQUE constraint index over the recovered rows
441 // (Bug #9 Mechanism B). `replay_mutations` restores
442 // vertices/properties/labels but never repopulates
443 // `constraint_index` (its only other caller is the live insert
444 // path). Without this, a unique key that lives only in the WAL
445 // (committed but not yet flushed to Lance) is invisible to
446 // `check_unique_constraint_multi` after recovery and a
447 // duplicate of it could be created.
448 self.rebuild_constraint_index(&mut l0_guard);
449 }
450
451 Ok(count)
452 } else {
453 Ok(0)
454 }
455 }
456
457 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
458 ///
459 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
460 /// constraint whose target label the vertex carries and whose member
461 /// properties are all present, inserts the same constraint key the live
462 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
463 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
464 /// write lock; the schema is already loaded on the `Writer`.
465 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
466 let schema = self.schema_manager.schema();
467 // Collect entries first to avoid borrowing `vertex_properties`
468 // immutably while mutating `constraint_index` through the same guard.
469 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
470 for (&vid, props) in &l0_guard.vertex_properties {
471 if l0_guard.vertex_tombstones.contains(&vid) {
472 continue;
473 }
474 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
475 continue;
476 };
477 for label in labels {
478 for constraint in &schema.constraints {
479 if !constraint.enabled {
480 continue;
481 }
482 let ConstraintTarget::Label(l) = &constraint.target else {
483 continue;
484 };
485 if l != label {
486 continue;
487 }
488 let ConstraintType::Unique {
489 properties: unique_props,
490 } = &constraint.constraint_type
491 else {
492 continue;
493 };
494 let mut key_values = Vec::new();
495 let mut all_present = true;
496 for prop in unique_props {
497 if let Some(val) = props.get(prop) {
498 key_values.push((prop.clone(), val.clone()));
499 } else {
500 all_present = false;
501 break;
502 }
503 }
504 if all_present {
505 keys.push((serialize_constraint_key(label, &key_values), vid));
506 }
507 }
508 }
509 }
510 for (key, vid) in keys {
511 l0_guard.insert_constraint_key(key, vid);
512 }
513 }
514
515 /// Allocates the next VID (pure auto-increment).
516 pub async fn next_vid(&self) -> Result<Vid> {
517 self.allocator.allocate_vid().await
518 }
519
520 /// Allocates multiple VIDs at once for bulk operations.
521 /// This is more efficient than calling next_vid() in a loop.
522 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
523 self.allocator.allocate_vids(count).await
524 }
525
526 /// Allocates the next EID (pure auto-increment).
527 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
528 self.allocator.allocate_eid().await
529 }
530
531 /// Allocates multiple EIDs at once for bulk operations.
532 /// This is more efficient than calling next_eid() in a loop.
533 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
534 self.allocator.allocate_eids(count).await
535 }
536
537 /// Install the embedding runtime exactly once. Receiver is `&self` so it
538 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
539 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
540 self.xervo_runtime
541 .set(runtime)
542 .map_err(|_| anyhow!("xervo_runtime already set"))
543 }
544
545 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
546 self.xervo_runtime.get().cloned()
547 }
548
549 /// Create a new empty L0 buffer for transaction-scoped mutations.
550 ///
551 /// Only reads the current version — no exclusive lock required on Writer.
552 /// The returned buffer has no WAL reference; mutations are logged at
553 /// commit time via [`Self::commit_transaction_l0`].
554 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
555 let current_version = self.l0_manager.get_current().read().current_version;
556 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
557 let buf = L0Buffer::new(current_version, None);
558 // SSI: stamp the OCC read sequence at begin so commit can detect any
559 // transaction that committed since. Gated on the runtime `ssi_enabled`
560 // toggle — when off, `occ_read_set` stays `None` and every downstream
561 // read-set recording / commit validation self-gates to a no-op.
562 let buf = if self.config.ssi_enabled {
563 let mut buf = buf;
564 buf.occ_read_seq = self
565 .commit_sequence
566 .load(crate::runtime::sync::Ordering::Relaxed);
567 // The read path records observed ids here for SSI antidependency
568 // detection; commit consults it.
569 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
570 crate::runtime::l0::OccReadSet::default(),
571 )));
572 buf
573 } else {
574 buf
575 };
576 Arc::new(RwLock::new(buf))
577 }
578
579 /// Resolve the target L0 buffer for a mutation.
580 ///
581 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
582 /// When `None`, it targets the global L0 from the manager.
583 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
584 tx_l0
585 .cloned()
586 .unwrap_or_else(|| self.l0_manager.get_current())
587 }
588
589 fn update_metrics(&self) {
590 let l0 = self.l0_manager.get_current();
591 let size = l0.read().estimated_size;
592 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
593 }
594
595 /// Overlay-aware issue-#77 edge-endpoint validation.
596 ///
597 /// The current buffer alone does not hold all committed-but-unflushed
598 /// tombstones — a flush rotation moves them onto `pending_flush` until
599 /// the Lance write completes. A vertex is effectively deleted iff,
600 /// walking newest-first (tx → current → pending newest→oldest), the
601 /// first buffer that knows the vid says "tombstoned" (an insert clears
602 /// the tombstone within a buffer, so props/tombstone are mutually
603 /// exclusive per buffer).
604 ///
605 /// Must run under `flush_lock` so the overlay cannot change before the
606 /// merge, and BEFORE the durable WAL flush (see the call site).
607 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
608 let chain = self.l0_manager.get_pending_flush();
609 let current = self.l0_manager.get_current();
610 let effectively_deleted = |vid: &Vid| -> bool {
611 if tx_l0.vertex_properties.contains_key(vid) {
612 return false;
613 }
614 if tx_l0.vertex_tombstones.contains(vid) {
615 return true;
616 }
617 {
618 let cur = current.read();
619 if cur.vertex_properties.contains_key(vid) {
620 return false;
621 }
622 if cur.vertex_tombstones.contains(vid) {
623 return true;
624 }
625 }
626 for frozen in chain.iter().rev() {
627 let g = frozen.read();
628 if g.vertex_properties.contains_key(vid) {
629 return false;
630 }
631 if g.vertex_tombstones.contains(vid) {
632 return true;
633 }
634 }
635 false
636 };
637 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
638 if tx_l0.tombstones.contains_key(eid) {
639 continue; // a deletion, not an insertion — never resurrects a vertex
640 }
641 if effectively_deleted(src_vid) {
642 anyhow::bail!(
643 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
644 eid,
645 src_vid
646 );
647 }
648 if effectively_deleted(dst_vid) {
649 anyhow::bail!(
650 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
651 eid,
652 dst_vid
653 );
654 }
655 }
656 Ok(())
657 }
658
659 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
660 /// value for each CRDT property the transaction writes, so the commit
661 /// merge MERGES against the committed CRDT state instead of shadowing it
662 /// (the carve-out lets concurrent CRDT writers commit on the assumption
663 /// that the merge sees the committed value — true only while that value
664 /// lives in current, not mid-flush on `pending_flush`). No-op when
665 /// nothing is pending or the property already exists in current. Vertex
666 /// properties only, mirroring the carve-out itself.
667 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
668 let chain = self.l0_manager.get_pending_flush();
669 if chain.is_empty() {
670 return;
671 }
672 for (vid, props) in &tx_l0.vertex_properties {
673 Self::seed_crdt_props(&chain, *vid, props, main_l0);
674 }
675 }
676
677 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
678 /// Also used by the non-transactional vertex write path, which CRDT-merges
679 /// into the current buffer directly and has the same shadowing hazard
680 /// during a flush window.
681 fn seed_crdt_props(
682 chain: &[Arc<RwLock<L0Buffer>>],
683 vid: Vid,
684 props: &Properties,
685 target: &mut L0Buffer,
686 ) {
687 for (key, value) in props {
688 if crate::runtime::l0::try_as_crdt(value).is_none() {
689 continue;
690 }
691 if target
692 .vertex_properties
693 .get(&vid)
694 .is_some_and(|p| p.contains_key(key))
695 {
696 continue;
697 }
698 // Newest generation first: the first hit is the live state.
699 for frozen in chain.iter().rev() {
700 let g = frozen.read();
701 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
702 if crate::runtime::l0::try_as_crdt(v).is_some() {
703 target
704 .vertex_properties
705 .entry(vid)
706 .or_default()
707 .insert(key.clone(), v.clone());
708 }
709 break;
710 }
711 }
712 }
713 }
714
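    /// Commit a transaction's private L0 buffer into the main (current) L0.
    ///
    /// Steps, all under `flush_lock` so concurrent commits serialize:
    ///
    /// 1. Append the transaction's mutations to the WAL and flush the WAL.
    /// 2. Merge the buffer into main L0.
    /// 3. Replay its edges into the `AdjacencyManager`.
    ///
    /// Returns `(wal_lsn, flush_pending)`:
    ///
    /// * `wal_lsn` is the WAL LSN of the commit, or 0 when no WAL is
    ///   configured.
    /// * `flush_pending` is `true` when the post-commit `should_flush()`
    ///   predicate fired but no flush ran. The caller is then expected to
    ///   spawn a background `flush_to_l1`. This is the shape used when
    ///   `UniConfig::async_flush_enabled` is set, so commits do not block on
    ///   L1-streaming I/O.
    ///
    /// # Errors
    ///
    /// When `config.ssi_enabled` is set:
    ///
    /// * `UniError::SerializationConflict` if optimistic validation finds a
    ///   conflicting concurrent commit.
    /// * `UniError::ConstraintConflict` if another transaction already
    ///   committed one of this transaction's unique keys.
    ///
    /// Both checks run before anything is written to the WAL.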
727 pub async fn commit_transaction_l0(
728 self: &Arc<Self>,
729 tx_l0_arc: Arc<RwLock<L0Buffer>>,
730 ) -> Result<(u64, bool)> {
731 // Hold `flush_lock` across WAL append + flush + main-L0 merge.
732 // Two concurrent commits serialize here; in Phase 3 the outer
733 // `Arc<RwLock<Writer>>` already provides this exclusion, so the
734 // acquisition is uncontended. Phase 4 drops the outer lock and
735 // this becomes the load-bearing serialization point.
736 let _flush_lock_guard = self.flush_lock.lock().await;
737
738 // Crash-recovery seam: simulate process death immediately after winning
739 // the commit serialization point but before any durable work. No-op
740 // unless built with `--features failpoints`. (See ssi_resilience tests.)
741 fail::fail_point!("commit::after-flush-lock");
742
743 // SSI: optimistic conflict detection. This MUST run before any WAL
744 // write — `flush_wal()` below is the durable commit point and the WAL
745 // has no abort marker, so aborting after it would resurrect this
746 // transaction on crash recovery. The write-set is reused for
747 // registration after a successful merge.
748 // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
749 // and `occ_write_set` is `None`, so the post-merge registration below
750 // is skipped — reproducing last-writer-wins exactly.
751 let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
752 let tx_l0 = tx_l0_arc.read();
753 let read_seq = tx_l0.occ_read_seq;
754 let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
755 if !write_set.is_empty() {
756 // Telemetry: one validation per non-empty (writing) commit. The
757 // ratio of conflicts to validations is the headline abort rate.
758 metrics::counter!("uni_ssi_commit_validations_total").increment(1);
759 // Read-set is consulted only for writing transactions, so a
760 // read-only commit (empty write-set) runs at snapshot isolation.
761 let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
762 if let Some(conflict) =
763 self.committed_writes
764 .lock()
765 .check(read_seq, &write_set, read_guard.as_deref())
766 {
767 use crate::runtime::occ::Conflict;
768 match &conflict {
769 Conflict::WriteWrite { .. } => metrics::counter!(
770 "uni_ssi_serialization_conflicts_total",
771 "kind" => "write_write",
772 )
773 .increment(1),
774 Conflict::ReadWrite { .. } => metrics::counter!(
775 "uni_ssi_serialization_conflicts_total",
776 "kind" => "read_write",
777 )
778 .increment(1),
779 Conflict::HistoryTruncated { .. } => {
780 metrics::counter!("uni_ssi_history_truncated_total").increment(1)
781 }
782 }
783 return Err(anyhow::Error::new(
784 uni_common::UniError::SerializationConflict {
785 message: conflict.to_string(),
786 },
787 ));
788 }
789 }
790
791 // Validate against the committed-but-unflushed overlay under
792 // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
793 // soundness. The current buffer alone does not hold all committed
794 // state — a flush rotation moves it onto `pending_flush` until the
795 // Lance write completes (the Bug #9A window, here at the
796 // commit-time layer) — so every check walks [current, pending…].
797 {
798 let pending = self.l0_manager.get_pending_flush();
799 let main_l0 = self.l0_manager.get_current();
800 let overlay: Vec<Arc<RwLock<L0Buffer>>> =
801 std::iter::once(main_l0).chain(pending).collect();
802
803 // SSI / serializable MERGE: abort if a concurrent transaction has
804 // already committed a row with one of this transaction's unique
805 // keys. Commits serialize here, so this closes the race window
806 // left by the per-insert check. (Empty index → no iterations.)
807 for (key, vid) in &tx_l0.constraint_index {
808 if overlay
809 .iter()
810 .any(|b| b.read().has_constraint_key(key, *vid))
811 {
812 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
813 return Err(anyhow::Error::new(
814 uni_common::UniError::ConstraintConflict {
815 message: "unique key already committed by a concurrent \
816 transaction"
817 .to_string(),
818 },
819 ));
820 }
821 }
822
823 // Implicit MERGE phantom guard: a `MERGE` that *created* a node
824 // registered its (label, key-props) here even with no declared
825 // UNIQUE constraint. If a concurrent transaction already committed
826 // the same MERGE key, abort retriably so the two converge to one
827 // node on retry (the loser's MATCH then finds the committed row).
828 // Only MERGE-creates register keys, so a plain CREATE of the same
829 // properties never lands here. (Empty index → no iterations.)
830 for (key, vid) in &tx_l0.merge_guard_index {
831 if overlay
832 .iter()
833 .any(|b| b.read().has_merge_guard_key(key, *vid))
834 {
835 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
836 return Err(anyhow::Error::new(
837 uni_common::UniError::ConstraintConflict {
838 message: "MERGE key already committed by a concurrent \
839 transaction"
840 .to_string(),
841 },
842 ));
843 }
844 }
845
846 // Same race window for global ext_id uniqueness: the per-insert
847 // check ran against an older main L0; re-probe the committed
848 // index here, where commits serialize.
849 for (ext_id, vid) in &tx_l0.extid_index {
850 let taken = overlay.iter().any(|b| {
851 matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
852 });
853 if taken {
854 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
855 return Err(anyhow::Error::new(
856 uni_common::UniError::ConstraintConflict {
857 message: format!(
858 "ext_id '{ext_id}' already committed by a concurrent \
859 transaction"
860 ),
861 },
862 ));
863 }
864 }
865
866 // CRDT carve-out soundness: a pure-CRDT write was dropped from the
867 // write-set assuming its merge commutes. If the overlay holds a
868 // *different* CRDT variant for the same property, the merge would
869 // silently overwrite it — abort instead of losing the update.
870 // (Checked against every overlay buffer: conservative if an old
871 // generation held a different variant that a newer commit already
872 // replaced, but an abort+retry is always sound.)
873 for buf in &overlay {
874 if let Some(conflict) =
875 crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
876 {
877 metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
878 return Err(anyhow::Error::new(
879 uni_common::UniError::SerializationConflict {
880 message: conflict.to_string(),
881 },
882 ));
883 }
884 }
885 }
886 Some(write_set)
887 } else {
888 None
889 };
890
891 // Issue #77: an edge whose endpoint is effectively deleted makes the
892 // merge below bail. That bail MUST happen before the durable WAL flush —
893 // after it the transaction is committed-but-unmerged (a ghost commit),
894 // and WAL replay re-hits the same bail, making the database unopenable.
895 // SSI validation was deliberately placed before the flush for exactly
896 // this reason; the endpoint check belongs here too. Runs unconditionally
897 // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
898 // tombstone state cannot change between here and the merge. A
899 // tombstone may live in a flush-rotated pending buffer rather than
900 // the current buffer, so the check walks the overlay newest-first.
901 {
902 let tx_l0 = tx_l0_arc.read();
903 self.validate_edge_endpoints_overlay(&tx_l0)?;
904 }
905
906 // Crash-recovery seam: SSI validation has passed; the transaction is
907 // about to become durable. A crash here must leave NO trace (validation
908 // happens before the WAL is touched). No-op unless `failpoints`.
909 fail::fail_point!("commit::after-validate");
910
911 // 1. Write transaction mutations to WAL BEFORE merging into main L0
912 // This ensures durability before visibility.
913 {
914 let tx_l0 = tx_l0_arc.read();
915 let main_l0_arc = self.l0_manager.get_current();
916 let main_l0 = main_l0_arc.read();
917
918 // If WAL exists, write mutations to it for durability
919 if let Some(wal) = main_l0.wal.as_ref() {
920 // Order: vertices first, then edges (to ensure src/dst exist on replay)
921
922 // Vertex insertions
923 for (vid, properties) in &tx_l0.vertex_properties {
924 if !tx_l0.vertex_tombstones.contains(vid) {
925 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
926 wal.append(crate::runtime::wal::Mutation::InsertVertex {
927 vid: *vid,
928 properties: properties.clone(),
929 labels,
930 })?;
931 }
932 }
933
934 // Vertex deletions
935 for vid in &tx_l0.vertex_tombstones {
936 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
937 wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
938 }
939
940 // Label-only mutations (SET n:Label / REMOVE n:Label). After
941 // vertex inserts (so the vertex exists on replay), before edges,
942 // and skipping vertices deleted in this same commit.
943 for vid in &tx_l0.vertex_label_overwrites {
944 if tx_l0.vertex_tombstones.contains(vid) {
945 continue;
946 }
947 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
948 wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
949 vid: *vid,
950 labels,
951 })?;
952 }
953
954 // Crash-recovery seam: vertices appended, edges not yet. Tests
955 // assert that a crash here (before `flush_wal`) recovers NOTHING
956 // — the durable commit point is the flush below, not append.
957 fail::fail_point!("commit::mid-wal");
958
959 // Edge insertions and deletions from edge_endpoints
960 for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
961 if tx_l0.tombstones.contains_key(eid) {
962 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
963 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
964 eid: *eid,
965 src_vid: *src_vid,
966 dst_vid: *dst_vid,
967 edge_type: *edge_type,
968 version,
969 })?;
970 } else {
971 let properties =
972 tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
973 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
974 let edge_type_name = tx_l0.edge_types.get(eid).cloned();
975 wal.append(crate::runtime::wal::Mutation::InsertEdge {
976 src_vid: *src_vid,
977 dst_vid: *dst_vid,
978 edge_type: *edge_type,
979 eid: *eid,
980 version,
981 properties,
982 edge_type_name,
983 })?;
984 }
985 }
986
987 // Tombstones for edges that only exist in the global L0 (not in
988 // this transaction's edge_endpoints). Without this, deletes of
989 // pre-existing edges would be silently lost.
990 for (eid, tombstone) in &tx_l0.tombstones {
991 if !tx_l0.edge_endpoints.contains_key(eid) {
992 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
993 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
994 eid: *eid,
995 src_vid: tombstone.src_vid,
996 dst_vid: tombstone.dst_vid,
997 edge_type: tombstone.edge_type,
998 version,
999 })?;
1000 }
1001 }
1002 }
1003 }
1004
1005 // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
1006 let wal_lsn = self.flush_wal().await?;
1007
1008 // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
1009 // A crash here must RECOVER the transaction on replay (it is committed),
1010 // even though it was never made visible in-process. No-op unless `failpoints`.
1011 fail::fail_point!("commit::after-wal-flush");
1012
1013 // Component C1: if an outstanding snapshot pins the current generation,
1014 // clone it aside (lazy copy-on-write) before merging, so the pinning
1015 // transaction's reads stay isolated from this commit. No-op — and zero
1016 // cost — when nothing is pinned (the common case). We hold `flush_lock`,
1017 // so this cannot race a flush rotate or another commit's merge; the merge
1018 // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
1019 // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
1020 // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
1021 // always false when SSI is off and this is a zero-cost no-op.
1022 if self.l0_manager.is_current_pinned() {
1023 self.l0_manager.freeze_current_for_snapshot();
1024 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
1025 }
1026
1027 // 3. Merge into main L0 and make visible
1028 {
1029 // Write-lock the tx buffer: `merge_take` moves its property maps
1030 // into main L0 instead of cloning them. The commit consumes the
1031 // transaction, so the drained maps are never observed afterwards;
1032 // everything read below (endpoints, versions, tombstones) is left
1033 // intact.
1034 let mut tx_l0 = tx_l0_arc.write();
1035 let main_l0_arc = self.l0_manager.get_current();
1036 let mut main_l0 = main_l0_arc.write();
1037 // A CRDT property's committed state may live only in a
1038 // flush-rotated pending buffer (the post-rotation current is
1039 // empty until the Lance write completes — the Bug #9A window).
1040 // `merge_crdt_properties` merges against the CURRENT buffer's
1041 // value — without seeding, the tx's CRDT state would SHADOW the
1042 // pending buffer's at read time (newest buffer wins per property)
1043 // and concurrent increments would be lost. Seed the newest
1044 // overlay value for each CRDT property the tx writes that current
1045 // lacks, so the merge below merges instead of replaces.
1046 self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1047 main_l0.merge_take(&mut tx_l0)?;
1048
1049 // Replay transaction edges into the AdjacencyManager overlay
1050 for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1051 let edge_version = tx_l0
1052 .edge_versions
1053 .get(eid)
1054 .copied()
1055 .unwrap_or(main_l0.current_version);
1056 if tx_l0.tombstones.contains_key(eid) {
1057 self.adjacency_manager
1058 .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1059 } else {
1060 self.adjacency_manager
1061 .insert_edge(*src, *dst, *eid, *etype, edge_version);
1062 }
1063 }
1064
1065 // Replay tombstones for edges that only exist in the global L0
1066 // (not in this transaction's edge_endpoints).
1067 for (eid, tombstone) in &tx_l0.tombstones {
1068 if !tx_l0.edge_endpoints.contains_key(eid) {
1069 let edge_version = tx_l0
1070 .edge_versions
1071 .get(eid)
1072 .copied()
1073 .unwrap_or(main_l0.current_version);
1074 self.adjacency_manager.add_tombstone(
1075 *eid,
1076 tombstone.src_vid,
1077 tombstone.dst_vid,
1078 tombstone.edge_type,
1079 edge_version,
1080 );
1081 }
1082 }
1083 }
1084
1085 // Crash-recovery seam: durable AND merged, but the in-memory commit
1086 // registry has not recorded this write-set yet. A crash here is
1087 // indistinguishable from one at `after-wal-flush` on reopen (the
1088 // registry is in-memory and rebuilt empty); the tx still recovers.
1089 fail::fail_point!("commit::after-merge");
1090
1091 // SSI: register this commit's write-set under a fresh commit sequence so
1092 // later transactions detect conflicts against it. Still under
1093 // `flush_lock`, before the async-flush branch can drop the guard.
1094 // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1095 if let Some(write_set) = occ_write_set
1096 && !write_set.is_empty()
1097 {
1098 // Bump-then-record via the shared OCC seam (see `CommitRegistry::commit`)
1099 // so production and the loom/shuttle models exercise identical logic.
1100 self.committed_writes
1101 .lock()
1102 .commit(&self.commit_sequence, write_set);
1103 }
1104
1105 self.update_metrics();
1106
1107 // 4. Best-effort post-commit auto-flush.
1108 //
1109 // Two paths:
1110 // - async_flush_enabled = false (default): inline under our
1111 // existing flush_lock guard via flush_inline_under_lock.
1112 // - async_flush_enabled = true: rotate inline, drop flush_lock,
1113 // then submit the stream phase to the coordinator. Gated on
1114 // `pending_flush_count() < max_pending_flushes` so we don't
1115 // stack up rotations beyond the configured pipeline depth.
1116 // `try_acquire_permit` is non-blocking: if we lose the race
1117 // for the last permit, we just skip this trigger (the next
1118 // commit retries).
1119 let mut flush_pending = false;
1120 if self.should_flush() {
1121 if self.config.async_flush_enabled
1122 && let Some(coord) = self.flush_coordinator.as_ref()
1123 && coord.pending_flush_count() < self.config.max_pending_flushes
1124 {
1125 match coord.try_acquire_permit() {
1126 Some(permit) => {
1127 match self.flush_l0_rotate().await {
1128 Ok(rotate_out) => {
1129 // Allocate the rotate seq and bump pending ONLY
1130 // after the rotate succeeds (Bug #3). A failed
1131 // rotate must consume neither: the finalizer
1132 // advances strictly in consecutive seq order and
1133 // only decrements pending on finalize, so a
1134 // leaked seq/pending from a failed rotate would
1135 // wedge the finalizer forever and climb pending
1136 // toward `max_pending_flushes`. The seq is still
1137 // allocated under `flush_lock` (immediately after
1138 // the rotate, before the guard drops below), so
1139 // concurrent rotates keep seq order == rotation
1140 // order, and the seq is not used until submit.
1141 let seq = coord.next_rotate_seq();
1142 coord.note_pending();
1143 // Release flush_lock BEFORE the spawn so concurrent
1144 // commits can proceed while the stream runs.
1145 drop(_flush_lock_guard);
1146 let parent_manifest = self.cached_manifest.lock().clone();
1147 let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1148 seq,
1149 old_l0_arc: rotate_out.old_l0_arc.clone(),
1150 wal_lsn: rotate_out.wal_lsn,
1151 current_version: rotate_out.current_version,
1152 name: None,
1153 parent_manifest,
1154 permit,
1155 flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1156 };
1157 let writer = self.clone();
1158 let _ticket = coord.submit_for_stream(
1159 rotated,
1160 move |old_l0, wal, ver, n| async move {
1161 let outcome =
1162 writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1163 Ok(crate::runtime::flush_coordinator::FlushOutcome {
1164 new_manifest: outcome.manifest,
1165 snapshot_id: outcome.snapshot_id,
1166 })
1167 },
1168 );
1169 flush_pending = true;
1170 // Early return — flush_lock already dropped.
1171 return Ok((wal_lsn, flush_pending));
1172 }
1173 Err(e) => {
1174 tracing::warn!("Async rotate failed (non-critical): {}", e);
1175 // No seq was allocated and pending was not
1176 // bumped (both moved into the Ok arm for Bug
1177 // #3), so the finalizer is not wedged. The
1178 // permit drops here, freeing the slot.
1179 }
1180 }
1181 }
1182 None => {
1183 // Race: someone else grabbed the last permit. Skip;
1184 // next commit will retry should_flush().
1185 metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1186 }
1187 }
1188 } else if let Err(e) = self.flush_inline_under_lock(None).await {
1189 tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1190 }
1191 }
1192
1193 Ok((wal_lsn, flush_pending))
1194 }
1195
1196 /// Flush the WAL buffer to durable storage.
1197 ///
1198 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1199 pub async fn flush_wal(&self) -> Result<u64> {
1200 let l0 = self.l0_manager.get_current();
1201 let wal = l0.read().wal.clone();
1202
1203 match wal {
1204 Some(wal) => Ok(wal.flush().await?),
1205 None => Ok(0),
1206 }
1207 }
1208
1209 /// Record property removals in the active L0 mutation stats.
1210 ///
1211 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1212 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1213 if count == 0 {
1214 return;
1215 }
1216 let l0 = self.resolve_l0(tx_l0);
1217 l0.write().mutation_stats.properties_removed += count;
1218 }
1219
1220 /// Validates vertex constraints for the given properties.
1221 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1222 async fn validate_vertex_constraints_for_label(
1223 &self,
1224 vid: Vid,
1225 properties: &Properties,
1226 label: &str,
1227 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1228 ) -> Result<()> {
1229 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1230 .await
1231 }
1232
1233 /// Partial-update sibling: validates only constraints touching keys
1234 /// present in `properties` (the touched set). NOT NULL is checked
1235 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1236 /// skipped when any referenced key is absent (the caller is
1237 /// expected to have routed to the full-row path in that case via
1238 /// `touched_needs_full_read`).
1239 async fn validate_vertex_constraints_for_label_partial(
1240 &self,
1241 vid: Vid,
1242 properties: &Properties,
1243 label: &str,
1244 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1245 ) -> Result<()> {
1246 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1247 .await
1248 }
1249
1250 async fn validate_vertex_constraints_for_label_impl(
1251 &self,
1252 vid: Vid,
1253 properties: &Properties,
1254 label: &str,
1255 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1256 partial: bool,
1257 ) -> Result<()> {
1258 let schema = self.schema_manager.schema();
1259
1260 {
1261 // 1. Check NOT NULL constraints (from Property definitions).
1262 // Under partial-update mode, skip properties NOT in
1263 // `properties` — they retain their previous (already-
1264 // validated) value.
1265 if let Some(props_meta) = schema.properties.get(label) {
1266 for (prop_name, meta) in props_meta {
1267 if !meta.nullable {
1268 let present = properties.get(prop_name);
1269 if partial && present.is_none() {
1270 continue;
1271 }
1272 if present.is_none_or(|v| v.is_null()) {
1273 log::warn!(
1274 "Constraint violation: Property '{}' cannot be null for label '{}'",
1275 prop_name,
1276 label
1277 );
1278 return Err(anyhow!(
1279 "Constraint violation: Property '{}' cannot be null",
1280 prop_name
1281 ));
1282 }
1283 }
1284 }
1285 }
1286
1287 // 2. Check Explicit Constraints (Unique, Check, etc.)
1288 for constraint in &schema.constraints {
1289 if !constraint.enabled {
1290 continue;
1291 }
1292 match &constraint.target {
1293 ConstraintTarget::Label(l) if l == label => {}
1294 _ => continue,
1295 }
1296
1297 match &constraint.constraint_type {
1298 ConstraintType::Unique {
1299 properties: unique_props,
1300 } => {
1301 // Support single and multi-property unique constraints
1302 if !unique_props.is_empty() {
1303 let mut key_values = Vec::new();
1304 let mut missing = false;
1305 for prop in unique_props {
1306 if let Some(val) = properties.get(prop) {
1307 key_values.push((prop.clone(), val.clone()));
1308 } else {
1309 missing = true; // Can't enforce if property missing (partial update?)
1310 // For INSERT, missing means null?
1311 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1312 // For now, only check if ALL keys are present
1313 }
1314 }
1315
1316 if !missing {
1317 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1318 .await?;
1319 }
1320 }
1321 }
1322 ConstraintType::Exists { property } => {
1323 if properties.get(property).is_none_or(|v| v.is_null()) {
1324 log::warn!(
1325 "Constraint violation: Property '{}' must exist for label '{}'",
1326 property,
1327 label
1328 );
1329 return Err(anyhow!(
1330 "Constraint violation: Property '{}' must exist",
1331 property
1332 ));
1333 }
1334 }
1335 ConstraintType::Check { expression } => {
1336 if !self.evaluate_check_constraint(expression, properties)? {
1337 return Err(anyhow!(
1338 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1339 constraint.name,
1340 expression
1341 ));
1342 }
1343 }
1344 _ => {
1345 return Err(anyhow!("Unsupported constraint type"));
1346 }
1347 }
1348 }
1349 }
1350 Ok(())
1351 }
1352
1353 /// Validates vertex constraints for a vertex with the given labels.
1354 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1355 /// Unknown labels (not in schema) are skipped.
1356 async fn validate_vertex_constraints(
1357 &self,
1358 vid: Vid,
1359 properties: &Properties,
1360 labels: &[String],
1361 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1362 ) -> Result<()> {
1363 let schema = self.schema_manager.schema();
1364
1365 // Validate constraints only for known labels
1366 for label in labels {
1367 // Skip unknown labels (schemaless support)
1368 if schema.get_label_case_insensitive(label).is_none() {
1369 continue;
1370 }
1371 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1372 .await?;
1373 }
1374
1375 // Check global ext_id uniqueness if ext_id is provided
1376 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1377 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1378 }
1379
1380 Ok(())
1381 }
1382
1383 /// Partial sibling of `validate_vertex_constraints` — validates only
1384 /// constraints touching keys present in `properties`. Used by
1385 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1386 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1387 async fn validate_vertex_constraints_partial(
1388 &self,
1389 vid: Vid,
1390 touched: &Properties,
1391 labels: &[String],
1392 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1393 ) -> Result<()> {
1394 let schema = self.schema_manager.schema();
1395 for label in labels {
1396 if schema.get_label_case_insensitive(label).is_none() {
1397 continue;
1398 }
1399 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1400 .await?;
1401 }
1402 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1403 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1404 }
1405 Ok(())
1406 }
1407
1408 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1409 ///
1410 /// Used to build a constraint key index from L0 buffers for batch validation.
1411 fn collect_constraint_keys_from_properties<'a>(
1412 properties_iter: impl Iterator<Item = &'a Properties>,
1413 label: &str,
1414 constraints: &[uni_common::core::schema::Constraint],
1415 existing_keys: &mut HashMap<String, HashSet<String>>,
1416 existing_extids: &mut HashSet<String>,
1417 ) {
1418 for props in properties_iter {
1419 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1420 existing_extids.insert(ext_id.to_string());
1421 }
1422
1423 for constraint in constraints {
1424 if !constraint.enabled {
1425 continue;
1426 }
1427 if let ConstraintTarget::Label(l) = &constraint.target {
1428 if l != label {
1429 continue;
1430 }
1431 } else {
1432 continue;
1433 }
1434
1435 if let ConstraintType::Unique {
1436 properties: unique_props,
1437 } = &constraint.constraint_type
1438 {
1439 let mut key_parts = Vec::new();
1440 let mut all_present = true;
1441 for prop in unique_props {
1442 if let Some(val) = props.get(prop) {
1443 key_parts.push(format!("{}:{}", prop, val));
1444 } else {
1445 all_present = false;
1446 break;
1447 }
1448 }
1449 if all_present {
1450 let key = key_parts.join("|");
1451 existing_keys
1452 .entry(constraint.name.clone())
1453 .or_default()
1454 .insert(key);
1455 }
1456 }
1457 }
1458 }
1459 }
1460
1461 /// Validates constraints for a batch of vertices efficiently.
1462 ///
1463 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1464 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1465 ///
1466 /// # Arguments
1467 /// * `vids` - VIDs of vertices being inserted
1468 /// * `properties_batch` - Properties for each vertex
1469 /// * `label` - Label for all vertices (assumes single label for now)
1470 ///
1471 /// # Performance
1472 /// For N vertices with unique constraints:
1473 /// - Old approach: O(N²) - scan L0 buffer N times
1474 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1475 async fn validate_vertex_batch_constraints(
1476 &self,
1477 vids: &[Vid],
1478 properties_batch: &[Properties],
1479 label: &str,
1480 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1481 ) -> Result<()> {
1482 if vids.len() != properties_batch.len() {
1483 return Err(anyhow!("VID/properties length mismatch"));
1484 }
1485
1486 let schema = self.schema_manager.schema();
1487
1488 // 1. Validate NOT NULL constraints for each vertex
1489 if let Some(props_meta) = schema.properties.get(label) {
1490 for (idx, properties) in properties_batch.iter().enumerate() {
1491 for (prop_name, meta) in props_meta {
1492 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1493 return Err(anyhow!(
1494 "Constraint violation at index {}: Property '{}' cannot be null",
1495 idx,
1496 prop_name
1497 ));
1498 }
1499 }
1500 }
1501 }
1502
1503 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1504 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1505 let mut existing_extids: HashSet<String> = HashSet::new();
1506
1507 // Scan current L0 buffer
1508 {
1509 let l0 = self.l0_manager.get_current();
1510 let l0_guard = l0.read();
1511 Self::collect_constraint_keys_from_properties(
1512 l0_guard.vertex_properties.values(),
1513 label,
1514 &schema.constraints,
1515 &mut existing_keys,
1516 &mut existing_extids,
1517 );
1518 }
1519
1520 // Scan transaction L0 if present
1521 if let Some(tx_l0) = tx_l0 {
1522 let tx_l0_guard = tx_l0.read();
1523 Self::collect_constraint_keys_from_properties(
1524 tx_l0_guard.vertex_properties.values(),
1525 label,
1526 &schema.constraints,
1527 &mut existing_keys,
1528 &mut existing_extids,
1529 );
1530 }
1531
1532 // 3. Check batch vertices against index AND check for duplicates within batch
1533 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1534 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1535
1536 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1537 // Check ext_id uniqueness
1538 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1539 if existing_extids.contains(ext_id) {
1540 return Err(anyhow!(
1541 "Constraint violation at index {}: ext_id '{}' already exists",
1542 idx,
1543 ext_id
1544 ));
1545 }
1546 if let Some(first_idx) = batch_extids.get(ext_id) {
1547 return Err(anyhow!(
1548 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1549 ext_id,
1550 first_idx,
1551 idx
1552 ));
1553 }
1554 batch_extids.insert(ext_id.to_string(), idx);
1555 }
1556
1557 // Check unique constraints
1558 for constraint in &schema.constraints {
1559 if !constraint.enabled {
1560 continue;
1561 }
1562 if let ConstraintTarget::Label(l) = &constraint.target {
1563 if l != label {
1564 continue;
1565 }
1566 } else {
1567 continue;
1568 }
1569
1570 match &constraint.constraint_type {
1571 ConstraintType::Unique {
1572 properties: unique_props,
1573 } => {
1574 let mut key_parts = Vec::new();
1575 let mut all_present = true;
1576 for prop in unique_props {
1577 if let Some(val) = properties.get(prop) {
1578 key_parts.push(format!("{}:{}", prop, val));
1579 } else {
1580 all_present = false;
1581 break;
1582 }
1583 }
1584
1585 if all_present {
1586 let key = key_parts.join("|");
1587
1588 // Check against existing L0 keys
1589 if let Some(keys) = existing_keys.get(&constraint.name)
1590 && keys.contains(&key)
1591 {
1592 return Err(anyhow!(
1593 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1594 idx,
1595 label,
1596 constraint.name
1597 ));
1598 }
1599
1600 // Check for duplicates within batch
1601 let batch_constraint_keys =
1602 batch_keys.entry(constraint.name.clone()).or_default();
1603 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1604 return Err(anyhow!(
1605 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1606 key,
1607 first_idx,
1608 idx
1609 ));
1610 }
1611 batch_constraint_keys.insert(key, idx);
1612 }
1613 }
1614 ConstraintType::Exists { property }
1615 if properties.get(property).is_none_or(|v| v.is_null()) =>
1616 {
1617 return Err(anyhow!(
1618 "Constraint violation at index {}: Property '{}' must exist",
1619 idx,
1620 property
1621 ));
1622 }
1623 ConstraintType::Check { expression }
1624 if !self.evaluate_check_constraint(expression, properties)? =>
1625 {
1626 return Err(anyhow!(
1627 "Constraint violation at index {}: CHECK constraint '{}' violated",
1628 idx,
1629 constraint.name
1630 ));
1631 }
1632 _ => {}
1633 }
1634 }
1635 }
1636
1637 // 4. Check storage for unique constraints (can batch this into a single query)
1638 for constraint in &schema.constraints {
1639 if !constraint.enabled {
1640 continue;
1641 }
1642 if let ConstraintTarget::Label(l) = &constraint.target {
1643 if l != label {
1644 continue;
1645 }
1646 } else {
1647 continue;
1648 }
1649
1650 if let ConstraintType::Unique {
1651 properties: unique_props,
1652 } = &constraint.constraint_type
1653 {
1654 // Build compound OR filter for all batch vertices
1655 let mut or_filters = Vec::new();
1656 for properties in properties_batch.iter() {
1657 let mut and_parts = Vec::new();
1658 let mut all_present = true;
1659 for prop in unique_props {
1660 if let Some(val) = properties.get(prop) {
1661 let val_str = match val {
1662 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1663 Value::Int(n) => n.to_string(),
1664 Value::Float(f) => f.to_string(),
1665 Value::Bool(b) => b.to_string(),
1666 _ => {
1667 all_present = false;
1668 break;
1669 }
1670 };
1671 and_parts.push(format!("{} = {}", prop, val_str));
1672 } else {
1673 all_present = false;
1674 break;
1675 }
1676 }
1677 if all_present {
1678 or_filters.push(format!("({})", and_parts.join(" AND ")));
1679 }
1680 }
1681
1682 #[cfg(feature = "lance-backend")]
1683 if !or_filters.is_empty() {
1684 let vid_list: Vec<String> =
1685 vids.iter().map(|v| v.as_u64().to_string()).collect();
1686 let filter = format!(
1687 "({}) AND _deleted = false AND _vid NOT IN ({})",
1688 or_filters.join(" OR "),
1689 vid_list.join(", ")
1690 );
1691
1692 if let Ok(ds) = self.storage.vertex_dataset(label)
1693 && let Ok(lance_ds) = ds.open_raw().await
1694 {
1695 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1696 if count > 0 {
1697 return Err(anyhow!(
1698 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1699 label,
1700 constraint.name
1701 ));
1702 }
1703 }
1704 }
1705 }
1706 }
1707
1708 Ok(())
1709 }
1710
1711 /// Checks that ext_id is globally unique across all vertices.
1712 ///
1713 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1714 /// to ensure no other vertex uses this ext_id.
1715 ///
1716 /// # Errors
1717 ///
1718 /// Returns error if another vertex with the same ext_id exists.
1719 async fn check_extid_globally_unique(
1720 &self,
1721 ext_id: &str,
1722 current_vid: Vid,
1723 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1724 ) -> Result<()> {
1725 // Check L0 buffers: current, transaction, and pending flush
1726 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1727 let mut buffers = vec![self.l0_manager.get_current()];
1728 if let Some(tx_l0) = tx_l0 {
1729 buffers.push(tx_l0.clone());
1730 }
1731 buffers.extend(self.l0_manager.get_pending_flush());
1732 buffers
1733 };
1734
1735 for l0 in &l0_buffers_to_check {
1736 // O(1) per buffer via the maintained `extid_index` (the previous
1737 // full `vertex_properties` scan made constrained ingest O(n²)).
1738 if let Some(&vid) = l0.read().extid_index.get(ext_id)
1739 && vid != current_vid
1740 {
1741 return Err(anyhow!(
1742 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1743 ext_id,
1744 vid
1745 ));
1746 }
1747 }
1748
1749 // Check main vertices table (if it exists)
1750 // Pass None for global uniqueness check (not snapshot-isolated)
1751 let backend = self.storage.backend();
1752 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1753 && found_vid != current_vid
1754 {
1755 return Err(anyhow!(
1756 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1757 ext_id,
1758 found_vid
1759 ));
1760 }
1761
1762 Ok(())
1763 }
1764
1765 /// Helper to get vertex labels from L0 buffer.
1766 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1767 let l0 = self.l0_manager.get_current();
1768 let l0_guard = l0.read();
1769 // Check if vertex is tombstoned (deleted) - if so, return None
1770 if l0_guard.vertex_tombstones.contains(&vid) {
1771 return None;
1772 }
1773 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1774 }
1775
1776 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1777 /// This is the proper way to read vertex labels after a flush, as it checks both
1778 /// in-memory buffers and persisted storage.
1779 pub async fn get_vertex_labels(
1780 &self,
1781 vid: Vid,
1782 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1783 ) -> Option<Vec<String>> {
1784 // 1. Check current L0
1785 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1786 return Some(labels);
1787 }
1788
1789 // 2. Check transaction L0 if present
1790 if let Some(tx_l0) = tx_l0 {
1791 let guard = tx_l0.read();
1792 if guard.vertex_tombstones.contains(&vid) {
1793 return None;
1794 }
1795 if let Some(labels) = guard.get_vertex_labels(vid) {
1796 return Some(labels.to_vec());
1797 }
1798 }
1799
1800 // 3. Check pending flush L0s
1801 for pending_l0 in self.l0_manager.get_pending_flush() {
1802 let guard = pending_l0.read();
1803 if guard.vertex_tombstones.contains(&vid) {
1804 return None;
1805 }
1806 if let Some(labels) = guard.get_vertex_labels(vid) {
1807 return Some(labels.to_vec());
1808 }
1809 }
1810
1811 // 4. Check storage
1812 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1813 }
1814
1815 /// Helper to get edge type from L0 buffer.
1816 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1817 let l0 = self.l0_manager.get_current();
1818 let l0_guard = l0.read();
1819 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1820 }
1821
1822 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1823 /// Falls back to the transaction L0 if available.
1824 pub fn get_edge_type_id_from_l0(
1825 &self,
1826 eid: Eid,
1827 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1828 ) -> Option<u32> {
1829 // Check transaction L0 first
1830 if let Some(tx_l0) = tx_l0 {
1831 let guard = tx_l0.read();
1832 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1833 return Some(etype);
1834 }
1835 }
1836 // Fall back to main L0
1837 let l0 = self.l0_manager.get_current();
1838 let l0_guard = l0.read();
1839 l0_guard
1840 .get_edge_endpoint_full(eid)
1841 .map(|(_, _, etype)| etype)
1842 }
1843
1844 /// Set the type name for an edge (used for schemaless edge types).
1845 /// This is called during CREATE for edge types not found in the schema.
1846 pub fn set_edge_type(
1847 &self,
1848 eid: Eid,
1849 type_name: String,
1850 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1851 ) {
1852 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1853 }
1854
1855 /// Evaluate a simple CHECK constraint expression.
1856 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1857 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1858 let parts: Vec<&str> = expression.split_whitespace().collect();
1859 if parts.len() != 3 {
1860 // For now, only support "prop op val"
1861 // Fallback to true if too complex to avoid breaking, but warn
1862 log::warn!(
1863 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1864 expression
1865 );
1866 return Ok(true);
1867 }
1868
1869 let prop_part = parts[0].trim_start_matches('(');
1870 // Handle "variable.property" format - take the part after the dot
1871 let prop_name = if let Some(idx) = prop_part.find('.') {
1872 &prop_part[idx + 1..]
1873 } else {
1874 prop_part
1875 };
1876
1877 let op = parts[1];
1878 let val_str = parts[2].trim_end_matches(')');
1879
1880 let prop_val = match properties.get(prop_name) {
1881 Some(v) => v,
1882 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1883 };
1884
1885 // Parse value string (handle quotes for strings)
1886 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1887 || (val_str.starts_with('"') && val_str.ends_with('"'))
1888 {
1889 Value::String(val_str[1..val_str.len() - 1].to_string())
1890 } else if let Ok(n) = val_str.parse::<i64>() {
1891 Value::Int(n)
1892 } else if let Ok(n) = val_str.parse::<f64>() {
1893 Value::Float(n)
1894 } else if let Ok(b) = val_str.parse::<bool>() {
1895 Value::Bool(b)
1896 } else {
1897 // Check for internal format wrappers if they somehow leaked through
1898 if val_str.starts_with("Number(") && val_str.ends_with(')') {
1899 let n_str = &val_str[7..val_str.len() - 1];
1900 if let Ok(n) = n_str.parse::<i64>() {
1901 Value::Int(n)
1902 } else if let Ok(n) = n_str.parse::<f64>() {
1903 Value::Float(n)
1904 } else {
1905 Value::String(val_str.to_string())
1906 }
1907 } else {
1908 Value::String(val_str.to_string())
1909 }
1910 };
1911
1912 match op {
1913 "=" | "==" => Ok(prop_val == &target_val),
1914 "!=" | "<>" => Ok(prop_val != &target_val),
1915 ">" => self
1916 .compare_values(prop_val, &target_val)
1917 .map(|o| o.is_gt()),
1918 "<" => self
1919 .compare_values(prop_val, &target_val)
1920 .map(|o| o.is_lt()),
1921 ">=" => self
1922 .compare_values(prop_val, &target_val)
1923 .map(|o| o.is_ge()),
1924 "<=" => self
1925 .compare_values(prop_val, &target_val)
1926 .map(|o| o.is_le()),
1927 _ => {
1928 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1929 Ok(true)
1930 }
1931 }
1932 }
1933
1934 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1935 use std::cmp::Ordering;
1936
1937 fn cmp_f64(x: f64, y: f64) -> Ordering {
1938 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1939 }
1940
1941 match (a, b) {
1942 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1943 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1944 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1945 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1946 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1947 _ => Err(anyhow!(
1948 "Cannot compare incompatible types: {:?} vs {:?}",
1949 a,
1950 b
1951 )),
1952 }
1953 }
1954
1955 async fn check_unique_constraint_multi(
1956 &self,
1957 label: &str,
1958 key_values: &[(String, Value)],
1959 current_vid: Vid,
1960 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1961 ) -> Result<()> {
1962 // Serialize constraint key once for O(1) lookups
1963 let key = serialize_constraint_key(label, key_values);
1964
1965 // 1. Check L0 (in-memory) using O(1) constraint index
1966 {
1967 let l0 = self.l0_manager.get_current();
1968 let l0_guard = l0.read();
1969 if l0_guard.has_constraint_key(&key, current_vid) {
1970 return Err(anyhow!(
1971 "Constraint violation: Duplicate composite key for label '{}'",
1972 label
1973 ));
1974 }
1975 }
1976
1977 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1978 // buffer onto `pending_flush` and installs a fresh empty current
1979 // buffer; until the rotated rows reach Lance the key is invisible to
1980 // both the current-buffer check above and the storage check below, so
1981 // a duplicate could slip through that flush window. Mirror the read
1982 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1983 // already consult `pending_flush`.
1984 for pending_l0 in self.l0_manager.get_pending_flush() {
1985 if pending_l0.read().has_constraint_key(&key, current_vid) {
1986 return Err(anyhow!(
1987 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1988 label
1989 ));
1990 }
1991 }
1992
1993 // Check Transaction L0
1994 if let Some(tx_l0) = tx_l0 {
1995 let tx_l0_guard = tx_l0.read();
1996 if tx_l0_guard.has_constraint_key(&key, current_vid) {
1997 return Err(anyhow!(
1998 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1999 label
2000 ));
2001 }
2002 }
2003
2004 // 2. Check Storage (L1/L2)
2005 let filters: Vec<String> = key_values
2006 .iter()
2007 .map(|(prop, val)| {
2008 let val_str = match val {
2009 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2010 Value::Int(n) => n.to_string(),
2011 Value::Float(f) => f.to_string(),
2012 Value::Bool(b) => b.to_string(),
2013 _ => "NULL".to_string(),
2014 };
2015 format!("{} = {}", prop, val_str)
2016 })
2017 .collect();
2018
2019 let mut filter = filters.join(" AND ");
2020 filter.push_str(&format!(
2021 " AND _deleted = false AND _vid != {}",
2022 current_vid.as_u64()
2023 ));
2024
2025 #[cfg(feature = "lance-backend")]
2026 if let Ok(ds) = self.storage.vertex_dataset(label)
2027 && let Ok(lance_ds) = ds.open_raw().await
2028 {
2029 let count = lance_ds.count_rows(Some(filter.clone())).await?;
2030 if count > 0 {
2031 return Err(anyhow!(
2032 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2033 label,
2034 filter
2035 ));
2036 }
2037 }
2038
2039 Ok(())
2040 }
2041
2042 async fn check_write_pressure(&self) -> Result<()> {
2043 let status = self
2044 .storage
2045 .compaction_status()
2046 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2047 let l1_runs = status.l1_runs;
2048 let throttle = &self.config.throttle;
2049
2050 if l1_runs >= throttle.hard_limit {
2051 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2052 // Simple polling for now
2053 while self
2054 .storage
2055 .compaction_status()
2056 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2057 .l1_runs
2058 >= throttle.hard_limit
2059 {
2060 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2061 }
2062 } else if l1_runs >= throttle.soft_limit {
2063 let excess = l1_runs - throttle.soft_limit;
2064 // Cap multiplier to avoid overflow
2065 let excess = std::cmp::min(excess, 31);
2066 let multiplier = 2_u32.pow(excess as u32);
2067 let delay = throttle.base_delay * multiplier;
2068 tokio::time::sleep(delay).await;
2069 }
2070 Ok(())
2071 }
2072
2073 /// Check transaction memory limit to prevent OOM.
2074 /// No-op when no transaction is active.
2075 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2076 if let Some(tx_l0) = tx_l0 {
2077 let size = tx_l0.read().estimated_size;
2078 if size > self.config.max_transaction_memory {
2079 return Err(anyhow!(
2080 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2081 Roll back or commit the current transaction.",
2082 size,
2083 self.config.max_transaction_memory
2084 ));
2085 }
2086 }
2087 Ok(())
2088 }
2089
2090 async fn get_query_context(
2091 &self,
2092 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2093 ) -> Option<QueryContext> {
2094 Some(QueryContext::new_with_pending(
2095 self.l0_manager.get_current(),
2096 tx_l0.cloned(),
2097 self.l0_manager.get_pending_flush(),
2098 ))
2099 }
2100
2101 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2102 /// write paths.
2103 ///
2104 /// Rejects a declared CRDT property written as a parsed CRDT value
2105 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2106 /// A mismatch would make the commit-time merge silently overwrite instead of
2107 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2108 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2109 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2110 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2111 /// are never carved out and stay conflictable.
2112 fn enforce_crdt_variants(
2113 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2114 properties: &Properties,
2115 ) -> Result<()> {
2116 for (key, value) in properties {
2117 let Some(meta) = props_meta.get(key) else {
2118 continue;
2119 };
2120 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2121 continue;
2122 };
2123 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2124 && crdt.type_name() != expected.type_name()
2125 {
2126 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2127 message: format!(
2128 "CRDT property '{key}' must be written as a {} value",
2129 expected.type_name()
2130 ),
2131 }));
2132 }
2133 }
2134 Ok(())
2135 }
2136
2137 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2138 ///
2139 /// When `label` is provided, uses it directly to look up property metadata.
2140 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2141 ///
2142 /// # Errors
2143 ///
2144 /// Returns an error if CRDT property merging fails.
2145 async fn prepare_vertex_upsert(
2146 &self,
2147 vid: Vid,
2148 properties: &mut Properties,
2149 label: Option<&str>,
2150 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2151 ) -> Result<()> {
2152 let Some(pm) = &self.property_manager else {
2153 return Ok(());
2154 };
2155
2156 let schema = self.schema_manager.schema();
2157
2158 // Resolve label: use provided label or discover from L0/storage
2159 let discovered_labels;
2160 let label_name = if let Some(l) = label {
2161 Some(l)
2162 } else {
2163 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2164 discovered_labels
2165 .as_ref()
2166 .and_then(|l| l.first().map(|s| s.as_str()))
2167 };
2168
2169 let Some(label_str) = label_name else {
2170 return Ok(());
2171 };
2172 let Some(props_meta) = schema.properties.get(label_str) else {
2173 return Ok(());
2174 };
2175
2176 // Identify CRDT properties in the insert data
2177 let crdt_keys: Vec<String> = properties
2178 .keys()
2179 .filter(|key| {
2180 props_meta.get(*key).is_some_and(|meta| {
2181 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2182 })
2183 })
2184 .cloned()
2185 .collect();
2186
2187 if crdt_keys.is_empty() {
2188 return Ok(());
2189 }
2190
2191 // Enforce that each declared CRDT property written as a parsed CRDT value
2192 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2193 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2194 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2195 // would hide that as a silent lost update — reject it at the source.
2196 //
2197 // Only the `Map` form is checked: it is exactly the form the carve-out
2198 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2199 // string (the Cypher form) or a non-CRDT value is never carved out — it
2200 // stays conflictable — so it poses no carve-out soundness risk and is left
2201 // to the existing merge/parse path. This is the declared-property half of
2202 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2203 Self::enforce_crdt_variants(props_meta, properties)?;
2204
2205 let ctx = self.get_query_context(tx_l0).await;
2206 for key in crdt_keys {
2207 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2208 if !existing.is_null()
2209 && let Some(val) = properties.get_mut(&key)
2210 {
2211 *val = pm.merge_crdt_values(&existing, val)?;
2212 }
2213 }
2214
2215 Ok(())
2216 }
2217
2218 async fn prepare_edge_upsert(
2219 &self,
2220 eid: Eid,
2221 properties: &mut Properties,
2222 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2223 ) -> Result<()> {
2224 if let Some(pm) = &self.property_manager {
2225 let schema = self.schema_manager.schema();
2226 // Get edge type from L0 buffer instead of from EID
2227 let type_name = self.get_edge_type_from_l0(eid);
2228
2229 if let Some(ref t_name) = type_name
2230 && let Some(props_meta) = schema.properties.get(t_name)
2231 {
2232 let mut crdt_keys = Vec::new();
2233 for (key, _) in properties.iter() {
2234 if let Some(meta) = props_meta.get(key)
2235 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2236 {
2237 crdt_keys.push(key.clone());
2238 }
2239 }
2240
2241 if !crdt_keys.is_empty() {
2242 let ctx = self.get_query_context(tx_l0).await;
2243 for key in crdt_keys {
2244 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2245
2246 if !existing.is_null()
2247 && let Some(val) = properties.get_mut(&key)
2248 {
2249 *val = pm.merge_crdt_values(&existing, val)?;
2250 }
2251 }
2252 }
2253 }
2254 }
2255 Ok(())
2256 }
2257
2258 #[instrument(skip(self, properties), level = "trace")]
2259 pub async fn insert_vertex(
2260 &self,
2261 vid: Vid,
2262 properties: Properties,
2263 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2264 ) -> Result<()> {
2265 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2266 .await?;
2267 Ok(())
2268 }
2269
2270 /// Component C1 (G4): before a non-transactional mutation merges into main
2271 /// L0, if an outstanding snapshot pins the current generation, freeze it
2272 /// aside so snapshots taken *before* this write stay isolated from it.
2273 ///
2274 /// `flush_lock` (acquired and released here) serializes the freeze against
2275 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2276 /// commit path gets. No-op for transactional writes (their freeze happens at
2277 /// commit) and — the common case — when nothing is pinned, where it costs one
2278 /// atomic load. Freezes at most once per pinned generation: the freeze
2279 /// installs a fresh unpinned `current`, so later writes in the same bulk
2280 /// import see no pin and merge in place, and the snapshot keeps reading the
2281 /// frozen pre-import buffer.
2282 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2283 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2284 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2285 // always false (one atomic load) when SSI is off.
2286 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2287 let _flush_lock_guard = self.flush_lock.lock().await;
2288 // Re-check under the lock: a concurrent commit may have frozen first.
2289 if self.l0_manager.is_current_pinned() {
2290 self.l0_manager.freeze_current_for_snapshot();
2291 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2292 }
2293 }
2294 }
2295
2296 #[instrument(skip(self, properties, labels), level = "trace")]
2297 pub async fn insert_vertex_with_labels(
2298 &self,
2299 vid: Vid,
2300 mut properties: Properties,
2301 labels: &[String],
2302 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2303 ) -> Result<Properties> {
2304 let start = std::time::Instant::now();
2305 self.check_write_pressure().await?;
2306 self.check_transaction_memory(tx_l0)?;
2307
2308 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2309 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2310 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2311 // taken before this write stay isolated from it.
2312 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2313
2314 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2315 self.process_embeddings_for_labels(labels, &mut properties)
2316 .await?;
2317 }
2318 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2319 .await?;
2320 self.prepare_vertex_upsert(
2321 vid,
2322 &mut properties,
2323 labels.first().map(|s| s.as_str()),
2324 tx_l0,
2325 )
2326 .await?;
2327
2328 // Clone properties and labels before moving into L0 to return them and populate constraint index
2329 let properties_copy = properties.clone();
2330 let labels_copy = labels.to_vec();
2331
2332 {
2333 // For a non-tx write, re-resolve the live `current` buffer and hold
2334 // `flush_lock` across the (synchronous) write so a concurrent flush
2335 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2336 // a fresh `current` between resolve and write, dropping our write
2337 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2338 // buffer, never the rotating `current`, so no `flush_lock` is needed
2339 // (and taking it would risk re-entrancy with the commit path).
2340 let _flush_lock_guard = if tx_l0.is_none() {
2341 Some(self.flush_lock.lock().await)
2342 } else {
2343 None
2344 };
2345 let l0 = self.resolve_l0(tx_l0);
2346 let mut l0_guard = l0.write();
2347 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2348 // possibly empty) current buffer must merge against the chained
2349 // committed state, not shadow it. No-op when the chain is empty
2350 // or this is a tx-private write.
2351 if tx_l0.is_none() {
2352 let pending = self.l0_manager.get_pending_flush();
2353 if !pending.is_empty() {
2354 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2355 }
2356 }
2357 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2358
2359 // Populate constraint index for O(1) duplicate detection
2360 let schema = self.schema_manager.schema();
2361 for label in &labels_copy {
2362 if schema.get_label_case_insensitive(label).is_none() {
2363 if self.config.strict_schema {
2364 return Err(anyhow::anyhow!(
2365 "Label '{}' is not defined in the schema \
2366 (strict_schema is enabled).",
2367 label
2368 ));
2369 }
2370 continue; // Schemaless: skip unknown labels.
2371 }
2372
2373 // For each unique constraint on this label, insert into constraint index
2374 for constraint in &schema.constraints {
2375 if !constraint.enabled {
2376 continue;
2377 }
2378 if let ConstraintTarget::Label(l) = &constraint.target {
2379 if l != label {
2380 continue;
2381 }
2382 } else {
2383 continue;
2384 }
2385
2386 if let ConstraintType::Unique {
2387 properties: unique_props,
2388 } = &constraint.constraint_type
2389 {
2390 let mut key_values = Vec::new();
2391 let mut all_present = true;
2392 for prop in unique_props {
2393 if let Some(val) = properties_copy.get(prop) {
2394 key_values.push((prop.clone(), val.clone()));
2395 } else {
2396 all_present = false;
2397 break;
2398 }
2399 }
2400
2401 if all_present {
2402 let key = serialize_constraint_key(label, &key_values);
2403 l0_guard.insert_constraint_key(key, vid);
2404 }
2405 }
2406 }
2407 }
2408 }
2409
2410 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2411 self.update_metrics();
2412
2413 if tx_l0.is_none() {
2414 self.check_flush().await?;
2415 }
2416 if start.elapsed().as_millis() > 100 {
2417 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2418 }
2419 Ok(properties_copy)
2420 }
2421
2422 /// True iff routing this partial write through MergeInsert would
2423 /// miss a constraint check. Specifically: a multi-key UNIQUE
2424 /// constraint where the touched-set doesn't cover all member keys
2425 /// requires the unchanged keys from the existing row to compute
2426 /// the composite. Conservative: also returns true if any touched
2427 /// key is `ext_id` (uniqueness checked globally — handled in the
2428 /// full-row path).
2429 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2430 if touched.contains_key("ext_id") {
2431 return true;
2432 }
2433 let schema = self.schema_manager.schema();
2434 for label in labels {
2435 if schema.get_label_case_insensitive(label).is_none() {
2436 continue;
2437 }
2438 for constraint in &schema.constraints {
2439 if !constraint.enabled {
2440 continue;
2441 }
2442 if let ConstraintTarget::Label(l) = &constraint.target {
2443 if !l.eq_ignore_ascii_case(label) {
2444 continue;
2445 }
2446 } else {
2447 continue;
2448 }
2449 if let ConstraintType::Unique {
2450 properties: unique_props,
2451 } = &constraint.constraint_type
2452 {
2453 if unique_props.len() < 2 {
2454 continue; // single-key UNIQUE — partial path sees the key
2455 }
2456 if unique_props.iter().any(|p| touched.contains_key(p)) {
2457 return true;
2458 }
2459 }
2460 }
2461 }
2462 false
2463 }
2464
2465 /// Insert a vertex's FULL property row plus a touched-keys hint so
2466 /// the flush emits ONLY those columns via Lance MergeInsert.
2467 ///
2468 /// Caller must have read the full row (via PropertyManager) and
2469 /// applied SET-touched values on top before calling — same input
2470 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2471 /// is the set of property keys this SET statement actually
2472 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2473 /// flush filters the MergeInsert source schema down to those keys.
2474 /// When `UniConfig::partial_lance_writes == false`, falls through
2475 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2476 /// equivalence with prior releases.
2477 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2478 pub async fn insert_vertex_partial_full(
2479 &self,
2480 vid: Vid,
2481 mut props: Properties,
2482 touched_keys: HashSet<String>,
2483 labels: &[String],
2484 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2485 ) -> Result<()> {
2486 if !self.config.partial_lance_writes
2487 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2488 {
2489 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2490 .await?;
2491 return Ok(());
2492 }
2493
2494 self.check_write_pressure().await?;
2495 self.check_transaction_memory(tx_l0)?;
2496 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2497 self.process_embeddings_for_labels(labels, &mut props)
2498 .await?;
2499 }
2500 // Full-row validation runs because we have the complete map;
2501 // no need for the partial-only validator.
2502 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2503 .await?;
2504 {
2505 let l0 = self.resolve_l0(tx_l0);
2506 let mut l0_guard = l0.write();
2507 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2508 }
2509 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2510 metrics::counter!("uni_partial_writes_total").increment(1);
2511 self.update_metrics();
2512 if tx_l0.is_none() {
2513 self.check_flush().await?;
2514 }
2515 Ok(())
2516 }
2517
2518 /// Insert a vertex's *partial* property set without first reading the
2519 /// full row.
2520 ///
2521 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2522 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2523 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2524 /// schema source — preserving untouched columns (e.g., embeddings)
2525 /// byte-equal in Lance with no read at the caller and no write of
2526 /// those columns.
2527 ///
2528 /// When the flag is `false`, this falls back to the existing
2529 /// `insert_vertex_with_labels` path after merging `touched` with
2530 /// the current properties from L0/storage. The caller can therefore
2531 /// use this entry point unconditionally; the optimization activates
2532 /// only when the flag is on.
2533 #[instrument(skip(self, touched, labels), level = "trace")]
2534 pub async fn insert_vertex_partial(
2535 &self,
2536 vid: Vid,
2537 touched: Properties,
2538 labels: &[String],
2539 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2540 ) -> Result<()> {
2541 let needs_full_read =
2542 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2543 if needs_full_read {
2544 // Flag-off fallback (or constraint-driven fallback): merge
2545 // `touched` with the current full property snapshot from
2546 // L0/storage and route through the existing path. Preserves
2547 // bit-for-bit equivalence with the pre-Round-11 release.
2548 let existing = if let Some(pm) = &self.property_manager {
2549 pm.get_all_vertex_props_with_ctx(vid, None)
2550 .await
2551 .unwrap_or_default()
2552 .unwrap_or_default()
2553 } else {
2554 Properties::new()
2555 };
2556 let mut merged = existing;
2557 for (k, v) in touched {
2558 merged.insert(k, v);
2559 }
2560 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2561 .await?;
2562 return Ok(());
2563 }
2564
2565 // Flag-on fast path: stage the partial update directly. Pressure
2566 // checks, embedding generation, constraint validation all still
2567 // run — but the validator is the partial-aware variant that
2568 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2569 // properties not present in `touched`. Multi-key UNIQUE that
2570 // overlaps the touched set forces a fallback above via
2571 // `touched_needs_full_read`.
2572 let mut touched = touched;
2573 self.check_write_pressure().await?;
2574 self.check_transaction_memory(tx_l0)?;
2575 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2576 self.process_embeddings_for_labels(labels, &mut touched)
2577 .await?;
2578 }
2579 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2580 .await?;
2581
2582 {
2583 let l0 = self.resolve_l0(tx_l0);
2584 let mut l0_guard = l0.write();
2585 l0_guard.insert_vertex_partial(vid, touched, labels);
2586 }
2587
2588 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2589 metrics::counter!("uni_partial_writes_total").increment(1);
2590 self.update_metrics();
2591 if tx_l0.is_none() {
2592 self.check_flush().await?;
2593 }
2594 Ok(())
2595 }
2596
2597 /// Insert multiple vertices with batched operations.
2598 ///
2599 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2600 /// for bulk inserts with unique constraints.
2601 ///
2602 /// # Performance Improvements
2603 /// - Batch VID allocation: 1 call instead of N calls
2604 /// - Batch constraint validation: O(N) instead of O(N²)
2605 /// - Batch embedding generation: 1 API call per config instead of N calls
2606 /// - Transaction wrapping: Automatic flush deferral, atomicity
2607 ///
2608 /// # Arguments
2609 /// * `vids` - Pre-allocated VIDs for the vertices
2610 /// * `properties_batch` - Properties for each vertex
2611 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2612 ///
2613 /// # Errors
2614 /// Returns error if:
2615 /// - VID/properties length mismatch
2616 /// - Constraint violation detected
2617 /// - Embedding generation fails
2618 /// - Transaction commit fails
2619 ///
2620 /// # Atomicity
2621 /// If this method fails, all changes are rolled back (if transaction was started here).
2622 pub async fn insert_vertices_batch(
2623 &self,
2624 vids: Vec<Vid>,
2625 mut properties_batch: Vec<Properties>,
2626 labels: Vec<String>,
2627 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2628 ) -> Result<Vec<Properties>> {
2629 let start = std::time::Instant::now();
2630
2631 // Validate inputs
2632 if vids.len() != properties_batch.len() {
2633 return Err(anyhow!(
2634 "VID/properties size mismatch: {} vids, {} properties",
2635 vids.len(),
2636 properties_batch.len()
2637 ));
2638 }
2639
2640 if vids.is_empty() {
2641 return Ok(Vec::new());
2642 }
2643
2644 // Batch operations — writes go directly to the resolved L0.
2645 // Atomicity is guaranteed by the caller holding the writer lock.
2646 let result = async {
2647 self.check_write_pressure().await?;
2648 self.check_transaction_memory(tx_l0)?;
2649
2650 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2651 // freeze the pinned generation aside before merging so snapshot
2652 // readers stay isolated. No-op when unpinned or transactional.
2653 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2654
2655 // Batch embedding generation (1 API call per config)
2656 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2657 .await?;
2658
2659 // Batch constraint validation (O(N) instead of O(N²))
2660 let label = labels
2661 .first()
2662 .ok_or_else(|| anyhow!("No labels provided"))?;
2663 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2664 .await?;
2665
2666 // Batch prepare (CRDT merging if needed)
2667 // Check schema once: skip entirely if no CRDT properties for this label.
2668 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2669 // values to merge, so the per-vertex lookup is unnecessary in that case.
2670 let has_crdt_fields = {
2671 let schema = self.schema_manager.schema();
2672 schema
2673 .properties
2674 .get(label.as_str())
2675 .is_some_and(|props_meta| {
2676 props_meta.values().any(|meta| {
2677 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2678 })
2679 })
2680 };
2681
2682 if has_crdt_fields {
2683 // Layer-1 variant enforcement (G3): the batch path must reject a
2684 // declared-CRDT variant mismatch exactly as the single-vertex
2685 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2686 // written via batch import slips past write-time validation and
2687 // the OCC carve-out then masks the overwrite as a lost update.
2688 {
2689 let schema = self.schema_manager.schema();
2690 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2691 for props in &properties_batch {
2692 Self::enforce_crdt_variants(props_meta, props)?;
2693 }
2694 }
2695 }
2696
2697 // Batch fetch existing CRDT values: collect VIDs that need merging,
2698 // then query once via PropertyManager instead of per-vertex lookups.
2699 let schema = self.schema_manager.schema();
2700 let crdt_keys: Vec<String> = schema
2701 .properties
2702 .get(label.as_str())
2703 .map(|props_meta| {
2704 props_meta
2705 .iter()
2706 .filter(|(_, meta)| {
2707 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2708 })
2709 .map(|(key, _)| key.clone())
2710 .collect()
2711 })
2712 .unwrap_or_default();
2713
2714 if let Some(pm) = &self.property_manager {
2715 let ctx = self.get_query_context(tx_l0).await;
2716 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2717 for key in &crdt_keys {
2718 if props.contains_key(key) {
2719 let existing =
2720 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2721 if !existing.is_null()
2722 && let Some(val) = props.get_mut(key)
2723 {
2724 *val = pm.merge_crdt_values(&existing, val)?;
2725 }
2726 }
2727 }
2728 }
2729 }
2730 }
2731
2732 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2733 let target_l0 = self.resolve_l0(tx_l0);
2734
2735 let properties_result = properties_batch.clone();
2736 {
2737 let mut l0_guard = target_l0.write();
2738 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2739 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2740 }
2741 }
2742
2743 // Update metrics (batch increment)
2744 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2745 self.update_metrics();
2746
2747 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2748 }
2749 .await;
2750
2751 let props = result?;
2752
2753 if start.elapsed().as_millis() > 100 {
2754 log::warn!(
2755 "Slow insert_vertices_batch ({} vertices): {}ms",
2756 vids.len(),
2757 start.elapsed().as_millis()
2758 );
2759 }
2760
2761 Ok(props)
2762 }
2763
    /// Delete a vertex by VID by writing a tombstone into the appropriate L0.
    ///
    /// The tombstone can only be routed to the right per-label datasets at
    /// flush time if the L0 knows the vertex's labels. Labels are resolved in
    /// this order:
    /// 1. labels already recorded for `vid` in the L0 returned by `resolve_l0`
    ///    (nothing to backfill);
    /// 2. the caller-supplied `labels`, if any (no lookup at all);
    /// 3. pending-flush L0 buffers, then the flushed main-vertices table
    ///    (the storage scan can be slow when deleting many vertices).
    ///
    /// Non-transactional deletes (`tx_l0 == None`) re-resolve the live L0 under
    /// `flush_lock` before writing, then run [`Writer::check_flush`]. A warning
    /// is logged when the whole call takes more than 100 ms.
    ///
    /// # Errors
    ///
    /// Returns an error if the write-pressure or transaction-memory check
    /// fails, the storage label lookup fails, the L0 delete fails, or the
    /// follow-up auto-flush fails.
    #[instrument(skip(self, labels), level = "trace")]
    pub async fn delete_vertex(
        &self,
        vid: Vid,
        labels: Option<Vec<String>>,
        tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
    ) -> Result<()> {
        let start = std::time::Instant::now();
        self.check_write_pressure().await?;
        self.check_transaction_memory(tx_l0)?;
        self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)

        // Before deleting, ensure we have the vertex's labels stored in L0 so
        // the tombstone can be flushed to the correct label datasets. Discover
        // them up front (this may await storage) WITHOUT pinning the buffer we
        // will eventually mutate — for non-tx writes the live `current` buffer
        // is re-resolved below under `flush_lock`, so a concurrent rotate can't
        // drop our write (Bug #4). `resolve_l0` here is only used for cheap
        // reads that tolerate a racing rotate.
        //
        // NOTE(review): for non-tx deletes `has_labels` reflects whichever
        // buffer is current *now*. If a rotate lands before the locked write
        // below, the live buffer may lack labels for `vid` while
        // `backfill_labels` is `None` — confirm whether that case needs a
        // re-check under `flush_lock`.
        let has_labels = {
            let l0_guard = self.resolve_l0(tx_l0);
            let guard = l0_guard.read();
            guard.vertex_labels.contains_key(&vid)
        };

        // `Some(labels)` only when the target buffer must be backfilled.
        let backfill_labels = if has_labels {
            None
        } else if let Some(provided) = labels {
            // Caller provided labels — skip the lookup entirely
            Some(provided)
        } else {
            // Discover labels from pending flush L0s, then storage
            let mut found = None;
            for pending_l0 in self.l0_manager.get_pending_flush() {
                let pending_guard = pending_l0.read();
                if let Some(l) = pending_guard.get_vertex_labels(vid) {
                    found = Some(l.to_vec());
                    break;
                }
            }
            if found.is_none() {
                // Not in memory: fall back to the flushed main-vertices table
                // (returns `None` if the latest stored row is a tombstone).
                found = self.find_vertex_labels_in_storage(vid).await?;
            }
            found
        };

        // Test-only seam (no-op without the `failpoints` feature): pause a
        // non-transactional delete AFTER the awaited label discovery but BEFORE
        // it re-resolves the live buffer and writes the tombstone. A concurrent
        // flush can rotate+complete a buffer in this window; the fix re-resolves
        // `get_current()` and mutates it under `flush_lock`, so the tombstone
        // always lands in the live buffer (Bug #4 — silent lost delete across
        // L0 rotation).
        fail::fail_point!("nontx::after-capture");

        // Apply the label backfill and the tombstone together. For a non-tx
        // write, hold `flush_lock` across the (synchronous) re-resolve + write
        // so a concurrent flush rotate (which takes `flush_lock` via
        // `begin_flush`) cannot install a fresh `current` between our resolve
        // and our write. For a tx write `resolve_l0` returns the tx-private
        // buffer (never the rotating `current`), so no `flush_lock` is needed
        // — and taking it there would risk re-entrancy with the commit path.
        if tx_l0.is_none() {
            let _flush_lock_guard = self.flush_lock.lock().await;
            let l0 = self.l0_manager.get_current();
            let mut guard = l0.write();
            if let Some(found_labels) = backfill_labels {
                guard.vertex_labels.insert(vid, found_labels);
            }
            guard.delete_vertex(vid)?;
        } else {
            let l0 = self.resolve_l0(tx_l0);
            let mut guard = l0.write();
            if let Some(found_labels) = backfill_labels {
                guard.vertex_labels.insert(vid, found_labels);
            }
            guard.delete_vertex(vid)?;
        }
        metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
        self.update_metrics();

        // Transactional deletes defer flushing to commit.
        if tx_l0.is_none() {
            self.check_flush().await?;
        }
        if start.elapsed().as_millis() > 100 {
            log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
        }
        Ok(())
    }
2863
2864 /// Find vertex labels from storage by querying the main vertices table.
2865 /// Returns the labels from the latest non-deleted version of the vertex.
2866 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2867 use crate::backend::types::ScanRequest;
2868 use arrow_array::Array;
2869 use arrow_array::cast::AsArray;
2870
2871 let backend = self.storage.backend();
2872 let table_name = MainVertexDataset::table_name();
2873
2874 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2875 if !backend.table_exists(table_name).await? {
2876 return Ok(None);
2877 }
2878
2879 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2880 let filter = format!("_vid = {}", vid.as_u64());
2881 let batches = backend
2882 .scan(
2883 ScanRequest::all(table_name)
2884 .with_filter(filter)
2885 .with_columns(vec![
2886 "_vid".to_string(),
2887 "labels".to_string(),
2888 "_version".to_string(),
2889 "_deleted".to_string(),
2890 ]),
2891 )
2892 .await
2893 .unwrap_or_default();
2894
2895 // Find the row with the highest version number
2896 let mut max_version: Option<u64> = None;
2897 let mut labels: Option<Vec<String>> = None;
2898 let mut is_deleted = false;
2899
2900 for batch in batches {
2901 if batch.num_rows() == 0 {
2902 continue;
2903 }
2904
2905 let version_array = batch
2906 .column_by_name("_version")
2907 .unwrap()
2908 .as_primitive::<arrow_array::types::UInt64Type>();
2909
2910 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2911
2912 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2913
2914 for row_idx in 0..batch.num_rows() {
2915 let version = version_array.value(row_idx);
2916
2917 if max_version.is_none_or(|mv| version > mv) {
2918 is_deleted = deleted_array.value(row_idx);
2919
2920 let labels_list = labels_array.value(row_idx);
2921 let string_array = labels_list.as_string::<i32>();
2922 let vertex_labels: Vec<String> = (0..string_array.len())
2923 .filter(|&i| !string_array.is_null(i))
2924 .map(|i| string_array.value(i).to_string())
2925 .collect();
2926
2927 max_version = Some(version);
2928 labels = Some(vertex_labels);
2929 }
2930 }
2931 }
2932
2933 // If the latest version is deleted, return None
2934 if is_deleted { Ok(None) } else { Ok(labels) }
2935 }
2936
2937 #[expect(clippy::too_many_arguments)]
2938 #[instrument(skip(self, props, touched_keys), level = "trace")]
2939 pub async fn insert_edge_partial_full(
2940 &self,
2941 src_vid: Vid,
2942 dst_vid: Vid,
2943 edge_type: u32,
2944 eid: Eid,
2945 props: Properties,
2946 edge_type_name: Option<String>,
2947 touched_keys: HashSet<String>,
2948 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2949 ) -> Result<()> {
2950 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2951 if !self.config.partial_lance_writes {
2952 return self
2953 .insert_edge(
2954 src_vid,
2955 dst_vid,
2956 edge_type,
2957 eid,
2958 props,
2959 edge_type_name,
2960 tx_l0,
2961 )
2962 .await;
2963 }
2964
2965 let start = std::time::Instant::now();
2966 self.check_write_pressure().await?;
2967 self.check_transaction_memory(tx_l0)?;
2968 let mut props = props;
2969 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2970
2971 let l0 = self.resolve_l0(tx_l0);
2972 l0.write().insert_edge_partial_full(
2973 src_vid,
2974 dst_vid,
2975 edge_type,
2976 eid,
2977 props,
2978 edge_type_name,
2979 touched_keys,
2980 )?;
2981
2982 if tx_l0.is_none() {
2983 let version = l0.read().current_version;
2984 self.adjacency_manager
2985 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2986 }
2987
2988 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2989 metrics::counter!("uni_partial_writes_total").increment(1);
2990 self.update_metrics();
2991 if tx_l0.is_none() {
2992 self.check_flush().await?;
2993 }
2994 if start.elapsed().as_millis() > 100 {
2995 log::warn!(
2996 "Slow insert_edge_partial_full: {}ms",
2997 start.elapsed().as_millis()
2998 );
2999 }
3000 Ok(())
3001 }
3002
3003 #[expect(clippy::too_many_arguments)]
3004 pub async fn insert_edge(
3005 &self,
3006 src_vid: Vid,
3007 dst_vid: Vid,
3008 edge_type: u32,
3009 eid: Eid,
3010 mut properties: Properties,
3011 edge_type_name: Option<String>,
3012 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3013 ) -> Result<()> {
3014 let start = std::time::Instant::now();
3015 self.check_write_pressure().await?;
3016 self.check_transaction_memory(tx_l0)?;
3017 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3018 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
3019 .await?;
3020
3021 let l0 = self.resolve_l0(tx_l0);
3022 l0.write()
3023 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
3024
3025 // Dual-write to AdjacencyManager overlay (survives flush).
3026 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
3027 if tx_l0.is_none() {
3028 let version = l0.read().current_version;
3029 self.adjacency_manager
3030 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
3031 }
3032
3033 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3034 self.update_metrics();
3035
3036 if tx_l0.is_none() {
3037 self.check_flush().await?;
3038 }
3039 if start.elapsed().as_millis() > 100 {
3040 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3041 }
3042 Ok(())
3043 }
3044
3045 #[instrument(skip(self), level = "trace")]
3046 pub async fn delete_edge(
3047 &self,
3048 eid: Eid,
3049 src_vid: Vid,
3050 dst_vid: Vid,
3051 edge_type: u32,
3052 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3053 ) -> Result<()> {
3054 let start = std::time::Instant::now();
3055 self.check_write_pressure().await?;
3056 self.check_transaction_memory(tx_l0)?;
3057 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3058 let l0 = self.resolve_l0(tx_l0);
3059
3060 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3061
3062 // Dual-write tombstone to AdjacencyManager overlay.
3063 if tx_l0.is_none() {
3064 let version = l0.read().current_version;
3065 self.adjacency_manager
3066 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3067 }
3068 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3069 self.update_metrics();
3070
3071 if tx_l0.is_none() {
3072 self.check_flush().await?;
3073 }
3074 if start.elapsed().as_millis() > 100 {
3075 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3076 }
3077 Ok(())
3078 }
3079
3080 /// Decide whether a flush should be triggered based on mutation count
3081 /// or elapsed time since the last flush.
3082 ///
3083 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3084 /// reuse the decision while bypassing the lock-acquiring entry point
3085 /// (it already holds `flush_lock`).
3086 fn should_flush(&self) -> bool {
3087 let count = self.l0_manager.get_current().read().mutation_count;
3088 if count == 0 {
3089 return false;
3090 }
3091 if count >= self.config.auto_flush_threshold {
3092 return true;
3093 }
3094 if let Some(interval) = self.config.auto_flush_interval
3095 && self.last_flush_time.lock().elapsed() >= interval
3096 && count >= self.config.auto_flush_min_mutations
3097 {
3098 return true;
3099 }
3100 false
3101 }
3102
3103 /// Check if flush should be triggered based on mutation count or time elapsed.
3104 /// This method is called after each write operation and can also be called
3105 /// by a background task for time-based flushing.
3106 pub async fn check_flush(&self) -> Result<()> {
3107 if self.should_flush() {
3108 self.flush_to_l1(None).await?;
3109 }
3110 Ok(())
3111 }
3112
3113 /// Process embeddings for a vertex using labels passed directly.
3114 /// Use this when labels haven't been stored to L0 yet.
3115 async fn process_embeddings_for_labels(
3116 &self,
3117 labels: &[String],
3118 properties: &mut Properties,
3119 ) -> Result<()> {
3120 let label_name = labels.first().map(|s| s.as_str());
3121 self.process_embeddings_impl(label_name, properties).await
3122 }
3123
3124 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3125 /// vertex has an embedding config that hasn't been satisfied by the
3126 /// caller-provided properties, enqueue the VID in
3127 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3128 /// skips `process_embeddings_for_labels` and the embedding is computed
3129 /// in a single batched call at flush time via
3130 /// `drain_pending_embeddings`.
3131 ///
3132 /// Returns `false` (caller falls back to today's per-row eager embed)
3133 /// if any of:
3134 /// - the flag is off,
3135 /// - no label has an embedding config,
3136 /// - the user already provided the target property (matches the
3137 /// existing skip-if-present semantics at writer.rs:2727).
3138 ///
3139 /// Trade-off: when deferral is active, in-tx reads of the embedding
3140 /// column return only what was already in storage (or nothing for
3141 /// brand-new vertices). Existing tests that RETURN n.embedding in
3142 /// the same tx as a SET on the source column must run with the flag
3143 /// off; opt in only when no such reads happen between write and
3144 /// commit.
3145 fn try_defer_embedding(
3146 &self,
3147 labels: &[String],
3148 properties: &Properties,
3149 vid: Vid,
3150 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3151 ) -> bool {
3152 if !self.config.defer_embeddings {
3153 return false;
3154 }
3155 let Some(label) = labels.first() else {
3156 return false;
3157 };
3158
3159 let schema = self.schema_manager.schema();
3160 let mut has_unsatisfied_cfg = false;
3161 for idx in &schema.indexes {
3162 if let IndexDefinition::Vector(v_cfg) = idx
3163 && v_cfg.label == *label
3164 && v_cfg.embedding_config.is_some()
3165 && !properties.contains_key(&v_cfg.property)
3166 {
3167 has_unsatisfied_cfg = true;
3168 break;
3169 }
3170 }
3171 if !has_unsatisfied_cfg {
3172 return false;
3173 }
3174
3175 let l0 = self.resolve_l0(tx_l0);
3176 let mut guard = l0.write();
3177 guard.pending_embeddings.insert(vid, label.clone());
3178 true
3179 }
3180
3181 /// Drain `pending_embeddings` from the rotated old-L0 right before
3182 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3183 /// `process_embeddings_for_batch` call per label, and writes the
3184 /// resulting embedding vectors into each VID's `vertex_properties`
3185 /// map. After this returns, the flush proceeds against an L0 that
3186 /// looks no different from one whose embeddings were generated
3187 /// per-row at insert.
3188 ///
3189 /// Idempotent: a VID whose embedding was already materialized
3190 /// (e.g., by on-demand read paths in a future Phase B revision) is
3191 /// detected via `properties.contains_key(target_prop)` inside
3192 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3193 /// the drain is safe.
3194 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3195 let by_label: HashMap<String, Vec<Vid>> = {
3196 let guard = old_l0_arc.read();
3197 if guard.pending_embeddings.is_empty() {
3198 return Ok(());
3199 }
3200 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3201 for (vid, label) in &guard.pending_embeddings {
3202 m.entry(label.clone()).or_default().push(*vid);
3203 }
3204 m
3205 };
3206
3207 for (label, vids) in by_label {
3208 let mut properties_batch: Vec<Properties> = {
3209 let guard = old_l0_arc.read();
3210 vids.iter()
3211 .map(|vid| {
3212 guard
3213 .vertex_properties
3214 .get(vid)
3215 .cloned()
3216 .unwrap_or_default()
3217 })
3218 .collect()
3219 };
3220
3221 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3222 .await?;
3223
3224 let mut guard = old_l0_arc.write();
3225 for (vid, props) in vids.iter().zip(properties_batch) {
3226 let target = guard.vertex_properties.entry(*vid).or_default();
3227 for (k, v) in props {
3228 target.insert(k, v);
3229 }
3230 guard.pending_embeddings.remove(vid);
3231 }
3232 }
3233 Ok(())
3234 }
3235
3236 /// Process embeddings for a batch of vertices efficiently.
3237 ///
3238 /// Groups vertices by embedding config and makes batched API calls to the
3239 /// embedding service instead of calling once per vertex.
3240 ///
3241 /// # Performance
3242 /// For N vertices with embedding config:
3243 /// - Old approach: N API calls to embedding service
3244 /// - New approach: 1 API call per embedding config (usually 1 total)
3245 async fn process_embeddings_for_batch(
3246 &self,
3247 labels: &[String],
3248 properties_batch: &mut [Properties],
3249 ) -> Result<()> {
3250 let label_name = labels.first().map(|s| s.as_str());
3251 let schema = self.schema_manager.schema();
3252
3253 if let Some(label) = label_name {
3254 // Find vector indexes with embedding config for this label
3255 let mut configs = Vec::new();
3256 for idx in &schema.indexes {
3257 if let IndexDefinition::Vector(v_config) = idx
3258 && v_config.label == label
3259 && let Some(emb_config) = &v_config.embedding_config
3260 {
3261 configs.push((v_config.property.clone(), emb_config.clone()));
3262 }
3263 }
3264
3265 if configs.is_empty() {
3266 return Ok(());
3267 }
3268
3269 for (target_prop, emb_config) in configs {
3270 // Collect input texts from all vertices that need embeddings
3271 let mut input_texts: Vec<String> = Vec::new();
3272 let mut needs_embedding: Vec<usize> = Vec::new();
3273
3274 for (idx, properties) in properties_batch.iter().enumerate() {
3275 // Skip if target property already exists
3276 if properties.contains_key(&target_prop) {
3277 continue;
3278 }
3279
3280 // Check if source properties exist
3281 let mut inputs = Vec::new();
3282 for src_prop in &emb_config.source_properties {
3283 if let Some(val) = properties.get(src_prop)
3284 && let Some(s) = val.as_str()
3285 {
3286 inputs.push(s.to_string());
3287 }
3288 }
3289
3290 if !inputs.is_empty() {
3291 let input_text = inputs.join(" ");
3292 let input_text = match &emb_config.document_prefix {
3293 Some(prefix) => format!("{prefix}{input_text}"),
3294 None => input_text,
3295 };
3296 input_texts.push(input_text);
3297 needs_embedding.push(idx);
3298 }
3299 }
3300
3301 if input_texts.is_empty() {
3302 continue;
3303 }
3304
3305 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3306 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3307 })?;
3308 let embedder = runtime.embedding(&emb_config.alias).await?;
3309
3310 // Batch generate embeddings (single API call)
3311 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3312 let embeddings = embedder.embed(input_refs).await?;
3313
3314 // Distribute results back to properties
3315 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3316 if let Some(vec) = embeddings.get(embedding_idx) {
3317 let vals: Vec<Value> =
3318 vec.iter().map(|f| Value::Float(*f as f64)).collect();
3319 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3320 }
3321 }
3322 }
3323 }
3324
3325 Ok(())
3326 }
3327
3328 async fn process_embeddings_impl(
3329 &self,
3330 label_name: Option<&str>,
3331 properties: &mut Properties,
3332 ) -> Result<()> {
3333 let schema = self.schema_manager.schema();
3334
3335 if let Some(label) = label_name {
3336 // Find vector indexes with embedding config for this label
3337 let mut configs = Vec::new();
3338 for idx in &schema.indexes {
3339 if let IndexDefinition::Vector(v_config) = idx
3340 && v_config.label == label
3341 && let Some(emb_config) = &v_config.embedding_config
3342 {
3343 configs.push((v_config.property.clone(), emb_config.clone()));
3344 }
3345 }
3346
3347 if configs.is_empty() {
3348 log::info!("No embedding config found for label {}", label);
3349 }
3350
3351 for (target_prop, emb_config) in configs {
3352 // If target property already exists, skip (assume user provided it)
3353 if properties.contains_key(&target_prop) {
3354 continue;
3355 }
3356
3357 // Check if source properties exist
3358 let mut inputs = Vec::new();
3359 for src_prop in &emb_config.source_properties {
3360 if let Some(val) = properties.get(src_prop)
3361 && let Some(s) = val.as_str()
3362 {
3363 inputs.push(s.to_string());
3364 }
3365 }
3366
3367 if inputs.is_empty() {
3368 continue;
3369 }
3370
3371 let input_text = inputs.join(" ");
3372 let input_text = match &emb_config.document_prefix {
3373 Some(prefix) => format!("{prefix}{input_text}"),
3374 None => input_text,
3375 };
3376
3377 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3378 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3379 })?;
3380 let embedder = runtime.embedding(&emb_config.alias).await?;
3381
3382 // Generate
3383 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3384 if let Some(vec) = embeddings.first() {
3385 // Store as array of floats
3386 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3387 properties.insert(target_prop.clone(), Value::List(vals));
3388 }
3389 }
3390 }
3391 Ok(())
3392 }
3393
3394 /// Flushes the current in-memory L0 buffer to L1 storage.
3395 ///
3396 /// # Lock Ordering
3397 ///
3398 /// To prevent deadlocks, locks must be acquired in the following order:
3399 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3400 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3401 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3402 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3403 /// 5. `Index` / `Storage` locks (during actual flush)
3404 ///
3405 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3406 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3407 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3408 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3409 // Drain any in-flight async flushes first. `flush_to_l1` is a
3410 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3411 // setup, shutdown paths) rely on it as "all writes are now
3412 // durably in Lance". Without the drain, an async stream from
3413 // a recent commit might still be writing to Lance when
3414 // `flush_to_l1` returns, leaving a window where forks branch
3415 // off pre-write Lance state and lose data.
3416 if let Some(coord) = self.flush_coordinator.as_ref() {
3417 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3418 }
3419 let _flush_lock_guard = self.flush_lock.lock().await;
3420 self.flush_inline_under_lock(name).await
3421 }
3422
    /// Async-flush entry point: rotate under `flush_lock`, release the
    /// lock, then submit the stream phase to the [`FlushCoordinator`].
    /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
    /// that resolves when finalize completes.
    ///
    /// Sequence: acquire a coordinator permit (outside `flush_lock`), rotate
    /// and allocate the rotate seq under `flush_lock`, snapshot
    /// `cached_manifest` as the parent manifest, then hand a `RotatedFlush` to
    /// `submit_for_stream`. The spawned stream phase runs
    /// `flush_stream_l1` on a clone of `Arc<Self>`.
    ///
    /// # Errors
    ///
    /// Errors if `config.async_flush_enabled = false` (the coordinator
    /// is `None` in that case — see `flush_coordinator` field doc), if
    /// acquiring the permit fails, or if the rotate fails. Errors from the
    /// stream phase itself are reported through the returned ticket, not here.
    pub async fn flush_to_l1_async(
        self: &Arc<Self>,
        name: Option<String>,
    ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
        let coord = self
            .flush_coordinator
            .as_ref()
            .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
            .clone();
        // 1. Acquire permit FIRST (outside flush_lock) so we don't
        //    introduce a permit-while-holding-flush-lock convoy.
        let permit = coord.acquire_permit().await?;
        // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
        //    and bump pending ONLY after the rotate succeeds (Bug #3). A failed
        //    rotate (the `?` below) must consume neither: the finalizer
        //    advances in strictly consecutive seq order and only decrements
        //    pending on finalize, so a leaked seq/pending would wedge it
        //    forever. The seq is allocated under `flush_lock`, immediately
        //    after the rotate and before the guard drops, so concurrent rotates
        //    keep seq order == rotation order, and the seq is unused until
        //    submit. On the `?` error path the permit drops, freeing the slot.
        let (
            RotateOutput {
                old_l0_arc,
                wal_lsn,
                current_version,
                flush_in_progress_guard,
            },
            seq,
        ) = {
            let _flush_lock_guard = self.flush_lock.lock().await;
            let rotate_out = self.flush_l0_rotate().await?;
            let seq = coord.next_rotate_seq();
            coord.note_pending();
            (rotate_out, seq)
        };
        // 3. Build the coordinator's RotatedFlush. parent_manifest is the
        //    cached_manifest snapshot at this moment.
        //    NOTE(review): this snapshot is taken after `flush_lock` was
        //    released above; confirm that a concurrent finalize updating
        //    `cached_manifest` in between is acceptable for the parent link.
        let parent_manifest = self.cached_manifest.lock().clone();
        let rotated = RotatedFlush {
            seq,
            old_l0_arc: old_l0_arc.clone(),
            wal_lsn,
            current_version,
            name: name.clone(),
            parent_manifest,
            permit,
            // Moved into the RotatedFlush so the in-progress counter stays
            // raised until the coordinator drops it.
            flush_in_progress_guard,
        };
        // 4. Spawn the stream phase via the coordinator. The closure
        //    captures Arc<Writer> transiently — drops when stream
        //    completes (bounded, ~50-500 ms).
        let writer = self.clone();
        let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
            let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
            Ok(crate::runtime::flush_coordinator::FlushOutcome {
                new_manifest: outcome.manifest,
                snapshot_id: outcome.snapshot_id,
            })
        });
        Ok(ticket)
    }
3492
    /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
    /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
    /// its place), and hand off the WAL to the new L0.
    ///
    /// Runs in microseconds apart from the WAL flush. Must be called under
    /// `flush_lock` (the caller is responsible). The returned [`RotateOutput`]
    /// carries everything the subsequent stream + finalize phases need; in
    /// particular the [`FlushInProgressGuard`] is bound to the return value so
    /// it stays alive for the full flush lifetime — including the async path,
    /// where stream runs on a spawned task.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL flush fails (or the `flush::rotate-fail`
    /// failpoint is armed). In that case no rotation has happened: the current
    /// L0 stays active and the in-progress guard is dropped.
    async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
        // Acquire the in-progress counter BEFORE any heavy work. The
        // guard lives on RotateOutput; dropping RotateOutput drops the
        // guard, so the counter goes back to zero exactly when the flush
        // is fully done.
        let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);

        // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
        //    current L0 is still active and mutations are retained in
        //    memory until restart/retry.
        //    The WAL handle is cloned out so the L0 read lock is released
        //    before the await below.
        let wal_for_truncate = {
            let current_l0 = self.l0_manager.get_current();
            let l0_guard = current_l0.read();
            l0_guard.wal.clone()
        };
        // Test-only seam (no-op without the `failpoints` feature): inject a
        // WAL-flush failure here to drive the "failed async rotate wedges the
        // finalizer" regression (Bug #3). When configured to "return" it makes
        // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
        fail::fail_point!("flush::rotate-fail", |_| {
            Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
        });
        // No WAL attached to the buffer: record LSN 0.
        let wal_lsn = if let Some(ref w) = wal_for_truncate {
            w.flush().await?
        } else {
            0
        };

        // B. Begin flush: rotate L0 and keep old L0 visible to reads via
        //    pending_flush until complete_flush is called by finalize.
        let old_l0_arc = self.l0_manager.begin_flush(0, None);
        metrics::counter!("uni_l0_buffer_rotations_total").increment(1);

        // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
        //    and current_version to the new L0.
        //    Locks are taken old-then-new; both are released at the end of
        //    this block.
        let current_version;
        {
            let mut old_l0_guard = old_l0_arc.write();
            current_version = old_l0_guard.current_version;
            old_l0_guard.wal_lsn_at_flush = wal_lsn;
            let wal = old_l0_guard.wal.take();
            let new_l0_arc = self.l0_manager.get_current();
            let mut new_l0_guard = new_l0_arc.write();
            new_l0_guard.wal = wal;
            new_l0_guard.current_version = current_version;
        }

        Ok(RotateOutput {
            old_l0_arc,
            wal_lsn,
            current_version,
            flush_in_progress_guard,
        })
    }
3557
3558 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3559 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3560 /// pending_flush by Phase B); writes append-only Lance datasets; does
3561 /// NOT call save_snapshot / set_latest_snapshot — those are
3562 /// finalize's job, so the manifest doesn't get published until the
3563 /// next phase.
3564 ///
3565 /// Today takes `&self`; in a follow-up commit this becomes a
3566 /// static `Send + 'static` function over `SharedFlushCtx` so it can
3567 /// run on a spawned task while concurrent commits proceed.
3568 async fn flush_stream_l1(
3569 &self,
3570 old_l0_arc: Arc<RwLock<L0Buffer>>,
3571 wal_lsn: u64,
3572 current_version: u64,
3573 name: Option<String>,
3574 ) -> Result<FlushOutcome> {
3575 // Test-only seam (no-op without the `failpoints` feature): the rotate
3576 // (begin_flush) already moved the to-be-flushed buffer onto
3577 // pending_flush and installed a fresh empty current buffer, but the
3578 // rotated rows are NOT yet durable in Lance. Pausing here holds that
3579 // window open to drive the unique-constraint-hole regression
3580 // (Bug #9 Mechanism A).
3581 fail::fail_point!("flush::after-rotate-before-lance");
3582
3583 // Phase B: materialize any deferred embeddings before column
3584 // extraction. No-op when `defer_embeddings` is off (the set will
3585 // be empty). On-demand reads of the embedding column are a TODO
3586 // for a future revision (see UniConfig::defer_embeddings docs).
3587 self.drain_pending_embeddings(&old_l0_arc).await?;
3588
3589 let schema = self.schema_manager.schema();
3590 // 2. Acquire Read lock on Old L0 for flushing
3591 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3592 // (Vid, labels, properties, deleted, version)
3593 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3594 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3595 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3596 // (vid, full L0 properties map, version, set of keys to update).
3597 // Only the keys in the HashSet are emitted to the partial source;
3598 // the full props map is retained so the per-row column extractor
3599 // can read each touched key's value.
3600 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3601 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3602 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3603 // partial source with just `_vid`, `_deleted=true`, `_version`,
3604 // `_updated_at`. Skips the wide-row Append payload that adds
3605 // nothing on a soft-delete.
3606 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3607 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3608 // Collect vertex timestamps from L0 for flushing to storage
3609 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3610 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3611 // Track tombstones missing labels for storage query fallback
3612 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3613
3614 {
3615 let old_l0 = old_l0_arc.read();
3616
3617 // 1. Collect all edges and tombstones from L0
3618 for edge in old_l0.graph.edges() {
3619 let properties = old_l0
3620 .edge_properties
3621 .get(&edge.eid)
3622 .cloned()
3623 .unwrap_or_default();
3624 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3625
3626 // Get timestamps from L0 buffer (populated during insert)
3627 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3628 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3629
3630 entries_by_type
3631 .entry(edge.edge_type)
3632 .or_default()
3633 .push(L1Entry {
3634 src_vid: edge.src_vid,
3635 dst_vid: edge.dst_vid,
3636 eid: edge.eid,
3637 op: Op::Insert,
3638 version,
3639 properties,
3640 created_at,
3641 updated_at,
3642 });
3643 }
3644
3645 // From tombstones
3646 for tombstone in old_l0.tombstones.values() {
3647 let version = old_l0
3648 .edge_versions
3649 .get(&tombstone.eid)
3650 .copied()
3651 .unwrap_or(0);
3652 // Get timestamps - for deletes, updated_at reflects deletion time
3653 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3654 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3655
3656 entries_by_type
3657 .entry(tombstone.edge_type)
3658 .or_default()
3659 .push(L1Entry {
3660 src_vid: tombstone.src_vid,
3661 dst_vid: tombstone.dst_vid,
3662 eid: tombstone.eid,
3663 op: Op::Delete,
3664 version,
3665 properties: HashMap::new(),
3666 created_at,
3667 updated_at,
3668 });
3669 }
3670
3671 // 1b. Collect vertices by label (using vertex_labels from L0)
3672 //
3673 // Helper: fan-out a single vertex entry into per-label buckets.
3674 // Each per-label table row carries the full label set so multi-label
3675 // info is preserved after flush.
3676 let push_vertex_to_labels =
3677 |vid: Vid,
3678 all_labels: &[String],
3679 props: Properties,
3680 deleted: bool,
3681 version: u64,
3682 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3683 for label in all_labels {
3684 if let Some(label_id) = schema.label_id_by_name(label) {
3685 out.entry(label_id).or_default().push((
3686 vid,
3687 all_labels.to_vec(),
3688 props.clone(),
3689 deleted,
3690 version,
3691 ));
3692 }
3693 }
3694 };
3695
3696 for (vid, props) in &old_l0.vertex_properties {
3697 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3698 // Collect timestamps for this vertex
3699 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3700 vertex_created_at.insert(*vid, ts);
3701 }
3702 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3703 vertex_updated_at.insert(*vid, ts);
3704 }
3705 if let Some(labels) = old_l0.vertex_labels.get(vid) {
3706 // Partial-write routing: when this VID was last
3707 // touched via `insert_vertex_partial` AND the
3708 // partial_lance_writes flag is on, send only the
3709 // touched columns to a MergeInsert batch. Otherwise
3710 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3711 // — or flag off) use the existing full-row Append.
3712 let is_partial = self.config.partial_lance_writes
3713 && old_l0.vertex_partial_keys.contains_key(vid);
3714 if is_partial {
3715 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3716 for label in labels {
3717 if let Some(label_id) = schema.label_id_by_name(label) {
3718 partial_by_label.entry(label_id).or_default().push((
3719 *vid,
3720 props.clone(),
3721 version,
3722 touched.clone(),
3723 ));
3724 }
3725 }
3726 }
3727 } else {
3728 push_vertex_to_labels(
3729 *vid,
3730 labels,
3731 props.clone(),
3732 false,
3733 version,
3734 &mut vertices_by_label,
3735 );
3736 }
3737 }
3738 }
3739 for &vid in &old_l0.vertex_tombstones {
3740 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3741 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3742 vertex_updated_at.insert(vid, ts);
3743 }
3744 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3745 // Round-12 §B: tombstones flush via Lance MergeInsert
3746 // (just `_vid`, `_deleted=true`, `_version`,
3747 // `_updated_at`) — skipping the wide-row Append.
3748 // Unconditional (no `partial_lance_writes` gating);
3749 // tombstone Append carries no useful payload.
3750 for label in labels {
3751 if let Some(label_id) = schema.label_id_by_name(label) {
3752 tombstones_by_label
3753 .entry(label_id)
3754 .or_default()
3755 .push((vid, version));
3756 }
3757 }
3758 } else {
3759 // Tombstone missing labels (old WAL format) - collect for storage query fallback
3760 orphaned_tombstones.push((vid, version));
3761 }
3762 }
3763 } // Drop read lock
3764
3765 // Resolve orphaned tombstones (missing labels) from storage
3766 if !orphaned_tombstones.is_empty() {
3767 tracing::warn!(
3768 count = orphaned_tombstones.len(),
3769 "Tombstones missing labels in L0, querying storage as fallback"
3770 );
3771 for (vid, version) in orphaned_tombstones {
3772 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3773 && !labels.is_empty()
3774 {
3775 for label in &labels {
3776 if let Some(label_id) = schema.label_id_by_name(label) {
3777 // Round-12 §B: route through partial tombstone too.
3778 tombstones_by_label
3779 .entry(label_id)
3780 .or_default()
3781 .push((vid, version));
3782 }
3783 }
3784 }
3785 }
3786 }
3787
3788 // 1. Load previous snapshot from cache, or fall back to storage.
3789 //
3790 // Use clone() not take(): for the async path, multiple
3791 // concurrent streams may run; if we take() here, a sibling
3792 // stream sees cached_manifest = None and seeds from
3793 // load_latest_snapshot (stale), losing the chain. clone()
3794 // preserves the parent. Finalize writes back the new manifest
3795 // unconditionally.
3796 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3797 cached
3798 } else {
3799 self.storage
3800 .snapshot_manager()
3801 .load_latest_snapshot()
3802 .await?
3803 .unwrap_or_else(|| {
3804 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3805 })
3806 };
3807
3808 // Update snapshot metadata
3809 // Save parent snapshot ID before generating new one (for lineage tracking)
3810 let parent_id = manifest.snapshot_id.clone();
3811 manifest.parent_snapshot = Some(parent_id);
3812 manifest.snapshot_id = Uuid::new_v4().to_string();
3813 manifest.name = name;
3814 manifest.created_at = Utc::now();
3815 manifest.version_high_water_mark = current_version;
3816 manifest.wal_high_water_mark = wal_lsn;
3817 let snapshot_id = manifest.snapshot_id.clone();
3818
3819 tracing::Span::current().record("snapshot_id", &snapshot_id);
3820
3821 // 2. Write main unified tables FIRST (before deltas).
3822 // Ensures the dual-write invariant: by the time an EID appears in a
3823 // delta table, it already exists in main_edges. This prevents the
3824 // compaction debug_assert from firing when compaction interleaves
3825 // with flush at async yield points.
3826 //
3827 // 2.1 Main edges table
3828 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3829 let _old_l0 = old_l0_arc.read();
3830 let mut main_edges: Vec<(
3831 uni_common::core::id::Eid,
3832 Vid,
3833 Vid,
3834 String,
3835 Properties,
3836 bool,
3837 u64,
3838 )> = Vec::new();
3839 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3840 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3841
3842 for (&edge_type_id, entries) in entries_by_type.iter() {
3843 for entry in entries {
3844 let edge_type_name = self
3845 .storage
3846 .schema_manager()
3847 .edge_type_name_by_id_unified(edge_type_id)
3848 .unwrap_or_else(|| "unknown".to_string());
3849
3850 let deleted = matches!(entry.op, Op::Delete);
3851 main_edges.push((
3852 entry.eid,
3853 entry.src_vid,
3854 entry.dst_vid,
3855 edge_type_name,
3856 entry.properties.clone(),
3857 deleted,
3858 entry.version,
3859 ));
3860
3861 if let Some(ts) = entry.created_at {
3862 edge_created_at_map.insert(entry.eid, ts);
3863 }
3864 if let Some(ts) = entry.updated_at {
3865 edge_updated_at_map.insert(entry.eid, ts);
3866 }
3867 }
3868 }
3869
3870 (main_edges, edge_created_at_map, edge_updated_at_map)
3871 };
3872
3873 if !main_edges.is_empty() {
3874 let main_edge_batch = MainEdgeDataset::build_record_batch(
3875 &main_edges,
3876 Some(&edge_created_at_map),
3877 Some(&edge_updated_at_map),
3878 )?;
3879 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3880 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3881 }
3882
3883 // 2.2 Main vertices table
3884 let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3885 let old_l0 = old_l0_arc.read();
3886 let mut vertices = Vec::new();
3887
3888 // Live vertices: full-row Append on the main table (the
3889 // props_json blob is required for global ID lookups). For
3890 // partial-row VIDs (vertex_partial_keys non-empty), the
3891 // main table still needs the full props for the
3892 // ext_id-uniqueness path; we keep the Append here. The
3893 // per-label Lance write IS partial via MergeInsert.
3894 for (vid, props) in &old_l0.vertex_properties {
3895 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3896 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3897 vertices.push((*vid, labels, props.clone(), false, version));
3898 }
3899
3900 // Tombstones: collected into `main_vertex_tombstones` for
3901 // the MergeInsert path below; skipping the wide-row Append.
3902 for &vid in &old_l0.vertex_tombstones {
3903 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3904 main_vertex_tombstones.push((vid, version));
3905 }
3906
3907 vertices
3908 };
3909
3910 if !main_vertices.is_empty() {
3911 let main_vertex_batch = MainVertexDataset::build_record_batch(
3912 &main_vertices,
3913 Some(&vertex_created_at),
3914 Some(&vertex_updated_at),
3915 )?;
3916 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3917 }
3918 // Round-12 §B: tombstones via MergeInsert on the main vertices
3919 // table. Independent of `vertex_properties` length.
3920 if !main_vertex_tombstones.is_empty() {
3921 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3922 &main_vertex_tombstones,
3923 Some(&vertex_updated_at),
3924 )?;
3925 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3926 .await?;
3927 }
3928 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3929 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3930 }
3931
3932 // 3. For each edge type, write FWD and BWD delta runs
3933 for (&edge_type_id, entries) in entries_by_type.iter() {
3934 // Get edge type name from unified lookup (handles both schema'd and schemaless)
3935 let edge_type_name = self
3936 .storage
3937 .schema_manager()
3938 .edge_type_name_by_id_unified(edge_type_id)
3939 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3940
3941 // FWD Run (sorted by src_vid)
3942 // Round-12 §A: split entries into full-row Append and
3943 // partial MergeInsert routes based on `edge_partial_keys`.
3944 // Edges in `edge_partial_keys` were last written via
3945 // `insert_edge_partial_full`; the per-edge-type delta
3946 // tables receive only the touched schema columns plus
3947 // (when any overflow key was touched) the regenerated
3948 // `overflow_json` blob. Untouched columns retain their
3949 // previous-version value via Lance MergeInsert.
3950 let partial_eids: std::collections::HashSet<Eid> = {
3951 let old_l0 = old_l0_arc.read();
3952 entries
3953 .iter()
3954 .filter(|e| {
3955 self.config.partial_lance_writes
3956 && old_l0.edge_partial_keys.contains_key(&e.eid)
3957 })
3958 .map(|e| e.eid)
3959 .collect()
3960 };
3961 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3962 let old_l0 = old_l0_arc.read();
3963 partial_eids
3964 .iter()
3965 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3966 .collect()
3967 };
3968 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3969 .clone()
3970 .into_iter()
3971 .partition(|e| !partial_eids.contains(&e.eid));
3972
3973 let backend = self.storage.backend();
3974
3975 // FWD run (sorted by src_vid)
3976 let mut fwd_full = full_entries.clone();
3977 fwd_full.sort_by_key(|e| e.src_vid);
3978 let mut fwd_partial = partial_entries.clone();
3979 fwd_partial.sort_by_key(|e| e.src_vid);
3980 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3981 if !fwd_full.is_empty() {
3982 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3983 fwd_ds.write_run(backend, fwd_batch).await?;
3984 }
3985 if !fwd_partial.is_empty() {
3986 let touched_union: std::collections::HashSet<String> = fwd_partial
3987 .iter()
3988 .flat_map(|e| {
3989 touched_union_by_eid
3990 .get(&e.eid)
3991 .cloned()
3992 .unwrap_or_default()
3993 .into_iter()
3994 })
3995 .collect();
3996 let fwd_partial_batch =
3997 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3998 fwd_ds
3999 .merge_insert_partial_run(backend, fwd_partial_batch)
4000 .await?;
4001 }
4002 fwd_ds.ensure_eid_index(backend).await?;
4003
4004 // BWD Run (sorted by dst_vid)
4005 let mut bwd_full = full_entries.clone();
4006 bwd_full.sort_by_key(|e| e.dst_vid);
4007 let mut bwd_partial = partial_entries.clone();
4008 bwd_partial.sort_by_key(|e| e.dst_vid);
4009 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
4010 if !bwd_full.is_empty() {
4011 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
4012 bwd_ds.write_run(backend, bwd_batch).await?;
4013 }
4014 if !bwd_partial.is_empty() {
4015 let touched_union: std::collections::HashSet<String> = bwd_partial
4016 .iter()
4017 .flat_map(|e| {
4018 touched_union_by_eid
4019 .get(&e.eid)
4020 .cloned()
4021 .unwrap_or_default()
4022 .into_iter()
4023 })
4024 .collect();
4025 let bwd_partial_batch =
4026 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
4027 bwd_ds
4028 .merge_insert_partial_run(backend, bwd_partial_batch)
4029 .await?;
4030 }
4031 bwd_ds.ensure_eid_index(backend).await?;
4032
4033 // Update Manifest
4034 let current_snap =
4035 manifest
4036 .edges
4037 .entry(edge_type_name.to_string())
4038 .or_insert(EdgeSnapshot {
4039 version: 0,
4040 count: 0,
4041 lance_version: 0,
4042 });
4043 current_snap.version += 1;
4044 current_snap.count += entries.len() as u64;
4045 // LanceDB tables don't expose Lance version directly
4046 current_snap.lance_version = 0;
4047
4048 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4049 // already has these edges via dual-write in insert_edge/delete_edge.
4050 }
4051
4052 // 4. Per-label vertex table writes
4053 // Iterate all labels that have either full-row OR partial-write
4054 // data pending. A label may appear in only one of the two maps
4055 // (e.g., all updates on this label were partial-only).
4056 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4057 .keys()
4058 .chain(partial_by_label.keys())
4059 .chain(tombstones_by_label.keys())
4060 .copied()
4061 .collect();
4062 for label_id in all_label_ids {
4063 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4064 let label_name = schema
4065 .label_name_by_id(label_id)
4066 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4067
4068 let ds = self.storage.vertex_dataset(label_name)?;
4069
4070 // Collect inverted index updates before consuming vertices
4071 // Maps: cfg.property -> (added, removed)
4072 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4073 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4074
4075 for idx in &schema.indexes {
4076 if let IndexDefinition::Inverted(cfg) = idx
4077 && cfg.label == label_name
4078 {
4079 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4080 let mut removed: HashSet<Vid> = HashSet::new();
4081
4082 for (vid, _labels, props, deleted, _version) in &vertices {
4083 if *deleted {
4084 removed.insert(*vid);
4085 } else if let Some(prop_value) = props.get(&cfg.property) {
4086 // Extract terms from the property value (List<String>)
4087 if let Some(arr) = prop_value.as_array() {
4088 let terms: Vec<String> = arr
4089 .iter()
4090 .filter_map(|v| v.as_str().map(ToString::to_string))
4091 .collect();
4092 if !terms.is_empty() {
4093 added.insert(*vid, terms);
4094 }
4095 }
4096 }
4097 }
4098 // Round-12 §B: tombstones no longer in `vertices`;
4099 // pull them from `tombstones_by_label` for inverted
4100 // index removal.
4101 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4102 for (vid, _) in tomb_rows {
4103 removed.insert(*vid);
4104 }
4105 }
4106
4107 if !added.is_empty() || !removed.is_empty() {
4108 inverted_updates.insert(cfg.property.clone(), (added, removed));
4109 }
4110 }
4111 }
4112
4113 let mut v_data = Vec::new();
4114 let mut d_data = Vec::new();
4115 let mut ver_data = Vec::new();
4116 for (vid, labels, props, deleted, version) in vertices {
4117 v_data.push((vid, labels, props));
4118 d_data.push(deleted);
4119 ver_data.push(version);
4120 }
4121
4122 let backend = self.storage.backend();
4123
4124 // Skip the full-row Append entirely if this label only has
4125 // partial-write rows pending.
4126 if !v_data.is_empty() {
4127 let batch = ds.build_record_batch_with_timestamps(
4128 &v_data,
4129 &d_data,
4130 &ver_data,
4131 &schema,
4132 Some(&vertex_created_at),
4133 Some(&vertex_updated_at),
4134 )?;
4135 ds.write_batch(backend, batch, &schema).await?;
4136 }
4137
4138 // Partial-column batch (Lance MergeInsert path). The flag
4139 // gates whether the routing classified any VIDs as partial;
4140 // outside the flag this collection is always empty so the
4141 // call below is a cheap no-op.
4142 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4143 && !partial_rows.is_empty()
4144 {
4145 let touched_union: std::collections::HashSet<String> = partial_rows
4146 .iter()
4147 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4148 .collect();
4149 let pairs: Vec<(Vid, Properties)> = partial_rows
4150 .iter()
4151 .map(|(vid, props, _, _)| (*vid, props.clone()))
4152 .collect();
4153 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4154 let partial_batch = ds.build_partial_record_batch(
4155 &pairs,
4156 &versions,
4157 &touched_union,
4158 &schema,
4159 Some(&vertex_updated_at),
4160 )?;
4161 if partial_batch.num_rows() > 0 {
4162 ds.merge_insert_batch(backend, partial_batch).await?;
4163 }
4164 }
4165
4166 // Tombstone batch (Round-12 §B): always MergeInsert with
4167 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4168 // No partial_lance_writes gating — tombstones never carry
4169 // useful property payload to write. Captured tombstone vids
4170 // also drive `remove_from_vid_labels_index` below.
4171 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4172 if !tombstone_rows.is_empty() {
4173 let tomb_batch =
4174 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4175 if tomb_batch.num_rows() > 0 {
4176 ds.merge_insert_batch(backend, tomb_batch).await?;
4177 }
4178 }
4179
4180 ds.ensure_default_indexes(backend).await?;
4181
4182 // Update VidLabelsIndex (if enabled). v_data carries live
4183 // vertices; tombstone removals come from the
4184 // `tombstone_rows` captured above the MergeInsert call.
4185 for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4186 if deleted {
4187 self.storage.remove_from_vid_labels_index(*vid);
4188 } else {
4189 self.storage.update_vid_labels_index(*vid, labels.clone());
4190 }
4191 }
4192 for (vid, _) in &tombstone_rows {
4193 self.storage.remove_from_vid_labels_index(*vid);
4194 }
4195
4196 // Update Manifest
4197 let current_snap =
4198 manifest
4199 .vertices
4200 .entry(label_name.to_string())
4201 .or_insert(LabelSnapshot {
4202 version: 0,
4203 count: 0,
4204 lance_version: 0,
4205 });
4206 current_snap.version += 1;
4207 current_snap.count += v_data.len() as u64;
4208 // LanceDB tables don't expose Lance version directly
4209 current_snap.lance_version = 0;
4210
4211 // Invalidate table cache to ensure next read picks up new version
4212 self.storage.invalidate_table_cache(label_name);
4213
4214 // Apply inverted index updates incrementally
4215 #[cfg(feature = "lance-backend")]
4216 for idx in &schema.indexes {
4217 if let IndexDefinition::Inverted(cfg) = idx
4218 && cfg.label == label_name
4219 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4220 {
4221 self.storage
4222 .index_manager()
4223 .update_inverted_index_incremental(cfg, added, removed)
4224 .await?;
4225 }
4226 }
4227
4228 // Update UID index with new vertex mappings
4229 // Collect (UniId, Vid) mappings from non-deleted vertices
4230 #[cfg(feature = "lance-backend")]
4231 {
4232 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4233 for (vid, _labels, props) in &v_data {
4234 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4235 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4236 label_name, ext_id, props,
4237 );
4238 uid_mappings.push((uid, *vid));
4239 }
4240
4241 if !uid_mappings.is_empty()
4242 && let Ok(uid_index) = self.storage.uid_index(label_name)
4243 {
4244 // Stamp mappings with this flush's MVCC version so a later
4245 // re-create of the same UID deterministically outranks the
4246 // stale mapping (review C3).
4247 uid_index
4248 .write_mapping_versioned(&uid_mappings, current_version)
4249 .await?;
4250 }
4251 }
4252 }
4253 Ok(FlushOutcome {
4254 manifest,
4255 snapshot_id,
4256 })
4257 }
4258
4259 /// Composition entry that assumes the caller already holds `flush_lock`.
4260 /// Runs rotate + stream + finalize_locked in sequence. Used by
4261 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4262 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4263 /// holds the lock from the commit critical section).
4264 #[instrument(
4265 skip(self),
4266 fields(snapshot_id, mutations_count, size_bytes),
4267 level = "info"
4268 )]
4269 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4270 let start = std::time::Instant::now();
4271
4272 let (initial_size, initial_count) = {
4273 let l0_arc = self.l0_manager.get_current();
4274 let l0 = l0_arc.read();
4275 (l0.estimated_size, l0.mutation_count)
4276 };
4277 tracing::Span::current().record("size_bytes", initial_size);
4278 tracing::Span::current().record("mutations_count", initial_count);
4279
4280 debug!("Starting L0 flush to L1");
4281
4282 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4283 // FlushInProgressGuard lives on RotateOutput and stays alive for
4284 // the full flush — including the finalize_locked call below.
4285 let RotateOutput {
4286 old_l0_arc,
4287 wal_lsn,
4288 current_version,
4289 flush_in_progress_guard: _flush_guard,
4290 } = self.flush_l0_rotate().await?;
4291
4292 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4293 // G (Lance writes). Builds the manifest but does NOT publish it.
4294 let FlushOutcome {
4295 manifest,
4296 snapshot_id,
4297 } = self
4298 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4299 .await?;
4300
4301 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4302 // property cache clear, last_flush_time, metrics, l1_runs++,
4303 // compaction trigger, index-rebuild scheduling, fork tick.
4304 self.flush_finalize_locked(
4305 old_l0_arc,
4306 wal_lsn,
4307 manifest,
4308 snapshot_id,
4309 initial_size,
4310 initial_count,
4311 start,
4312 )
4313 .await
4314 }
4315
4316 /// Phases H..S of the flush: publish the manifest and run all
4317 /// post-publish bookkeeping. Assumes the caller already holds
4318 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4319 /// lock-acquiring variant used by the async finalize path.
4320 #[allow(clippy::too_many_arguments)]
4321 async fn flush_finalize_locked(
4322 &self,
4323 old_l0_arc: Arc<RwLock<L0Buffer>>,
4324 wal_lsn: u64,
4325 manifest: SnapshotManifest,
4326 snapshot_id: String,
4327 initial_size: usize,
4328 initial_count: usize,
4329 start: std::time::Instant,
4330 ) -> Result<String> {
4331 Self::flush_finalize_body(
4332 &self.shared_ctx(),
4333 old_l0_arc,
4334 wal_lsn,
4335 manifest,
4336 snapshot_id,
4337 initial_size,
4338 initial_count,
4339 start,
4340 )
4341 .await
4342 }
4343
4344 /// Phases H..S of the flush, lock-acquiring variant. Used by the
4345 /// async-flush finalizer task (running on a spawned tokio task),
4346 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4347 /// `flush_lock` to serialize the publish boundary, then runs the
4348 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4349 #[allow(clippy::too_many_arguments)]
4350 pub(crate) async fn flush_finalize_now(
4351 shared: SharedFlushCtx,
4352 old_l0_arc: Arc<RwLock<L0Buffer>>,
4353 wal_lsn: u64,
4354 manifest: SnapshotManifest,
4355 snapshot_id: String,
4356 initial_size: usize,
4357 initial_count: usize,
4358 start: std::time::Instant,
4359 ) -> Result<String> {
4360 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4361 Self::flush_finalize_body(
4362 &shared,
4363 old_l0_arc,
4364 wal_lsn,
4365 manifest,
4366 snapshot_id,
4367 initial_size,
4368 initial_count,
4369 start,
4370 )
4371 .await
4372 }
4373
4374 /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4375 /// Static over `SharedFlushCtx`; the caller is responsible for
4376 /// holding `flush_lock`.
4377 #[allow(clippy::too_many_arguments)]
4378 async fn flush_finalize_body(
4379 shared: &SharedFlushCtx,
4380 old_l0_arc: Arc<RwLock<L0Buffer>>,
4381 wal_lsn: u64,
4382 mut manifest: SnapshotManifest,
4383 snapshot_id: String,
4384 initial_size: usize,
4385 initial_count: usize,
4386 start: std::time::Instant,
4387 ) -> Result<String> {
4388 // Parent-snapshot fixup. The stream phase built `manifest` with
4389 // parent_snapshot set from cached_manifest at stream time. If
4390 // OTHER flushes (sync or async) have finalized since then,
4391 // cached_manifest has advanced. Re-link this manifest to the
4392 // current cached chain so we don't orphan their data when we
4393 // overwrite cached_manifest below.
4394 let current_parent_id = shared
4395 .cached_manifest
4396 .lock()
4397 .as_ref()
4398 .map(|m| m.snapshot_id.clone());
4399 if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4400 manifest.parent_snapshot = current_parent_id;
4401 metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4402 }
4403
4404 // H. Publish manifest (body first, then pointer — recovery is
4405 // idempotent if we crash between the two).
4406 shared
4407 .storage
4408 .snapshot_manager()
4409 .save_snapshot(&manifest)
4410 .await?;
4411 shared
4412 .storage
4413 .snapshot_manager()
4414 .set_latest_snapshot(&manifest.snapshot_id)
4415 .await?;
4416
4417 // H2. Durability barrier (review C4). `save_snapshot` / `set_latest_snapshot`
4418 // wrote the manifest body and the `catalog/latest` pointer through the
4419 // object store, which does NOT fsync. WAL truncation (K, below) removes
4420 // the only other durable copy of this flush's data, so a crash after K
4421 // but before the OS flushed those writes would lose the snapshot —
4422 // recovery could not resolve `latest`. Make them durable now (local-fs
4423 // only; remote stores provide their own durability on `put`).
4424 crate::snapshot::manager::fsync_snapshot_pointer(
4425 shared.storage.local_fs_root().as_deref(),
4426 &manifest.snapshot_id,
4427 )
4428 .map_err(|e| {
4429 anyhow!(
4430 "fsync snapshot {} before WAL truncate: {}",
4431 manifest.snapshot_id,
4432 e
4433 )
4434 })?;
4435
4436 // I. Cache manifest for next flush to avoid re-reading from object store.
4437 *shared.cached_manifest.lock() = Some(manifest.clone());
4438
4439 // L. Invalidate the property cache BEFORE removing the flushed buffer
4440 // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4441 // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4442 // first closes the non-monotonic-read window: once the buffer leaves
4443 // the L0 chain at J a freshly-written value would otherwise miss the
4444 // chain and fall through to a stale cache entry. By the time finalize
4445 // runs the streamed rows are already durable in L1, so a post-clear
4446 // read falls through to fresh storage instead. The finalizer holds
4447 // `flush_lock` throughout, so reordering L ahead of J is safe.
4448 if let Some(ref pm) = shared.property_manager {
4449 pm.clear_cache().await;
4450 }
4451
4452 // J. Complete flush: remove old L0 from pending_flush. MUST happen
4453 // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4454 shared.l0_manager.complete_flush(&old_l0_arc);
4455
4456 // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4457 // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4458 // WAL truncation (K). The property cache is already cleared (L moved
4459 // ahead of J above), so a read in this window falls through to fresh
4460 // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4461 // read after flush finalize).
4462 fail::fail_point!("flush::after-complete-before-cache-clear");
4463
4464 // K. Truncate WAL up to the safe LSN.
4465 let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4466 if let Some(w) = wal_handle {
4467 let safe_lsn = shared
4468 .l0_manager
4469 .min_pending_wal_lsn()
4470 .map(|min_pending| min_pending.min(wal_lsn))
4471 .unwrap_or(wal_lsn);
4472 w.truncate_before(safe_lsn).await?;
4473 }
4474
4475 // M. Reset last flush time for time-based auto-flush.
4476 *shared.last_flush_time.lock() = std::time::Instant::now();
4477
4478 info!(
4479 snapshot_id,
4480 mutations_count = initial_count,
4481 size_bytes = initial_size,
4482 "L0 flush to L1 completed successfully"
4483 );
4484 metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4485 metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4486 metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4487
4488 // P. Increment flush generation counter for write throttling.
4489 {
4490 let mut status = uni_common::sync::acquire_mutex(
4491 &shared.storage.compaction_status,
4492 "compaction_status",
4493 )?;
4494 status.l1_runs += 1;
4495 }
4496
4497 // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4498 let am = shared.adjacency_manager.clone();
4499 if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4500 let previous_still_running = {
4501 let guard = shared.compaction_handle.read();
4502 guard.as_ref().is_some_and(|h| !h.is_finished())
4503 };
4504 if previous_still_running {
4505 info!("Skipping compaction: previous compaction still in progress");
4506 } else {
4507 let handle = tokio::spawn(async move {
4508 am.compact();
4509 });
4510 *shared.compaction_handle.write() = Some(handle);
4511 }
4512 }
4513
4514 // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4515 if shared.auto_rebuild_enabled
4516 && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4517 {
4518 Self::schedule_index_rebuilds_if_needed_static(
4519 &manifest,
4520 rebuild_mgr.clone(),
4521 shared.schema_manager.clone(),
4522 shared.index_rebuild_config.clone(),
4523 );
4524 }
4525
4526 // S. Emit fork-fragment observability after a successful forked flush.
4527 Self::tick_fork_fragment_observability_static(
4528 shared.fork_id,
4529 shared.fork_flush_count.clone(),
4530 shared.fork_fragment_warn_fired.clone(),
4531 shared.fork_fragment_warn_threshold,
4532 );
4533
4534 Ok(snapshot_id)
4535 }
4536
4537 /// Increment fork-flush bookkeeping and fire the fragment warn
4538 /// once if the threshold is crossed.
4539 ///
4540 /// Each flush typically appends ~1 fragment per touched dataset on
4541 /// the fork's branches; without compaction (deferred to Phase 5)
4542 /// long-lived heavy-write forks degrade. The flush count is a
4543 /// proxy for actual fragment growth — reading
4544 /// `Dataset::manifest().fragments.len()` per dataset would add a
4545 /// per-flush object-store roundtrip on the hot commit path, which
4546 /// is too costly for a purely observational guard rail.
4547 ///
4548 /// No-op for primary writers (`fork_id == None`).
4549 #[allow(dead_code)] // called by tests; production path uses _static
4550 pub(crate) fn tick_fork_fragment_observability(&self) {
4551 Self::tick_fork_fragment_observability_static(
4552 self.fork_id,
4553 self.fork_flush_count.clone(),
4554 self.fork_fragment_warn_fired.clone(),
4555 self.config.fork_fragment_warn_threshold,
4556 );
4557 }
4558
4559 /// Static variant of [`Writer::tick_fork_fragment_observability`].
4560 /// Used by the async-flush finalize path, where we hold a
4561 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4562 pub(crate) fn tick_fork_fragment_observability_static(
4563 fork_id: Option<ForkId>,
4564 fork_flush_count: Arc<AtomicU64>,
4565 fork_fragment_warn_fired: Arc<AtomicBool>,
4566 warn_threshold: usize,
4567 ) {
4568 let Some(fork_id) = fork_id else { return };
4569 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4570 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4571 let fork_label = fork_id.to_string();
4572 metrics::gauge!(
4573 "uni_fork_l1_flushes",
4574 "fork" => fork_label.clone(),
4575 )
4576 .set(new_count as f64);
4577 let threshold = warn_threshold as u64;
4578 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4579 && threshold > 0
4580 && new_count >= threshold
4581 {
4582 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4583 tracing::warn!(
4584 fork = %fork_label,
4585 flush_count = new_count,
4586 threshold,
4587 "fork has exceeded the L1 flush-count threshold; \
4588 fork compaction is deferred to Phase 5 — consider \
4589 drop+recreate or promotion to bound fragment growth"
4590 );
4591 }
4592 }
4593
4594 /// Check rebuild thresholds and schedule background index rebuilds for
4595 /// labels that exceed growth or age limits. Marks affected indexes as
4596 /// `Stale` and spawns an async task to schedule the rebuild.
4597 #[allow(dead_code)] // production path uses _static; kept as the
4598 // documented instance entry point.
4599 fn schedule_index_rebuilds_if_needed(
4600 &self,
4601 manifest: &SnapshotManifest,
4602 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4603 ) {
4604 Self::schedule_index_rebuilds_if_needed_static(
4605 manifest,
4606 rebuild_mgr,
4607 self.schema_manager.clone(),
4608 self.config.index_rebuild.clone(),
4609 );
4610 }
4611
4612 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4613 /// Used by the async-flush finalize path, where we hold the
4614 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4615 pub(crate) fn schedule_index_rebuilds_if_needed_static(
4616 manifest: &SnapshotManifest,
4617 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4618 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4619 index_rebuild_config: uni_common::config::IndexRebuildConfig,
4620 ) {
4621 let checker =
4622 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4623 let schema = schema_manager.schema();
4624 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4625
4626 if labels.is_empty() {
4627 return;
4628 }
4629
4630 // Mark affected indexes as Stale
4631 for label in &labels {
4632 for idx in &schema.indexes {
4633 if idx.label() == label {
4634 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4635 m.status = uni_common::core::schema::IndexStatus::Stale;
4636 });
4637 }
4638 }
4639 }
4640
4641 tokio::spawn(async move {
4642 if let Err(e) = rebuild_mgr.schedule(labels).await {
4643 tracing::warn!("Failed to schedule index rebuild: {e}");
4644 }
4645 });
4646 }
4647}
4648
/// [`FinalizeFn`] implementation that the [`FlushCoordinator`] invokes from
/// its single-task finalizer loop once an async flush's stream phase has
/// ended.
///
/// - On success (`finalize`) it runs `Writer::flush_finalize_now` to
///   publish the streamed snapshot and perform post-flush cleanup.
/// - On stream failure (`finalize_failure`) it logs the error, bumps
///   `uni_flush_failures_total`, and returns the error unchanged.
///
/// Both paths drop the rotated flush's permit so back-pressure is released.
///
/// Unit struct on purpose: it must NOT hold `Arc<Writer>` (that would
/// create a reference cycle Writer -> FlushCoordinator ->
/// Arc<dyn FinalizeFn> -> Writer). All state needed for finalize travels
/// in via [`SharedFlushCtx`].
pub(crate) struct WriterFinalizer;
4655
4656impl FinalizeFn for WriterFinalizer {
4657 fn finalize<'a>(
4658 &'a self,
4659 rotated: RotatedFlush,
4660 outcome: AsyncFlushOutcome,
4661 shared: SharedFlushCtx,
4662 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
4663 Box::pin(async move {
4664 // Read initial_size / initial_count from the rotated L0 so
4665 // we don't have to plumb them through the coordinator
4666 // submission. The buffer is still alive in pending_flush
4667 // until `complete_flush` (J) below pops it.
4668 let (initial_size, initial_count) = {
4669 let l0 = rotated.old_l0_arc.read();
4670 (l0.estimated_size, l0.mutation_count)
4671 };
4672 let result = Writer::flush_finalize_now(
4673 shared,
4674 rotated.old_l0_arc.clone(),
4675 rotated.wal_lsn,
4676 outcome.new_manifest,
4677 outcome.snapshot_id,
4678 initial_size,
4679 initial_count,
4680 std::time::Instant::now(),
4681 )
4682 .await;
4683 // `rotated` (permit + flush_in_progress_guard) drops here.
4684 drop(rotated.permit);
4685 result
4686 })
4687 }
4688
4689 fn finalize_failure<'a>(
4690 &'a self,
4691 rotated: RotatedFlush,
4692 err: anyhow::Error,
4693 _shared: SharedFlushCtx,
4694 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
4695 Box::pin(async move {
4696 tracing::warn!(
4697 error = %err,
4698 seq = rotated.seq,
4699 "async flush stream failed; old L0 remains in pending_flush, \
4700 WAL retains its data, recovery via WAL replay on restart"
4701 );
4702 metrics::counter!("uni_flush_failures_total").increment(1);
4703 // Permit + guard drop here so back-pressure releases even on
4704 // failure.
4705 drop(rotated.permit);
4706 err
4707 })
4708 }
4709}
4710
4711#[cfg(test)]
4712mod tests {
4713 use super::*;
4714 use tempfile::tempdir;
4715
    /// Test that commit_transaction writes mutations to WAL before merging to main L0.
    /// This verifies fix for issue #137 (transaction commit atomicity).
    ///
    /// Stages two `Test` vertices and one edge between them in a transaction
    /// L0, then commits it through `commit_transaction_l0`. It then checks that:
    /// - the main L0's WAL gained entries;
    /// - among the newly appended entries, every vertex insert is logged
    ///   before any edge insert;
    /// - the committed vertices and edge are present in the main L0.
    #[tokio::test]
    async fn test_commit_transaction_wal_before_merge() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fixture: temp-dir-backed object store with a persisted one-label schema.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let vid_a = writer.next_vid().await?;
        let vid_b = writer.next_vid().await?;

        let mut props = std::collections::HashMap::new();
        props.insert("test".to_string(), Value::String("data".to_string()));

        writer
            .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
            .await?;
        writer
            .insert_vertex_with_labels(
                vid_b,
                std::collections::HashMap::new(),
                &["Test".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Edge type id 1 is used directly; the schema above registers no edge type.
        let eid = writer.next_eid(1).await?;
        writer
            .insert_edge(
                vid_a,
                vid_b,
                1,
                eid,
                std::collections::HashMap::new(),
                None,
                Some(&tx_l0),
            )
            .await?;

        // Get WAL before commit. `count_before` is the baseline, so only the
        // entries appended by the commit are inspected below.
        let l0 = writer.l0_manager.get_current();
        let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
        let mutations_before = wal.replay().await?;
        let count_before = mutations_before.len();

        // Commit transaction - this should write to WAL first
        let writer = Arc::new(writer);
        writer.commit_transaction_l0(tx_l0).await?;

        // Verify WAL has the new mutations
        let mutations_after = wal.replay().await?;
        assert!(
            mutations_after.len() > count_before,
            "WAL should contain transaction mutations after commit"
        );

        // Verify mutations are in correct order: vertices first, then edges
        let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();

        let mut saw_vertex_a = false;
        let mut saw_vertex_b = false;
        let mut saw_edge = false;

        for mutation in &new_mutations {
            match mutation {
                crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
                    if *vid == vid_a {
                        saw_vertex_a = true;
                    }
                    if *vid == vid_b {
                        saw_vertex_b = true;
                    }
                    // Vertices should come before edges
                    assert!(!saw_edge, "Vertices should be logged to WAL before edges");
                }
                crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
                    if *e == eid {
                        saw_edge = true;
                    }
                    // Edges should come after vertices
                    assert!(
                        saw_vertex_a && saw_vertex_b,
                        "Edge should be logged after both vertices"
                    );
                }
                // Other WAL entry kinds do not take part in the ordering check.
                _ => {}
            }
        }

        assert!(saw_vertex_a, "Vertex A should be in WAL");
        assert!(saw_vertex_b, "Vertex B should be in WAL");
        assert!(saw_edge, "Edge should be in WAL");

        // Verify data is also in main L0
        let l0_read = l0.read();
        assert!(
            l0_read.vertex_properties.contains_key(&vid_a),
            "Vertex A should be in main L0"
        );
        assert!(
            l0_read.vertex_properties.contains_key(&vid_b),
            "Vertex B should be in main L0"
        );
        assert!(
            l0_read.edge_endpoints.contains_key(&eid),
            "Edge should be in main L0"
        );

        Ok(())
    }
4858
    /// Test that discarding a transaction L0 without committing it leaves the
    /// main L0 untouched.
    ///
    /// NOTE(review): despite the test name, no WAL failure is injected. The
    /// rollback that would follow a failed WAL flush is simulated by dropping
    /// the transaction L0. The test then checks three things:
    /// - the main L0's vertex count is unchanged;
    /// - the pre-existing `Baseline` vertex is still present;
    /// - the transaction's `TxData` vertex never reached the main L0.
    #[tokio::test]
    async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
        use crate::runtime::wal::WriteAheadLog;
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        // Fixture: temp-dir-backed object store with three persisted labels.
        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        let _baseline_label_id = schema_manager.add_label("Baseline")?;
        let _txdata_label_id = schema_manager.add_label("TxData")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // Create WAL for main L0
        let wal_path = ObjectStorePath::from("wal");
        let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));

        let writer = Writer::new_with_config(
            storage.clone(),
            schema_manager.clone(),
            1,
            UniConfig::default(),
            Some(wal),
            None,
        )
        .await?;

        // Insert baseline data (outside transaction)
        let baseline_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                baseline_vid,
                [("baseline".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["Baseline".to_string()],
                None,
            )
            .await?;

        // Begin transaction — create a transaction L0
        let tx_l0 = writer.create_transaction_l0();

        // Insert data in transaction
        let tx_vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(
                tx_vid,
                [("tx_data".to_string(), Value::Bool(true))]
                    .into_iter()
                    .collect(),
                &["TxData".to_string()],
                Some(&tx_l0),
            )
            .await?;

        // Capture main L0 state before rollback
        let l0 = writer.l0_manager.get_current();
        let vertex_count_before = l0.read().vertex_properties.len();

        // Rollback transaction (simulating what would happen after WAL flush failure)
        drop(tx_l0);

        // Verify main L0 is unchanged
        let vertex_count_after = l0.read().vertex_properties.len();
        assert_eq!(
            vertex_count_before, vertex_count_after,
            "Main L0 should not change after rollback"
        );

        // Baseline should still be present
        assert!(
            l0.read().vertex_properties.contains_key(&baseline_vid),
            "Baseline data should remain"
        );

        // Transaction data should NOT be in main L0
        assert!(
            !l0.read().vertex_properties.contains_key(&tx_vid),
            "Transaction data should not be in main L0 after rollback"
        );

        Ok(())
    }
4953
    /// Test that batch insert with shared labels does not clone labels per vertex.
    /// This verifies fix for issue #161 (redundant label cloning).
    ///
    /// NOTE(review): the test inserts 100 `Person` vertices one at a time via
    /// `insert_vertex_with_labels`, reusing a single `&[String]` label slice.
    /// It then asserts that every vertex's L0 labels equal `["Person"]`.
    ///
    /// It therefore checks label correctness only. Whether labels are cloned
    /// per vertex is not observable here, and no batch API is called.
    #[tokio::test]
    async fn test_batch_insert_shared_labels() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Person")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        // No WAL is configured for this writer.
        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        // Shared labels - should not be cloned per vertex
        let labels = &["Person".to_string()];

        // Insert batch of vertices with same labels
        let mut vids = Vec::new();
        for i in 0..100 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("id".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, labels, None)
                .await?;
            vids.push(vid);
        }

        // Verify all vertices have the correct labels
        let l0 = writer.l0_manager.get_current();
        for vid in vids {
            // A fresh read guard per vertex; released at the end of each iteration.
            let l0_guard = l0.read();
            let vertex_labels = l0_guard.vertex_labels.get(&vid);
            assert!(vertex_labels.is_some(), "Vertex should have labels");
            assert_eq!(
                vertex_labels.unwrap(),
                &vec!["Person".to_string()],
                "Labels should match"
            );
        }

        Ok(())
    }
5007
    /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
    /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
    ///
    /// Compares the L0 field `estimated_size` against the method
    /// `size_bytes()`; presumably the latter is the full recomputation the
    /// issue refers to. The test checks that:
    /// - both values start at 0;
    /// - the estimate grows after 10 vertex inserts, and again after 9 edge
    ///   inserts;
    /// - after each phase, `estimated / actual` lies within `[0.5, 2.0]`.
    #[tokio::test]
    async fn test_estimated_size_tracks_mutations() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let path = dir.path().to_str().unwrap();
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");

        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let _label_id = schema_manager.add_label("Test")?;
        schema_manager.save().await?;

        let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);

        let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;

        let l0 = writer.l0_manager.get_current();

        // Initial state should be empty
        let initial_estimated = l0.read().estimated_size;
        let initial_actual = l0.read().size_bytes();
        assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
        assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");

        // Insert vertices with properties (no labels)
        let mut vids = Vec::new();
        for i in 0..10 {
            let vid = writer.next_vid().await?;
            let mut props = std::collections::HashMap::new();
            props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
            props.insert("index".to_string(), Value::Int(i));
            writer
                .insert_vertex_with_labels(vid, props, &[], None)
                .await?;
            vids.push(vid);
        }

        // Verify estimated_size grew
        let after_vertices_estimated = l0.read().estimated_size;
        let after_vertices_actual = l0.read().size_bytes();
        assert!(
            after_vertices_estimated > 0,
            "estimated_size should grow after insertions"
        );

        // Verify estimated_size is within reasonable bounds of actual size (within 2x)
        let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_vertices_estimated,
            after_vertices_actual,
            ratio
        );

        // Insert edges with a simple edge type, chaining vids[i] -> vids[i + 1]
        let edge_type = 1u32;
        for i in 0..9 {
            let eid = writer.next_eid(edge_type).await?;
            writer
                .insert_edge(
                    vids[i],
                    vids[i + 1],
                    edge_type,
                    eid,
                    std::collections::HashMap::new(),
                    Some("NEXT".to_string()),
                    None,
                )
                .await?;
        }

        // Verify estimated_size grew further
        let after_edges_estimated = l0.read().estimated_size;
        let after_edges_actual = l0.read().size_bytes();
        assert!(
            after_edges_estimated > after_vertices_estimated,
            "estimated_size should grow after edge insertions"
        );

        // Verify still within reasonable bounds
        let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
        assert!(
            (0.5..=2.0).contains(&ratio),
            "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
            after_edges_estimated,
            after_edges_actual,
            ratio
        );

        Ok(())
    }
5107
5108 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5109 #[tokio::test]
5110 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5111 use crate::runtime::wal::WriteAheadLog;
5112 use crate::storage::manager::StorageManager;
5113 use object_store::local::LocalFileSystem;
5114 use object_store::path::Path as ObjectStorePath;
5115 use uni_common::core::schema::SchemaManager;
5116
5117 let dir = tempdir()?;
5118 let path = dir.path().to_str().unwrap();
5119 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5120 let schema_path = ObjectStorePath::from("schema.json");
5121
5122 let schema_manager =
5123 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5124 schema_manager.save().await?;
5125
5126 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5127
5128 let wal_path = ObjectStorePath::from("wal");
5129 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5130
5131 let writer = Writer::new_with_config(
5132 storage.clone(),
5133 schema_manager.clone(),
5134 1,
5135 UniConfig::default(),
5136 Some(wal.clone()),
5137 None,
5138 )
5139 .await?;
5140
5141 // Flush with no mutations — should succeed cleanly
5142 let lsn = writer.flush_wal().await?;
5143 // LSN should be 0 or 1 (no real mutations flushed)
5144 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5145
5146 Ok(())
5147 }
5148
5149 /// Test that transaction data does not leak into main L0 without commit.
5150 #[tokio::test]
5151 async fn test_transaction_isolation_without_commit() -> Result<()> {
5152 use crate::runtime::wal::WriteAheadLog;
5153 use crate::storage::manager::StorageManager;
5154 use object_store::local::LocalFileSystem;
5155 use object_store::path::Path as ObjectStorePath;
5156 use uni_common::core::schema::SchemaManager;
5157
5158 let dir = tempdir()?;
5159 let path = dir.path().to_str().unwrap();
5160 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5161 let schema_path = ObjectStorePath::from("schema.json");
5162
5163 let schema_manager =
5164 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5165 let _label_id = schema_manager.add_label("Person")?;
5166 schema_manager.save().await?;
5167
5168 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5169
5170 let wal_path = ObjectStorePath::from("wal");
5171 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5172
5173 let writer = Writer::new_with_config(
5174 storage.clone(),
5175 schema_manager.clone(),
5176 1,
5177 UniConfig::default(),
5178 Some(wal),
5179 None,
5180 )
5181 .await?;
5182
5183 // Create transaction L0
5184 let tx_l0 = writer.create_transaction_l0();
5185
5186 // Insert vertex into transaction L0
5187 let vid = writer.next_vid().await?;
5188 writer
5189 .insert_vertex_with_labels(
5190 vid,
5191 [("name".to_string(), Value::String("Ghost".to_string()))]
5192 .into_iter()
5193 .collect(),
5194 &["Person".to_string()],
5195 Some(&tx_l0),
5196 )
5197 .await?;
5198
5199 // Verify data is in transaction L0
5200 assert!(
5201 tx_l0.read().vertex_properties.contains_key(&vid),
5202 "Transaction L0 should contain the vertex"
5203 );
5204
5205 // Verify data is NOT in main L0
5206 let main_l0 = writer.l0_manager.get_current();
5207 assert!(
5208 !main_l0.read().vertex_properties.contains_key(&vid),
5209 "Main L0 should NOT contain uncommitted transaction data"
5210 );
5211
5212 // Drop transaction without committing — data should be lost
5213 drop(tx_l0);
5214
5215 // Main L0 still should not have it
5216 assert!(
5217 !main_l0.read().vertex_properties.contains_key(&vid),
5218 "Main L0 should remain clean after dropped transaction"
5219 );
5220
5221 Ok(())
5222 }
5223
    /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
    /// the flush count crosses the configured threshold and stays
    /// silent on subsequent flushes for the lifetime of the writer.
    /// Primary writers (`fork_id == None`) never fire it.
    ///
    /// Tested directly against `tick_fork_fragment_observability` so
    /// the contract is locked in independently of the broader
    /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
    /// on Day 10's on-the-fly schema overlay growth).
    ///
    /// NOTE(review): "fires once" is observed only through the
    /// `fork_fragment_warn_fired` flag. The emitted `tracing` event is not
    /// captured, and the flag is never reset to `false`. A second warning
    /// emission would therefore not be caught by the final assertion.
    #[tokio::test]
    async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::fork::ForkId;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        let storage = Arc::new(
            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
        );

        // Warn after the 3rd fork flush.
        let config = UniConfig {
            fork_fragment_warn_threshold: 3,
            ..Default::default()
        };
        // `mut` so the test can tag the writer as a fork below.
        let mut writer =
            Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;

        // Primary path: never fires.
        for _ in 0..10 {
            writer.tick_fork_fragment_observability();
        }
        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);

        // Fork path: tag and tick. Below threshold → no fire.
        writer.fork_id = Some(ForkId::new());
        writer.tick_fork_fragment_observability();
        writer.tick_fork_fragment_observability();
        assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);

        // Crossing threshold → fires once.
        writer.tick_fork_fragment_observability();
        assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);

        // Subsequent ticks bump the gauge but do not re-fire.
        let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
        for _ in 0..5 {
            writer.tick_fork_fragment_observability();
        }
        assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
        assert_eq!(
            writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
            fired_after
        );

        Ok(())
    }
5289
    /// The hot-path mutators must not write to any `Writer` struct field.
    ///
    /// Phase 2 of the refactor gave them `&self` receivers, so the compiler
    /// rejects direct `self.x = y` assignment. Interior-mutable writes
    /// (Mutex/Atomic/OnceLock) still compile, however. This regression test
    /// snapshots every potentially-writable field, calls each hot-path
    /// mutator, and asserts that no field changed.
    ///
    /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
    /// `tick_fork_fragment_observability`) DO mutate fields by design and
    /// are intentionally out of scope here.
    #[tokio::test]
    async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
        use crate::storage::manager::StorageManager;
        use object_store::local::LocalFileSystem;
        use object_store::path::Path as ObjectStorePath;
        use uni_common::core::schema::SchemaManager;

        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let schema_path = ObjectStorePath::from("schema.json");
        let schema_manager =
            Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
        schema_manager.add_label("Person")?;
        schema_manager.save().await?;
        let storage = Arc::new(
            StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
        );

        let writer =
            Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
                .await?;

        /// Captures every `Writer` field that *could* be written by a
        /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
        /// construction field). Arc'd substructures (`l0_manager`,
        /// `storage`, etc.) are intentionally not checked — they are
        /// re-pointed only at construction.
        #[derive(Debug, PartialEq)]
        struct Snapshot {
            last_flush_time: std::time::Instant,
            cached_manifest_some: bool,
            fork_flush_count: u64,
            fork_fragment_warn_fired: bool,
            xervo_runtime_some: bool,
            index_rebuild_manager_some: bool,
            fork_id: Option<ForkId>,
        }

        /// Reads the current value of every field tracked by `Snapshot`.
        fn snap(w: &Writer) -> Snapshot {
            Snapshot {
                last_flush_time: *w.last_flush_time.lock(),
                cached_manifest_some: w.cached_manifest.lock().is_some(),
                fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
                fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
                xervo_runtime_some: w.xervo_runtime.get().is_some(),
                index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
                fork_id: w.fork_id,
            }
        }

        // 1. insert_vertex_with_labels
        let before = snap(&writer);
        let vid = writer.next_vid().await?;
        writer
            .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
            .await?;
        assert_eq!(
            snap(&writer),
            before,
            "insert_vertex_with_labels mutated a Writer field"
        );

        // 2. insert_vertices_batch
        let before = snap(&writer);
        let vids = writer.allocate_vids(2).await?;
        writer
            .insert_vertices_batch(
                vids,
                vec![Properties::new(), Properties::new()],
                vec!["Person".into()],
                None,
            )
            .await?;
        assert_eq!(
            snap(&writer),
            before,
            "insert_vertices_batch mutated a Writer field"
        );

        // 3. delete_vertex (removes the vertex inserted in step 1)
        let before = snap(&writer);
        writer.delete_vertex(vid, None, None).await?;
        assert_eq!(
            snap(&writer),
            before,
            "delete_vertex mutated a Writer field"
        );

        // (insert_edge / delete_edge are skipped here: their fixture cost is
        // disproportionate to the audit's marginal value, and the same
        // structural argument plus the compiler-enforced `&self` covers them.)

        Ok(())
    }
5395}