uni_store/runtime/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::context::QueryContext;
5use crate::runtime::flush_coordinator::{
6 FinalizeFn, FlushCoordinator, FlushOutcome as AsyncFlushOutcome, RotatedFlush, SharedFlushCtx,
7};
8use crate::runtime::id_allocator::IdAllocator;
9use crate::runtime::l0::{L0Buffer, serialize_constraint_key};
10use crate::runtime::l0_manager::L0Manager;
11use crate::runtime::property_manager::PropertyManager;
12use crate::runtime::wal::WriteAheadLog;
13use crate::storage::adjacency_manager::AdjacencyManager;
14use crate::storage::delta::{L1Entry, Op};
15use crate::storage::main_edge::MainEdgeDataset;
16use crate::storage::main_vertex::MainVertexDataset;
17use crate::storage::manager::StorageManager;
18use anyhow::{Result, anyhow};
19use chrono::Utc;
20use metrics;
21use parking_lot::{Mutex as PlMutex, RwLock};
22use std::collections::{HashMap, HashSet};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use tracing::{debug, info, instrument};
26use uni_common::Properties;
27use uni_common::Value;
28use uni_common::config::UniConfig;
29use uni_common::core::fork::ForkId;
30use uni_common::core::id::{Eid, Vid};
31use uni_common::core::schema::{ConstraintTarget, ConstraintType, IndexDefinition};
32use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
33use uni_xervo::runtime::ModelRuntime;
34use uuid::Uuid;
35
/// Tunables for the [`Writer`] mutation path.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Upper bound on buffered mutations (default: 10 000).
    pub max_mutations: usize,
    /// Opt-in switch for the partial-column Lance MergeInsert path used by
    /// SET-only flushes.
    ///
    /// * `true` — `Writer::insert_vertex_partial` records the touched
    ///   property keys in `L0Buffer::vertex_partial_keys`. The flush then
    ///   sends those VIDs through Lance `MergeInsertBuilder` with a source
    ///   that carries only a subset of the schema. Untouched columns,
    ///   including wide ones such as embeddings, are neither read nor
    ///   rewritten.
    /// * `false` — `insert_vertex_partial` falls back to the read-modify-write
    ///   `insert_vertex_with_labels` path, which is bit-for-bit equivalent to
    ///   earlier releases.
    ///
    /// Ships disabled for the first release. The plan is to enable it once
    /// telemetry from the issue #72 ingest workload confirms the benefit.
    /// Soundness probe:
    /// `crates/uni-store/tests/common/storage/lance_merge_insert_probe.rs`.
    pub partial_lance_writes: bool,
}

impl Default for WriterConfig {
    /// 10 000 buffered mutations, partial-column Lance writes disabled.
    fn default() -> Self {
        let max_mutations = 10_000;
        let partial_lance_writes = false;
        Self {
            max_mutations,
            partial_lance_writes,
        }
    }
}
66
67/// RAII latch on [`StorageManager::flush_in_progress`].
68///
69/// Sets the flag to `true` on construction (via CAS) and back to `false` on
70/// drop, so any `?` early-exit inside `flush_to_l1` cannot leave the flag
71/// stuck. Returns `None` if a flush is already in progress, providing
72/// forward-compatible exclusion once the outer writer-RwLock is removed in
73/// Phase 4 of the concurrent-writer refactor.
74// FlushInProgressGuard moved to storage/manager.rs so flush_coordinator.rs
75// can hold it on RotatedFlush without a writer.rs back-import cycle.
76pub use crate::storage::manager::FlushInProgressGuard;
77
78/// Output of [`Writer::flush_l0_rotate`]: the to-be-flushed L0 buffer,
79/// captured WAL LSN, current_version, and the in-progress guard whose
80/// lifetime spans the full flush (including the future async stream
81/// phase that runs on a spawned task).
82struct RotateOutput {
83 old_l0_arc: Arc<RwLock<L0Buffer>>,
84 wal_lsn: u64,
85 current_version: u64,
86 flush_in_progress_guard: FlushInProgressGuard,
87}
88
89/// Project a property map to a subset selected by `keys`. Used to
90/// run `touched_needs_full_read` against just the SET-touched keys
91/// when the caller passes a fully-merged `props` map.
92fn props_subset(props: &Properties, keys: &HashSet<String>) -> Properties {
93 let mut out = Properties::new();
94 for k in keys {
95 if let Some(v) = props.get(k) {
96 out.insert(k.clone(), v.clone());
97 }
98 }
99 out
100}
101
102/// Output of [`Writer::flush_stream_l1`]: the built (but not yet
103/// published) snapshot manifest and its id. Finalize is responsible
104/// for `save_snapshot` + `set_latest_snapshot` + `cached_manifest`
105/// update.
106struct FlushOutcome {
107 manifest: SnapshotManifest,
108 snapshot_id: String,
109}
110
111pub struct Writer {
112 pub l0_manager: Arc<L0Manager>,
113 pub storage: Arc<StorageManager>,
114 pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
115 pub allocator: Arc<IdAllocator>,
116 pub config: UniConfig,
117 /// Optional embedding runtime. `OnceLock` so the initializer can run
118 /// on `&self` after the `Writer` has been wrapped in `Arc<Writer>`
119 /// (Phase 4 of concurrent_writer.md). Read through
120 /// [`Writer::xervo_runtime`] — the field itself is private to keep
121 /// callers oblivious to the OnceLock representation.
122 xervo_runtime: OnceLock<Arc<ModelRuntime>>,
123 /// Property manager for cache invalidation after flush
124 pub property_manager: Option<Arc<PropertyManager>>,
125 /// Adjacency manager for dual-write (edges survive flush).
126 adjacency_manager: Arc<AdjacencyManager>,
127 /// Timestamp of last flush or creation. Interior-mutable so that
128 /// `&self` callers can update it; uncontended in practice because all
129 /// writes happen inside the single-flusher critical section.
130 /// Arc-wrapped so it can travel into the SharedFlushCtx that the
131 /// async-flush coordinator passes to spawned stream/finalize tasks.
132 last_flush_time: Arc<PlMutex<std::time::Instant>>,
133 /// Background compaction task handle (prevents concurrent compaction races)
134 compaction_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
135 /// Optional index rebuild manager for post-flush automatic rebuild scheduling.
136 /// `OnceLock` for the same reason as `xervo_runtime`.
137 /// Wrapped in `Arc` so the async-flush finalize path can read it
138 /// from a spawned task via `SharedFlushCtx`.
139 index_rebuild_manager: Arc<OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
140 /// Cached snapshot manifest from the last flush. Avoids re-reading from
141 /// object store on every flush_to_l1 call. Wrapped in a `Mutex` for
142 /// `&self` access; uncontended because all access is inside the
143 /// single-flusher critical section.
144 cached_manifest: Arc<PlMutex<Option<SnapshotManifest>>>,
145 /// Identifier of the fork this writer serves, if any. `None` for
146 /// primary's writer. Set by [`crate::fork::writer_factory::new_for_fork`]
147 /// and read in `flush_to_l1` to emit fork-tagged metrics and to fire
148 /// the fragment-count guard rail (Phase 2 Day 12).
149 pub fork_id: Option<ForkId>,
150 /// Number of `flush_to_l1` calls since this writer was constructed.
151 /// Used as a proxy for L1 fragment growth on the fork's branches:
152 /// each flush typically appends ~1 fragment per touched dataset, so
153 /// the count tracks the order of magnitude of fragment accumulation.
154 /// Reading the actual `Dataset::manifest().fragments.len()` per
155 /// flush would add a per-dataset object-store roundtrip on the hot
156 /// commit path; the proxy keeps the guard rail purely observational
157 /// (Phase 5 introduces fork compaction proper). Only meaningful when
158 /// `fork_id.is_some()`. `Relaxed` is sufficient — observational only.
159 fork_flush_count: Arc<AtomicU64>,
160 /// Whether the fork-fragment warning has already fired at the
161 /// configured threshold. One-shot per writer lifetime. `Relaxed` is
162 /// sufficient — observational only.
163 fork_fragment_warn_fired: Arc<AtomicBool>,
164 /// Dedicated lock for the genuinely-exclusive flush path. Acquired by
165 /// the [`Writer::flush_to_l1`] entry and by `commit_transaction_l0`
166 /// across its WAL-append + L0-merge window. Replaces the outer
167 /// `Arc<RwLock<Writer>>` for flush exclusion once Phase 4 drops it.
168 /// Arc-wrapped so async-flush coordinator's finalize path can
169 /// re-acquire it from a spawned task via SharedFlushCtx.
170 flush_lock: Arc<tokio::sync::Mutex<()>>,
171 /// Coordinator for async-flush pipeline. Owns the back-pressure
172 /// semaphore, rotate-order sequence, single-finalizer task, and
173 /// pending-flush counter. Always present even when async flush is
174 /// disabled — the sync `flush_to_l1` path uses it for the future
175 /// `FlushInProgressGuard`/permit ownership model.
176 /// Coordinator is `None` when `async_flush_enabled = false`. The
177 /// coordinator's finalizer task captures `SharedFlushCtx` which
178 /// includes `Arc<StorageManager>`; on a fork-scoped Writer that
179 /// also pins the fork's `ForkScope` via `storage.fork_scope`, so
180 /// the holder count never drops. Constructing it only when the
181 /// feature is actually on avoids that side-effect for all
182 /// existing sync-flush paths. When async-flush graduates from
183 /// opt-in to default (Commit 12), `drop_fork` (Commit 8) handles
184 /// the drain explicitly.
185 #[allow(dead_code)] // first production use lands in Commit 6/7
186 pub(crate) flush_coordinator: Option<Arc<crate::runtime::flush_coordinator::FlushCoordinator>>,
187 /// Optimistic-concurrency commit-sequence counter (SSI). Incremented once
188 /// per successful commit under `flush_lock`; a transaction captures the
189 /// current value at begin as its read sequence (`L0Buffer::occ_read_seq`).
190 ///
191 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
192 commit_sequence: Arc<AtomicU64>,
193 /// Bounded log of recently-committed write-sets for OCC conflict detection.
194 /// Read and updated only under `flush_lock`.
195 ///
196 /// Always allocated; consulted only when `config.ssi_enabled` is `true`.
197 committed_writes: Arc<PlMutex<crate::runtime::occ::CommitRegistry>>,
198 /// Per-row pessimistic locks for `FOR UPDATE` (SSI escape hatch), keyed by
199 /// canonical (label, key-props) bytes. A transaction holds the lock from
200 /// MATCH until commit/rollback, serializing concurrent `FOR UPDATE` writers
201 /// on the same key (avoiding optimistic abort-retry on hot keys).
202 ///
203 /// Always allocated; populated only when `config.ssi_enabled` is `true`.
204 for_update_locks: Arc<dashmap::DashMap<Vec<u8>, Arc<tokio::sync::Mutex<()>>>>,
205}
206
/// Capacity of the OCC commit registry: how many of the most recent commits'
/// write-sets are retained for conflict detection. Sized so that running
/// past the retained history, which forces a conservative abort, is rare in
/// practice. Each retained entry is only a small set of touched ids.
const OCC_REGISTRY_CAPACITY: usize = 4_096;
211
212impl Writer {
213 pub async fn new(
214 storage: Arc<StorageManager>,
215 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
216 start_version: u64,
217 ) -> Result<Self> {
218 Self::new_with_config(
219 storage,
220 schema_manager,
221 start_version,
222 UniConfig::default(),
223 None,
224 None,
225 )
226 .await
227 }
228
229 pub async fn new_with_config(
230 storage: Arc<StorageManager>,
231 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
232 start_version: u64,
233 config: UniConfig,
234 wal: Option<Arc<WriteAheadLog>>,
235 allocator: Option<Arc<IdAllocator>>,
236 ) -> Result<Self> {
237 let allocator = if let Some(a) = allocator {
238 a
239 } else {
240 let store = storage.store();
241 let path = object_store::path::Path::from("id_allocator.json");
242 Arc::new(IdAllocator::new(store, path, 1000).await?)
243 };
244
245 let l0_manager = Arc::new(L0Manager::new(start_version, wal));
246
247 let property_manager = Some(Arc::new(PropertyManager::new(
248 storage.clone(),
249 schema_manager.clone(),
250 1000,
251 )));
252
253 let adjacency_manager = storage.adjacency_manager();
254
255 // Hoist the Arc'd fields so we can both stash them on Writer and
256 // hand the same Arcs to the SharedFlushCtx that FlushCoordinator
257 // captures. Single-source-of-truth for each piece of mutable
258 // shared state.
259 let last_flush_time = Arc::new(PlMutex::new(std::time::Instant::now()));
260 let cached_manifest = Arc::new(PlMutex::new(None));
261 let fork_flush_count = Arc::new(AtomicU64::new(0));
262 let fork_fragment_warn_fired = Arc::new(AtomicBool::new(false));
263 let flush_lock = Arc::new(tokio::sync::Mutex::new(()));
264 let compaction_handle = Arc::new(RwLock::new(None));
265 let index_rebuild_manager: Arc<
266 OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>,
267 > = Arc::new(OnceLock::new());
268
269 let flush_coordinator = if config.async_flush_enabled {
270 let shared = SharedFlushCtx {
271 storage: storage.clone(),
272 l0_manager: l0_manager.clone(),
273 adjacency_manager: adjacency_manager.clone(),
274 property_manager: property_manager.clone(),
275 schema_manager: schema_manager.clone(),
276 cached_manifest: cached_manifest.clone(),
277 last_flush_time: last_flush_time.clone(),
278 fork_id: None,
279 fork_flush_count: fork_flush_count.clone(),
280 fork_fragment_warn_fired: fork_fragment_warn_fired.clone(),
281 fork_fragment_warn_threshold: config.fork_fragment_warn_threshold,
282 flush_lock: flush_lock.clone(),
283 index_rebuild_manager: index_rebuild_manager.clone(),
284 compaction_handle: compaction_handle.clone(),
285 compaction_config: config.compaction.clone(),
286 index_rebuild_config: config.index_rebuild.clone(),
287 auto_rebuild_enabled: config.index_rebuild.auto_rebuild_enabled,
288 };
289 let finalize_fn: Arc<dyn FinalizeFn> = Arc::new(WriterFinalizer);
290 Some(Arc::new(FlushCoordinator::new(
291 config.max_pending_flushes,
292 shared,
293 finalize_fn,
294 )))
295 } else {
296 None
297 };
298
299 let commit_sequence = Arc::new(AtomicU64::new(0));
300 let committed_writes = Arc::new(PlMutex::new(crate::runtime::occ::CommitRegistry::new(
301 OCC_REGISTRY_CAPACITY,
302 )));
303 let for_update_locks = Arc::new(dashmap::DashMap::new());
304
305 Ok(Self {
306 l0_manager,
307 storage,
308 schema_manager,
309 allocator,
310 config,
311 xervo_runtime: OnceLock::new(),
312 property_manager,
313 adjacency_manager,
314 last_flush_time,
315 compaction_handle,
316 index_rebuild_manager,
317 cached_manifest,
318 fork_id: None,
319 fork_flush_count,
320 fork_fragment_warn_fired,
321 flush_lock,
322 flush_coordinator,
323 commit_sequence,
324 committed_writes,
325 for_update_locks,
326 })
327 }
328
329 /// Returns the shared pessimistic lock handle for a `FOR UPDATE` row key,
330 /// creating it on first use. The caller `.lock_owned().await`s the returned
331 /// mutex and holds the guard for the transaction's lifetime.
332 pub fn row_lock_handle(&self, key: &[u8]) -> Arc<tokio::sync::Mutex<()>> {
333 self.for_update_locks
334 .entry(key.to_vec())
335 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
336 .clone()
337 }
338
339 /// Prunes `FOR UPDATE` lock-map entries for `keys` that no live transaction
340 /// holds anymore, so the map does not grow without bound across the keyspace.
341 ///
342 /// Called when a transaction ends, **after** its guards have been dropped.
343 /// `remove_if` evaluates its predicate under the DashMap shard lock, which is
344 /// the same lock `row_lock_handle` takes to clone an entry — so the check
345 /// `strong_count == 1` (only the map holds the `Arc`) is race-free: a
346 /// concurrent acquirer either already cloned the `Arc` (count ≥ 2 → we skip
347 /// removal) or has not yet taken the shard lock (it will mint a fresh entry
348 /// after we remove). Either way no two transactions ever lock different
349 /// `Mutex` instances for the same key.
350 pub fn release_for_update_locks(&self, keys: &[Vec<u8>]) {
351 for key in keys {
352 self.for_update_locks
353 .remove_if(key, |_, handle| Arc::strong_count(handle) == 1);
354 }
355 }
356
357 /// Number of live entries in the `FOR UPDATE` lock map. Introspection for
358 /// tests that the map does not leak entries across transactions (G5).
359 pub fn for_update_lock_count(&self) -> usize {
360 self.for_update_locks.len()
361 }
362
363 /// The current OCC commit sequence. A `FOR UPDATE` acquisition re-stamps a
364 /// fresh transaction's `occ_read_seq` to this so its conflict-detection
365 /// baseline advances to lock-acquisition time (read-latest under the lock).
366 pub fn current_commit_sequence(&self) -> u64 {
367 self.commit_sequence.load(Ordering::Relaxed)
368 }
369
370 /// Build a fresh `SharedFlushCtx` from this Writer's current state.
371 /// Used by the async-flush stream/finalize paths to pass into spawned
372 /// tasks without smuggling `Arc<Writer>` (which would create a cycle
373 /// with `flush_coordinator -> FinalizeFn -> Writer`).
374 pub(crate) fn shared_ctx(&self) -> SharedFlushCtx {
375 SharedFlushCtx {
376 storage: self.storage.clone(),
377 l0_manager: self.l0_manager.clone(),
378 adjacency_manager: self.adjacency_manager.clone(),
379 property_manager: self.property_manager.clone(),
380 schema_manager: self.schema_manager.clone(),
381 cached_manifest: self.cached_manifest.clone(),
382 last_flush_time: self.last_flush_time.clone(),
383 fork_id: self.fork_id,
384 fork_flush_count: self.fork_flush_count.clone(),
385 fork_fragment_warn_fired: self.fork_fragment_warn_fired.clone(),
386 fork_fragment_warn_threshold: self.config.fork_fragment_warn_threshold,
387 flush_lock: self.flush_lock.clone(),
388 index_rebuild_manager: self.index_rebuild_manager.clone(),
389 compaction_handle: self.compaction_handle.clone(),
390 compaction_config: self.config.compaction.clone(),
391 index_rebuild_config: self.config.index_rebuild.clone(),
392 auto_rebuild_enabled: self.config.index_rebuild.auto_rebuild_enabled,
393 }
394 }
395
396 /// Borrow the flush coordinator if async flush is enabled.
397 /// Returns `None` when `config.async_flush_enabled = false`.
398 /// External callers (`drop_fork`) use this to drain pending streams.
399 pub fn flush_coordinator(
400 &self,
401 ) -> Option<&Arc<crate::runtime::flush_coordinator::FlushCoordinator>> {
402 self.flush_coordinator.as_ref()
403 }
404
405 /// Set the index rebuild manager for post-flush automatic rebuild scheduling.
406 ///
407 /// One-shot: returns `Err` if already set. The receiver is `&self` so this
408 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
409 pub fn set_index_rebuild_manager(
410 &self,
411 manager: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
412 ) -> Result<()> {
413 self.index_rebuild_manager
414 .set(manager)
415 .map_err(|_| anyhow!("index_rebuild_manager already set"))
416 }
417
418 /// Replay WAL mutations into the current L0 buffer.
419 pub async fn replay_wal(&self, wal_high_water_mark: u64) -> Result<usize> {
420 let l0 = self.l0_manager.get_current();
421 let wal = l0.read().wal.clone();
422
423 if let Some(wal) = wal {
424 wal.initialize().await?;
425 let mutations = wal.replay_since(wal_high_water_mark).await?;
426 let count = mutations.len();
427
428 if count > 0 {
429 log::info!(
430 "Replaying {} mutations from WAL (LSN > {})",
431 count,
432 wal_high_water_mark
433 );
434 let mut l0_guard = l0.write();
435 l0_guard.replay_mutations(mutations)?;
436 // Rebuild the UNIQUE constraint index over the recovered rows
437 // (Bug #9 Mechanism B). `replay_mutations` restores
438 // vertices/properties/labels but never repopulates
439 // `constraint_index` (its only other caller is the live insert
440 // path). Without this, a unique key that lives only in the WAL
441 // (committed but not yet flushed to Lance) is invisible to
442 // `check_unique_constraint_multi` after recovery and a
443 // duplicate of it could be created.
444 self.rebuild_constraint_index(&mut l0_guard);
445 }
446
447 Ok(count)
448 } else {
449 Ok(0)
450 }
451 }
452
453 /// Rebuild the UNIQUE constraint index on a recovered L0 buffer.
454 ///
455 /// Scans every recovered vertex's properties and, for each enabled UNIQUE
456 /// constraint whose target label the vertex carries and whose member
457 /// properties are all present, inserts the same constraint key the live
458 /// insert path builds (`serialize_constraint_key`). Tombstoned vertices are
459 /// skipped. Called after [`L0Buffer::replay_mutations`] under the buffer's
460 /// write lock; the schema is already loaded on the `Writer`.
461 fn rebuild_constraint_index(&self, l0_guard: &mut L0Buffer) {
462 let schema = self.schema_manager.schema();
463 // Collect entries first to avoid borrowing `vertex_properties`
464 // immutably while mutating `constraint_index` through the same guard.
465 let mut keys: Vec<(Vec<u8>, Vid)> = Vec::new();
466 for (&vid, props) in &l0_guard.vertex_properties {
467 if l0_guard.vertex_tombstones.contains(&vid) {
468 continue;
469 }
470 let Some(labels) = l0_guard.vertex_labels.get(&vid) else {
471 continue;
472 };
473 for label in labels {
474 for constraint in &schema.constraints {
475 if !constraint.enabled {
476 continue;
477 }
478 let ConstraintTarget::Label(l) = &constraint.target else {
479 continue;
480 };
481 if l != label {
482 continue;
483 }
484 let ConstraintType::Unique {
485 properties: unique_props,
486 } = &constraint.constraint_type
487 else {
488 continue;
489 };
490 let mut key_values = Vec::new();
491 let mut all_present = true;
492 for prop in unique_props {
493 if let Some(val) = props.get(prop) {
494 key_values.push((prop.clone(), val.clone()));
495 } else {
496 all_present = false;
497 break;
498 }
499 }
500 if all_present {
501 keys.push((serialize_constraint_key(label, &key_values), vid));
502 }
503 }
504 }
505 }
506 for (key, vid) in keys {
507 l0_guard.insert_constraint_key(key, vid);
508 }
509 }
510
511 /// Allocates the next VID (pure auto-increment).
512 pub async fn next_vid(&self) -> Result<Vid> {
513 self.allocator.allocate_vid().await
514 }
515
516 /// Allocates multiple VIDs at once for bulk operations.
517 /// This is more efficient than calling next_vid() in a loop.
518 pub async fn allocate_vids(&self, count: usize) -> Result<Vec<Vid>> {
519 self.allocator.allocate_vids(count).await
520 }
521
522 /// Allocates the next EID (pure auto-increment).
523 pub async fn next_eid(&self, _type_id: u32) -> Result<Eid> {
524 self.allocator.allocate_eid().await
525 }
526
527 /// Allocates multiple EIDs at once for bulk operations.
528 /// This is more efficient than calling next_eid() in a loop.
529 pub async fn allocate_eids(&self, count: usize) -> Result<Vec<Eid>> {
530 self.allocator.allocate_eids(count).await
531 }
532
533 /// Install the embedding runtime exactly once. Receiver is `&self` so it
534 /// can be called after the `Writer` has been wrapped in `Arc<Writer>`.
535 pub fn set_xervo_runtime(&self, runtime: Arc<ModelRuntime>) -> Result<()> {
536 self.xervo_runtime
537 .set(runtime)
538 .map_err(|_| anyhow!("xervo_runtime already set"))
539 }
540
541 pub fn xervo_runtime(&self) -> Option<Arc<ModelRuntime>> {
542 self.xervo_runtime.get().cloned()
543 }
544
545 /// Create a new empty L0 buffer for transaction-scoped mutations.
546 ///
547 /// Only reads the current version — no exclusive lock required on Writer.
548 /// The returned buffer has no WAL reference; mutations are logged at
549 /// commit time via [`Self::commit_transaction_l0`].
550 pub fn create_transaction_l0(&self) -> Arc<RwLock<L0Buffer>> {
551 let current_version = self.l0_manager.get_current().read().current_version;
552 // Transaction mutations are logged to WAL at COMMIT time, not during the transaction.
553 let buf = L0Buffer::new(current_version, None);
554 // SSI: stamp the OCC read sequence at begin so commit can detect any
555 // transaction that committed since. Gated on the runtime `ssi_enabled`
556 // toggle — when off, `occ_read_set` stays `None` and every downstream
557 // read-set recording / commit validation self-gates to a no-op.
558 let buf = if self.config.ssi_enabled {
559 let mut buf = buf;
560 buf.occ_read_seq = self.commit_sequence.load(Ordering::Relaxed);
561 // The read path records observed ids here for SSI antidependency
562 // detection; commit consults it.
563 buf.occ_read_set = Some(Arc::new(parking_lot::Mutex::new(
564 crate::runtime::l0::OccReadSet::default(),
565 )));
566 buf
567 } else {
568 buf
569 };
570 Arc::new(RwLock::new(buf))
571 }
572
573 /// Resolve the target L0 buffer for a mutation.
574 ///
575 /// When `tx_l0` is `Some`, the mutation targets a transaction-private buffer.
576 /// When `None`, it targets the global L0 from the manager.
577 fn resolve_l0(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Arc<RwLock<L0Buffer>> {
578 tx_l0
579 .cloned()
580 .unwrap_or_else(|| self.l0_manager.get_current())
581 }
582
583 fn update_metrics(&self) {
584 let l0 = self.l0_manager.get_current();
585 let size = l0.read().estimated_size;
586 metrics::gauge!("l0_buffer_size_bytes").set(size as f64);
587 }
588
589 /// Overlay-aware issue-#77 edge-endpoint validation.
590 ///
591 /// The current buffer alone does not hold all committed-but-unflushed
592 /// tombstones — a flush rotation moves them onto `pending_flush` until
593 /// the Lance write completes. A vertex is effectively deleted iff,
594 /// walking newest-first (tx → current → pending newest→oldest), the
595 /// first buffer that knows the vid says "tombstoned" (an insert clears
596 /// the tombstone within a buffer, so props/tombstone are mutually
597 /// exclusive per buffer).
598 ///
599 /// Must run under `flush_lock` so the overlay cannot change before the
600 /// merge, and BEFORE the durable WAL flush (see the call site).
601 fn validate_edge_endpoints_overlay(&self, tx_l0: &L0Buffer) -> Result<()> {
602 let chain = self.l0_manager.get_pending_flush();
603 let current = self.l0_manager.get_current();
604 let effectively_deleted = |vid: &Vid| -> bool {
605 if tx_l0.vertex_properties.contains_key(vid) {
606 return false;
607 }
608 if tx_l0.vertex_tombstones.contains(vid) {
609 return true;
610 }
611 {
612 let cur = current.read();
613 if cur.vertex_properties.contains_key(vid) {
614 return false;
615 }
616 if cur.vertex_tombstones.contains(vid) {
617 return true;
618 }
619 }
620 for frozen in chain.iter().rev() {
621 let g = frozen.read();
622 if g.vertex_properties.contains_key(vid) {
623 return false;
624 }
625 if g.vertex_tombstones.contains(vid) {
626 return true;
627 }
628 }
629 false
630 };
631 for (eid, (src_vid, dst_vid, _etype)) in &tx_l0.edge_endpoints {
632 if tx_l0.tombstones.contains_key(eid) {
633 continue; // a deletion, not an insertion — never resurrects a vertex
634 }
635 if effectively_deleted(src_vid) {
636 anyhow::bail!(
637 "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
638 eid,
639 src_vid
640 );
641 }
642 if effectively_deleted(dst_vid) {
643 anyhow::bail!(
644 "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
645 eid,
646 dst_vid
647 );
648 }
649 }
650 Ok(())
651 }
652
653 /// Seed `main_l0` (the current buffer) with the newest pending-overlay
654 /// value for each CRDT property the transaction writes, so the commit
655 /// merge MERGES against the committed CRDT state instead of shadowing it
656 /// (the carve-out lets concurrent CRDT writers commit on the assumption
657 /// that the merge sees the committed value — true only while that value
658 /// lives in current, not mid-flush on `pending_flush`). No-op when
659 /// nothing is pending or the property already exists in current. Vertex
660 /// properties only, mirroring the carve-out itself.
661 fn seed_crdt_state_from_chain(&self, tx_l0: &L0Buffer, main_l0: &mut L0Buffer) {
662 let chain = self.l0_manager.get_pending_flush();
663 if chain.is_empty() {
664 return;
665 }
666 for (vid, props) in &tx_l0.vertex_properties {
667 Self::seed_crdt_props(&chain, *vid, props, main_l0);
668 }
669 }
670
671 /// Per-vertex CRDT seeding (see [`Self::seed_crdt_state_from_chain`]).
672 /// Also used by the non-transactional vertex write path, which CRDT-merges
673 /// into the current buffer directly and has the same shadowing hazard
674 /// during a flush window.
675 fn seed_crdt_props(
676 chain: &[Arc<RwLock<L0Buffer>>],
677 vid: Vid,
678 props: &Properties,
679 target: &mut L0Buffer,
680 ) {
681 for (key, value) in props {
682 if crate::runtime::l0::try_as_crdt(value).is_none() {
683 continue;
684 }
685 if target
686 .vertex_properties
687 .get(&vid)
688 .is_some_and(|p| p.contains_key(key))
689 {
690 continue;
691 }
692 // Newest generation first: the first hit is the live state.
693 for frozen in chain.iter().rev() {
694 let g = frozen.read();
695 if let Some(v) = g.vertex_properties.get(&vid).and_then(|p| p.get(key)) {
696 if crate::runtime::l0::try_as_crdt(v).is_some() {
697 target
698 .vertex_properties
699 .entry(vid)
700 .or_default()
701 .insert(key.clone(), v.clone());
702 }
703 break;
704 }
705 }
706 }
707 }
708
    /// Commit a transaction's private L0 buffer into main L0.
    ///
    /// Writes the buffer's mutations to the WAL, flushes the WAL, merges the
    /// buffer into main L0, and replays its edges into the AdjacencyManager.
    ///
    /// Returns `(wal_lsn, flush_pending)`. `wal_lsn` is the WAL LSN of the
    /// commit (0 when no WAL is configured). When `flush_pending == true`, the
    /// post-commit `should_flush()` predicate fired but no flush ran; the
    /// caller is expected to spawn a background `flush_to_l1`. This is the
    /// shape used when `UniConfig::async_flush_enabled` is set, so commits
    /// don't block on L1-streaming I/O.
721 pub async fn commit_transaction_l0(
722 self: &Arc<Self>,
723 tx_l0_arc: Arc<RwLock<L0Buffer>>,
724 ) -> Result<(u64, bool)> {
725 // Hold `flush_lock` across WAL append + flush + main-L0 merge.
726 // Two concurrent commits serialize here; in Phase 3 the outer
727 // `Arc<RwLock<Writer>>` already provides this exclusion, so the
728 // acquisition is uncontended. Phase 4 drops the outer lock and
729 // this becomes the load-bearing serialization point.
730 let _flush_lock_guard = self.flush_lock.lock().await;
731
732 // Crash-recovery seam: simulate process death immediately after winning
733 // the commit serialization point but before any durable work. No-op
734 // unless built with `--features failpoints`. (See ssi_resilience tests.)
735 fail::fail_point!("commit::after-flush-lock");
736
737 // SSI: optimistic conflict detection. This MUST run before any WAL
738 // write — `flush_wal()` below is the durable commit point and the WAL
739 // has no abort marker, so aborting after it would resurrect this
740 // transaction on crash recovery. The write-set is reused for
741 // registration after a successful merge.
742 // Runtime-gated on `config.ssi_enabled`. When off, no validation runs
743 // and `occ_write_set` is `None`, so the post-merge registration below
744 // is skipped — reproducing last-writer-wins exactly.
745 let occ_write_set: Option<crate::runtime::occ::WriteSet> = if self.config.ssi_enabled {
746 let tx_l0 = tx_l0_arc.read();
747 let read_seq = tx_l0.occ_read_seq;
748 let write_set = crate::runtime::occ::WriteSet::from_l0(&tx_l0);
749 if !write_set.is_empty() {
750 // Telemetry: one validation per non-empty (writing) commit. The
751 // ratio of conflicts to validations is the headline abort rate.
752 metrics::counter!("uni_ssi_commit_validations_total").increment(1);
753 // Read-set is consulted only for writing transactions, so a
754 // read-only commit (empty write-set) runs at snapshot isolation.
755 let read_guard = tx_l0.occ_read_set.as_ref().map(|rs| rs.lock());
756 if let Some(conflict) =
757 self.committed_writes
758 .lock()
759 .check(read_seq, &write_set, read_guard.as_deref())
760 {
761 use crate::runtime::occ::Conflict;
762 match &conflict {
763 Conflict::WriteWrite { .. } => metrics::counter!(
764 "uni_ssi_serialization_conflicts_total",
765 "kind" => "write_write",
766 )
767 .increment(1),
768 Conflict::ReadWrite { .. } => metrics::counter!(
769 "uni_ssi_serialization_conflicts_total",
770 "kind" => "read_write",
771 )
772 .increment(1),
773 Conflict::HistoryTruncated { .. } => {
774 metrics::counter!("uni_ssi_history_truncated_total").increment(1)
775 }
776 }
777 return Err(anyhow::Error::new(
778 uni_common::UniError::SerializationConflict {
779 message: conflict.to_string(),
780 },
781 ));
782 }
783 }
784
785 // Validate against the committed-but-unflushed overlay under
786 // `flush_lock`: serializable MERGE uniqueness + CRDT carve-out
787 // soundness. The current buffer alone does not hold all committed
788 // state — a flush rotation moves it onto `pending_flush` until the
789 // Lance write completes (the Bug #9A window, here at the
790 // commit-time layer) — so every check walks [current, pending…].
791 {
792 let pending = self.l0_manager.get_pending_flush();
793 let main_l0 = self.l0_manager.get_current();
794 let overlay: Vec<Arc<RwLock<L0Buffer>>> =
795 std::iter::once(main_l0).chain(pending).collect();
796
797 // SSI / serializable MERGE: abort if a concurrent transaction has
798 // already committed a row with one of this transaction's unique
799 // keys. Commits serialize here, so this closes the race window
800 // left by the per-insert check. (Empty index → no iterations.)
801 for (key, vid) in &tx_l0.constraint_index {
802 if overlay
803 .iter()
804 .any(|b| b.read().has_constraint_key(key, *vid))
805 {
806 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
807 return Err(anyhow::Error::new(
808 uni_common::UniError::ConstraintConflict {
809 message: "unique key already committed by a concurrent \
810 transaction"
811 .to_string(),
812 },
813 ));
814 }
815 }
816
817 // Same race window for global ext_id uniqueness: the per-insert
818 // check ran against an older main L0; re-probe the committed
819 // index here, where commits serialize.
820 for (ext_id, vid) in &tx_l0.extid_index {
821 let taken = overlay.iter().any(|b| {
822 matches!(b.read().extid_index.get(ext_id), Some(&owner) if owner != *vid)
823 });
824 if taken {
825 metrics::counter!("uni_ssi_constraint_conflicts_total").increment(1);
826 return Err(anyhow::Error::new(
827 uni_common::UniError::ConstraintConflict {
828 message: format!(
829 "ext_id '{ext_id}' already committed by a concurrent \
830 transaction"
831 ),
832 },
833 ));
834 }
835 }
836
837 // CRDT carve-out soundness: a pure-CRDT write was dropped from the
838 // write-set assuming its merge commutes. If the overlay holds a
839 // *different* CRDT variant for the same property, the merge would
840 // silently overwrite it — abort instead of losing the update.
841 // (Checked against every overlay buffer: conservative if an old
842 // generation held a different variant that a newer commit already
843 // replaced, but an abort+retry is always sound.)
844 for buf in &overlay {
845 if let Some(conflict) =
846 crate::runtime::occ::crdt_carveout_overwrite(&tx_l0, &buf.read())
847 {
848 metrics::counter!("uni_ssi_crdt_aborts_total").increment(1);
849 return Err(anyhow::Error::new(
850 uni_common::UniError::SerializationConflict {
851 message: conflict.to_string(),
852 },
853 ));
854 }
855 }
856 }
857 Some(write_set)
858 } else {
859 None
860 };
861
862 // Issue #77: an edge whose endpoint is effectively deleted makes the
863 // merge below bail. That bail MUST happen before the durable WAL flush —
864 // after it the transaction is committed-but-unmerged (a ghost commit),
865 // and WAL replay re-hits the same bail, making the database unopenable.
866 // SSI validation was deliberately placed before the flush for exactly
867 // this reason; the endpoint check belongs here too. Runs unconditionally
868 // (issue #77 is not SSI-gated) under `flush_lock`, so the overlay
869 // tombstone state cannot change between here and the merge. A
870 // tombstone may live in a flush-rotated pending buffer rather than
871 // the current buffer, so the check walks the overlay newest-first.
872 {
873 let tx_l0 = tx_l0_arc.read();
874 self.validate_edge_endpoints_overlay(&tx_l0)?;
875 }
876
877 // Crash-recovery seam: SSI validation has passed; the transaction is
878 // about to become durable. A crash here must leave NO trace (validation
879 // happens before the WAL is touched). No-op unless `failpoints`.
880 fail::fail_point!("commit::after-validate");
881
882 // 1. Write transaction mutations to WAL BEFORE merging into main L0
883 // This ensures durability before visibility.
884 {
885 let tx_l0 = tx_l0_arc.read();
886 let main_l0_arc = self.l0_manager.get_current();
887 let main_l0 = main_l0_arc.read();
888
889 // If WAL exists, write mutations to it for durability
890 if let Some(wal) = main_l0.wal.as_ref() {
891 // Order: vertices first, then edges (to ensure src/dst exist on replay)
892
893 // Vertex insertions
894 for (vid, properties) in &tx_l0.vertex_properties {
895 if !tx_l0.vertex_tombstones.contains(vid) {
896 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
897 wal.append(crate::runtime::wal::Mutation::InsertVertex {
898 vid: *vid,
899 properties: properties.clone(),
900 labels,
901 })?;
902 }
903 }
904
905 // Vertex deletions
906 for vid in &tx_l0.vertex_tombstones {
907 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
908 wal.append(crate::runtime::wal::Mutation::DeleteVertex { vid: *vid, labels })?;
909 }
910
911 // Label-only mutations (SET n:Label / REMOVE n:Label). After
912 // vertex inserts (so the vertex exists on replay), before edges,
913 // and skipping vertices deleted in this same commit.
914 for vid in &tx_l0.vertex_label_overwrites {
915 if tx_l0.vertex_tombstones.contains(vid) {
916 continue;
917 }
918 let labels = tx_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
919 wal.append(crate::runtime::wal::Mutation::SetVertexLabels {
920 vid: *vid,
921 labels,
922 })?;
923 }
924
925 // Crash-recovery seam: vertices appended, edges not yet. Tests
926 // assert that a crash here (before `flush_wal`) recovers NOTHING
927 // — the durable commit point is the flush below, not append.
928 fail::fail_point!("commit::mid-wal");
929
930 // Edge insertions and deletions from edge_endpoints
931 for (eid, (src_vid, dst_vid, edge_type)) in &tx_l0.edge_endpoints {
932 if tx_l0.tombstones.contains_key(eid) {
933 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
934 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
935 eid: *eid,
936 src_vid: *src_vid,
937 dst_vid: *dst_vid,
938 edge_type: *edge_type,
939 version,
940 })?;
941 } else {
942 let properties =
943 tx_l0.edge_properties.get(eid).cloned().unwrap_or_default();
944 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
945 let edge_type_name = tx_l0.edge_types.get(eid).cloned();
946 wal.append(crate::runtime::wal::Mutation::InsertEdge {
947 src_vid: *src_vid,
948 dst_vid: *dst_vid,
949 edge_type: *edge_type,
950 eid: *eid,
951 version,
952 properties,
953 edge_type_name,
954 })?;
955 }
956 }
957
958 // Tombstones for edges that only exist in the global L0 (not in
959 // this transaction's edge_endpoints). Without this, deletes of
960 // pre-existing edges would be silently lost.
961 for (eid, tombstone) in &tx_l0.tombstones {
962 if !tx_l0.edge_endpoints.contains_key(eid) {
963 let version = tx_l0.edge_versions.get(eid).copied().unwrap_or(0);
964 wal.append(crate::runtime::wal::Mutation::DeleteEdge {
965 eid: *eid,
966 src_vid: tombstone.src_vid,
967 dst_vid: tombstone.dst_vid,
968 edge_type: tombstone.edge_type,
969 version,
970 })?;
971 }
972 }
973 }
974 }
975
976 // 2. Flush WAL to durable storage - THIS IS THE COMMIT POINT
977 let wal_lsn = self.flush_wal().await?;
978
979 // Crash-recovery seam: the WAL is durable but main L0 has NOT merged.
980 // A crash here must RECOVER the transaction on replay (it is committed),
981 // even though it was never made visible in-process. No-op unless `failpoints`.
982 fail::fail_point!("commit::after-wal-flush");
983
984 // Component C1: if an outstanding snapshot pins the current generation,
985 // clone it aside (lazy copy-on-write) before merging, so the pinning
986 // transaction's reads stay isolated from this commit. No-op — and zero
987 // cost — when nothing is pinned (the common case). We hold `flush_lock`,
988 // so this cannot race a flush rotate or another commit's merge; the merge
989 // below re-fetches `get_current()`, landing in the fresh post-freeze buffer.
990 // Self-gates on the runtime SSI toggle: a snapshot is only ever pinned by
991 // a transaction begun under `ssi_enabled`, so `is_current_pinned()` is
992 // always false when SSI is off and this is a zero-cost no-op.
993 if self.l0_manager.is_current_pinned() {
994 self.l0_manager.freeze_current_for_snapshot();
995 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
996 }
997
998 // 3. Merge into main L0 and make visible
999 {
1000 // Write-lock the tx buffer: `merge_take` moves its property maps
1001 // into main L0 instead of cloning them. The commit consumes the
1002 // transaction, so the drained maps are never observed afterwards;
1003 // everything read below (endpoints, versions, tombstones) is left
1004 // intact.
1005 let mut tx_l0 = tx_l0_arc.write();
1006 let main_l0_arc = self.l0_manager.get_current();
1007 let mut main_l0 = main_l0_arc.write();
1008 // A CRDT property's committed state may live only in a
1009 // flush-rotated pending buffer (the post-rotation current is
1010 // empty until the Lance write completes — the Bug #9A window).
1011 // `merge_crdt_properties` merges against the CURRENT buffer's
1012 // value — without seeding, the tx's CRDT state would SHADOW the
1013 // pending buffer's at read time (newest buffer wins per property)
1014 // and concurrent increments would be lost. Seed the newest
1015 // overlay value for each CRDT property the tx writes that current
1016 // lacks, so the merge below merges instead of replaces.
1017 self.seed_crdt_state_from_chain(&tx_l0, &mut main_l0);
1018 main_l0.merge_take(&mut tx_l0)?;
1019
1020 // Replay transaction edges into the AdjacencyManager overlay
1021 for (eid, (src, dst, etype)) in &tx_l0.edge_endpoints {
1022 let edge_version = tx_l0
1023 .edge_versions
1024 .get(eid)
1025 .copied()
1026 .unwrap_or(main_l0.current_version);
1027 if tx_l0.tombstones.contains_key(eid) {
1028 self.adjacency_manager
1029 .add_tombstone(*eid, *src, *dst, *etype, edge_version);
1030 } else {
1031 self.adjacency_manager
1032 .insert_edge(*src, *dst, *eid, *etype, edge_version);
1033 }
1034 }
1035
1036 // Replay tombstones for edges that only exist in the global L0
1037 // (not in this transaction's edge_endpoints).
1038 for (eid, tombstone) in &tx_l0.tombstones {
1039 if !tx_l0.edge_endpoints.contains_key(eid) {
1040 let edge_version = tx_l0
1041 .edge_versions
1042 .get(eid)
1043 .copied()
1044 .unwrap_or(main_l0.current_version);
1045 self.adjacency_manager.add_tombstone(
1046 *eid,
1047 tombstone.src_vid,
1048 tombstone.dst_vid,
1049 tombstone.edge_type,
1050 edge_version,
1051 );
1052 }
1053 }
1054 }
1055
1056 // Crash-recovery seam: durable AND merged, but the in-memory commit
1057 // registry has not recorded this write-set yet. A crash here is
1058 // indistinguishable from one at `after-wal-flush` on reopen (the
1059 // registry is in-memory and rebuilt empty); the tx still recovers.
1060 fail::fail_point!("commit::after-merge");
1061
1062 // SSI: register this commit's write-set under a fresh commit sequence so
1063 // later transactions detect conflicts against it. Still under
1064 // `flush_lock`, before the async-flush branch can drop the guard.
1065 // `occ_write_set` is `Some` only when `config.ssi_enabled`.
1066 if let Some(write_set) = occ_write_set
1067 && !write_set.is_empty()
1068 {
1069 let seq = self.commit_sequence.fetch_add(1, Ordering::Relaxed) + 1;
1070 self.committed_writes.lock().record(seq, write_set);
1071 }
1072
1073 self.update_metrics();
1074
1075 // 4. Best-effort post-commit auto-flush.
1076 //
1077 // Two paths:
1078 // - async_flush_enabled = false (default): inline under our
1079 // existing flush_lock guard via flush_inline_under_lock.
1080 // - async_flush_enabled = true: rotate inline, drop flush_lock,
1081 // then submit the stream phase to the coordinator. Gated on
1082 // `pending_flush_count() < max_pending_flushes` so we don't
1083 // stack up rotations beyond the configured pipeline depth.
1084 // `try_acquire_permit` is non-blocking: if we lose the race
1085 // for the last permit, we just skip this trigger (the next
1086 // commit retries).
1087 let mut flush_pending = false;
1088 if self.should_flush() {
1089 if self.config.async_flush_enabled
1090 && let Some(coord) = self.flush_coordinator.as_ref()
1091 && coord.pending_flush_count() < self.config.max_pending_flushes
1092 {
1093 match coord.try_acquire_permit() {
1094 Some(permit) => {
1095 match self.flush_l0_rotate().await {
1096 Ok(rotate_out) => {
1097 // Allocate the rotate seq and bump pending ONLY
1098 // after the rotate succeeds (Bug #3). A failed
1099 // rotate must consume neither: the finalizer
1100 // advances strictly in consecutive seq order and
1101 // only decrements pending on finalize, so a
1102 // leaked seq/pending from a failed rotate would
1103 // wedge the finalizer forever and climb pending
1104 // toward `max_pending_flushes`. The seq is still
1105 // allocated under `flush_lock` (immediately after
1106 // the rotate, before the guard drops below), so
1107 // concurrent rotates keep seq order == rotation
1108 // order, and the seq is not used until submit.
1109 let seq = coord.next_rotate_seq();
1110 coord.note_pending();
1111 // Release flush_lock BEFORE the spawn so concurrent
1112 // commits can proceed while the stream runs.
1113 drop(_flush_lock_guard);
1114 let parent_manifest = self.cached_manifest.lock().clone();
1115 let rotated = crate::runtime::flush_coordinator::RotatedFlush {
1116 seq,
1117 old_l0_arc: rotate_out.old_l0_arc.clone(),
1118 wal_lsn: rotate_out.wal_lsn,
1119 current_version: rotate_out.current_version,
1120 name: None,
1121 parent_manifest,
1122 permit,
1123 flush_in_progress_guard: rotate_out.flush_in_progress_guard,
1124 };
1125 let writer = self.clone();
1126 let _ticket = coord.submit_for_stream(
1127 rotated,
1128 move |old_l0, wal, ver, n| async move {
1129 let outcome =
1130 writer.flush_stream_l1(old_l0, wal, ver, n).await?;
1131 Ok(crate::runtime::flush_coordinator::FlushOutcome {
1132 new_manifest: outcome.manifest,
1133 snapshot_id: outcome.snapshot_id,
1134 })
1135 },
1136 );
1137 flush_pending = true;
1138 // Early return — flush_lock already dropped.
1139 return Ok((wal_lsn, flush_pending));
1140 }
1141 Err(e) => {
1142 tracing::warn!("Async rotate failed (non-critical): {}", e);
1143 // No seq was allocated and pending was not
1144 // bumped (both moved into the Ok arm for Bug
1145 // #3), so the finalizer is not wedged. The
1146 // permit drops here, freeing the slot.
1147 }
1148 }
1149 }
1150 None => {
1151 // Race: someone else grabbed the last permit. Skip;
1152 // next commit will retry should_flush().
1153 metrics::counter!("uni_flush_trigger_skipped_total").increment(1);
1154 }
1155 }
1156 } else if let Err(e) = self.flush_inline_under_lock(None).await {
1157 tracing::warn!("Post-commit flush check failed (non-critical): {}", e);
1158 }
1159 }
1160
1161 Ok((wal_lsn, flush_pending))
1162 }
1163
1164 /// Flush the WAL buffer to durable storage.
1165 ///
1166 /// Returns the LSN of the flushed segment, or `0` when no WAL is configured.
1167 pub async fn flush_wal(&self) -> Result<u64> {
1168 let l0 = self.l0_manager.get_current();
1169 let wal = l0.read().wal.clone();
1170
1171 match wal {
1172 Some(wal) => Ok(wal.flush().await?),
1173 None => Ok(0),
1174 }
1175 }
1176
1177 /// Record property removals in the active L0 mutation stats.
1178 ///
1179 /// Routes to the transaction L0 if provided, otherwise to the main L0.
1180 pub fn track_properties_removed(&self, count: usize, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
1181 if count == 0 {
1182 return;
1183 }
1184 let l0 = self.resolve_l0(tx_l0);
1185 l0.write().mutation_stats.properties_removed += count;
1186 }
1187
1188 /// Validates vertex constraints for the given properties.
1189 /// In the new design, label is passed as a parameter since VID no longer embeds label.
1190 async fn validate_vertex_constraints_for_label(
1191 &self,
1192 vid: Vid,
1193 properties: &Properties,
1194 label: &str,
1195 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1196 ) -> Result<()> {
1197 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, false)
1198 .await
1199 }
1200
1201 /// Partial-update sibling: validates only constraints touching keys
1202 /// present in `properties` (the touched set). NOT NULL is checked
1203 /// only for touched keys; multi-key UNIQUE / CHECK / EXISTS are
1204 /// skipped when any referenced key is absent (the caller is
1205 /// expected to have routed to the full-row path in that case via
1206 /// `touched_needs_full_read`).
1207 async fn validate_vertex_constraints_for_label_partial(
1208 &self,
1209 vid: Vid,
1210 properties: &Properties,
1211 label: &str,
1212 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1213 ) -> Result<()> {
1214 self.validate_vertex_constraints_for_label_impl(vid, properties, label, tx_l0, true)
1215 .await
1216 }
1217
1218 async fn validate_vertex_constraints_for_label_impl(
1219 &self,
1220 vid: Vid,
1221 properties: &Properties,
1222 label: &str,
1223 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1224 partial: bool,
1225 ) -> Result<()> {
1226 let schema = self.schema_manager.schema();
1227
1228 {
1229 // 1. Check NOT NULL constraints (from Property definitions).
1230 // Under partial-update mode, skip properties NOT in
1231 // `properties` — they retain their previous (already-
1232 // validated) value.
1233 if let Some(props_meta) = schema.properties.get(label) {
1234 for (prop_name, meta) in props_meta {
1235 if !meta.nullable {
1236 let present = properties.get(prop_name);
1237 if partial && present.is_none() {
1238 continue;
1239 }
1240 if present.is_none_or(|v| v.is_null()) {
1241 log::warn!(
1242 "Constraint violation: Property '{}' cannot be null for label '{}'",
1243 prop_name,
1244 label
1245 );
1246 return Err(anyhow!(
1247 "Constraint violation: Property '{}' cannot be null",
1248 prop_name
1249 ));
1250 }
1251 }
1252 }
1253 }
1254
1255 // 2. Check Explicit Constraints (Unique, Check, etc.)
1256 for constraint in &schema.constraints {
1257 if !constraint.enabled {
1258 continue;
1259 }
1260 match &constraint.target {
1261 ConstraintTarget::Label(l) if l == label => {}
1262 _ => continue,
1263 }
1264
1265 match &constraint.constraint_type {
1266 ConstraintType::Unique {
1267 properties: unique_props,
1268 } => {
1269 // Support single and multi-property unique constraints
1270 if !unique_props.is_empty() {
1271 let mut key_values = Vec::new();
1272 let mut missing = false;
1273 for prop in unique_props {
1274 if let Some(val) = properties.get(prop) {
1275 key_values.push((prop.clone(), val.clone()));
1276 } else {
1277 missing = true; // Can't enforce if property missing (partial update?)
1278 // For INSERT, missing means null?
1279 // If property is nullable, unique constraint typically allows multiple nulls or ignores?
1280 // For now, only check if ALL keys are present
1281 }
1282 }
1283
1284 if !missing {
1285 self.check_unique_constraint_multi(label, &key_values, vid, tx_l0)
1286 .await?;
1287 }
1288 }
1289 }
1290 ConstraintType::Exists { property } => {
1291 if properties.get(property).is_none_or(|v| v.is_null()) {
1292 log::warn!(
1293 "Constraint violation: Property '{}' must exist for label '{}'",
1294 property,
1295 label
1296 );
1297 return Err(anyhow!(
1298 "Constraint violation: Property '{}' must exist",
1299 property
1300 ));
1301 }
1302 }
1303 ConstraintType::Check { expression } => {
1304 if !self.evaluate_check_constraint(expression, properties)? {
1305 return Err(anyhow!(
1306 "CHECK constraint '{}' violated: expression '{}' evaluated to false",
1307 constraint.name,
1308 expression
1309 ));
1310 }
1311 }
1312 _ => {
1313 return Err(anyhow!("Unsupported constraint type"));
1314 }
1315 }
1316 }
1317 }
1318 Ok(())
1319 }
1320
1321 /// Validates vertex constraints for a vertex with the given labels.
1322 /// Labels must be passed explicitly since the vertex may not yet be in L0.
1323 /// Unknown labels (not in schema) are skipped.
1324 async fn validate_vertex_constraints(
1325 &self,
1326 vid: Vid,
1327 properties: &Properties,
1328 labels: &[String],
1329 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1330 ) -> Result<()> {
1331 let schema = self.schema_manager.schema();
1332
1333 // Validate constraints only for known labels
1334 for label in labels {
1335 // Skip unknown labels (schemaless support)
1336 if schema.get_label_case_insensitive(label).is_none() {
1337 continue;
1338 }
1339 self.validate_vertex_constraints_for_label(vid, properties, label, tx_l0)
1340 .await?;
1341 }
1342
1343 // Check global ext_id uniqueness if ext_id is provided
1344 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1345 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1346 }
1347
1348 Ok(())
1349 }
1350
1351 /// Partial sibling of `validate_vertex_constraints` — validates only
1352 /// constraints touching keys present in `properties`. Used by
1353 /// `insert_vertex_partial`'s fast path; the caller pre-screens for
1354 /// multi-key UNIQUE constraints via `touched_needs_full_read`.
1355 async fn validate_vertex_constraints_partial(
1356 &self,
1357 vid: Vid,
1358 touched: &Properties,
1359 labels: &[String],
1360 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1361 ) -> Result<()> {
1362 let schema = self.schema_manager.schema();
1363 for label in labels {
1364 if schema.get_label_case_insensitive(label).is_none() {
1365 continue;
1366 }
1367 self.validate_vertex_constraints_for_label_partial(vid, touched, label, tx_l0)
1368 .await?;
1369 }
1370 if let Some(ext_id) = touched.get("ext_id").and_then(|v| v.as_str()) {
1371 self.check_extid_globally_unique(ext_id, vid, tx_l0).await?;
1372 }
1373 Ok(())
1374 }
1375
1376 /// Collect ext_ids and unique constraint keys from an iterator of vertex properties.
1377 ///
1378 /// Used to build a constraint key index from L0 buffers for batch validation.
1379 fn collect_constraint_keys_from_properties<'a>(
1380 properties_iter: impl Iterator<Item = &'a Properties>,
1381 label: &str,
1382 constraints: &[uni_common::core::schema::Constraint],
1383 existing_keys: &mut HashMap<String, HashSet<String>>,
1384 existing_extids: &mut HashSet<String>,
1385 ) {
1386 for props in properties_iter {
1387 if let Some(ext_id) = props.get("ext_id").and_then(|v| v.as_str()) {
1388 existing_extids.insert(ext_id.to_string());
1389 }
1390
1391 for constraint in constraints {
1392 if !constraint.enabled {
1393 continue;
1394 }
1395 if let ConstraintTarget::Label(l) = &constraint.target {
1396 if l != label {
1397 continue;
1398 }
1399 } else {
1400 continue;
1401 }
1402
1403 if let ConstraintType::Unique {
1404 properties: unique_props,
1405 } = &constraint.constraint_type
1406 {
1407 let mut key_parts = Vec::new();
1408 let mut all_present = true;
1409 for prop in unique_props {
1410 if let Some(val) = props.get(prop) {
1411 key_parts.push(format!("{}:{}", prop, val));
1412 } else {
1413 all_present = false;
1414 break;
1415 }
1416 }
1417 if all_present {
1418 let key = key_parts.join("|");
1419 existing_keys
1420 .entry(constraint.name.clone())
1421 .or_default()
1422 .insert(key);
1423 }
1424 }
1425 }
1426 }
1427 }
1428
1429 /// Validates constraints for a batch of vertices efficiently.
1430 ///
1431 /// This method builds an in-memory index from L0 buffers ONCE instead of scanning
1432 /// per vertex, reducing complexity from O(n²) to O(n) for bulk inserts.
1433 ///
1434 /// # Arguments
1435 /// * `vids` - VIDs of vertices being inserted
1436 /// * `properties_batch` - Properties for each vertex
1437 /// * `label` - Label for all vertices (assumes single label for now)
1438 ///
1439 /// # Performance
1440 /// For N vertices with unique constraints:
1441 /// - Old approach: O(N²) - scan L0 buffer N times
1442 /// - New approach: O(N) - scan L0 buffer once, build HashSet, check each vertex in O(1)
1443 async fn validate_vertex_batch_constraints(
1444 &self,
1445 vids: &[Vid],
1446 properties_batch: &[Properties],
1447 label: &str,
1448 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1449 ) -> Result<()> {
1450 if vids.len() != properties_batch.len() {
1451 return Err(anyhow!("VID/properties length mismatch"));
1452 }
1453
1454 let schema = self.schema_manager.schema();
1455
1456 // 1. Validate NOT NULL constraints for each vertex
1457 if let Some(props_meta) = schema.properties.get(label) {
1458 for (idx, properties) in properties_batch.iter().enumerate() {
1459 for (prop_name, meta) in props_meta {
1460 if !meta.nullable && properties.get(prop_name).is_none_or(|v| v.is_null()) {
1461 return Err(anyhow!(
1462 "Constraint violation at index {}: Property '{}' cannot be null",
1463 idx,
1464 prop_name
1465 ));
1466 }
1467 }
1468 }
1469 }
1470
1471 // 2. Build constraint key index from L0 buffers (ONCE for entire batch)
1472 let mut existing_keys: HashMap<String, HashSet<String>> = HashMap::new();
1473 let mut existing_extids: HashSet<String> = HashSet::new();
1474
1475 // Scan current L0 buffer
1476 {
1477 let l0 = self.l0_manager.get_current();
1478 let l0_guard = l0.read();
1479 Self::collect_constraint_keys_from_properties(
1480 l0_guard.vertex_properties.values(),
1481 label,
1482 &schema.constraints,
1483 &mut existing_keys,
1484 &mut existing_extids,
1485 );
1486 }
1487
1488 // Scan transaction L0 if present
1489 if let Some(tx_l0) = tx_l0 {
1490 let tx_l0_guard = tx_l0.read();
1491 Self::collect_constraint_keys_from_properties(
1492 tx_l0_guard.vertex_properties.values(),
1493 label,
1494 &schema.constraints,
1495 &mut existing_keys,
1496 &mut existing_extids,
1497 );
1498 }
1499
1500 // 3. Check batch vertices against index AND check for duplicates within batch
1501 let mut batch_keys: HashMap<String, HashMap<String, usize>> = HashMap::new();
1502 let mut batch_extids: HashMap<String, usize> = HashMap::new();
1503
1504 for (idx, (_vid, properties)) in vids.iter().zip(properties_batch.iter()).enumerate() {
1505 // Check ext_id uniqueness
1506 if let Some(ext_id) = properties.get("ext_id").and_then(|v| v.as_str()) {
1507 if existing_extids.contains(ext_id) {
1508 return Err(anyhow!(
1509 "Constraint violation at index {}: ext_id '{}' already exists",
1510 idx,
1511 ext_id
1512 ));
1513 }
1514 if let Some(first_idx) = batch_extids.get(ext_id) {
1515 return Err(anyhow!(
1516 "Constraint violation: ext_id '{}' duplicated in batch at indices {} and {}",
1517 ext_id,
1518 first_idx,
1519 idx
1520 ));
1521 }
1522 batch_extids.insert(ext_id.to_string(), idx);
1523 }
1524
1525 // Check unique constraints
1526 for constraint in &schema.constraints {
1527 if !constraint.enabled {
1528 continue;
1529 }
1530 if let ConstraintTarget::Label(l) = &constraint.target {
1531 if l != label {
1532 continue;
1533 }
1534 } else {
1535 continue;
1536 }
1537
1538 match &constraint.constraint_type {
1539 ConstraintType::Unique {
1540 properties: unique_props,
1541 } => {
1542 let mut key_parts = Vec::new();
1543 let mut all_present = true;
1544 for prop in unique_props {
1545 if let Some(val) = properties.get(prop) {
1546 key_parts.push(format!("{}:{}", prop, val));
1547 } else {
1548 all_present = false;
1549 break;
1550 }
1551 }
1552
1553 if all_present {
1554 let key = key_parts.join("|");
1555
1556 // Check against existing L0 keys
1557 if let Some(keys) = existing_keys.get(&constraint.name)
1558 && keys.contains(&key)
1559 {
1560 return Err(anyhow!(
1561 "Constraint violation at index {}: Duplicate composite key for label '{}' (constraint '{}')",
1562 idx,
1563 label,
1564 constraint.name
1565 ));
1566 }
1567
1568 // Check for duplicates within batch
1569 let batch_constraint_keys =
1570 batch_keys.entry(constraint.name.clone()).or_default();
1571 if let Some(first_idx) = batch_constraint_keys.get(&key) {
1572 return Err(anyhow!(
1573 "Constraint violation: Duplicate key '{}' in batch at indices {} and {}",
1574 key,
1575 first_idx,
1576 idx
1577 ));
1578 }
1579 batch_constraint_keys.insert(key, idx);
1580 }
1581 }
1582 ConstraintType::Exists { property }
1583 if properties.get(property).is_none_or(|v| v.is_null()) =>
1584 {
1585 return Err(anyhow!(
1586 "Constraint violation at index {}: Property '{}' must exist",
1587 idx,
1588 property
1589 ));
1590 }
1591 ConstraintType::Check { expression }
1592 if !self.evaluate_check_constraint(expression, properties)? =>
1593 {
1594 return Err(anyhow!(
1595 "Constraint violation at index {}: CHECK constraint '{}' violated",
1596 idx,
1597 constraint.name
1598 ));
1599 }
1600 _ => {}
1601 }
1602 }
1603 }
1604
1605 // 4. Check storage for unique constraints (can batch this into a single query)
1606 for constraint in &schema.constraints {
1607 if !constraint.enabled {
1608 continue;
1609 }
1610 if let ConstraintTarget::Label(l) = &constraint.target {
1611 if l != label {
1612 continue;
1613 }
1614 } else {
1615 continue;
1616 }
1617
1618 if let ConstraintType::Unique {
1619 properties: unique_props,
1620 } = &constraint.constraint_type
1621 {
1622 // Build compound OR filter for all batch vertices
1623 let mut or_filters = Vec::new();
1624 for properties in properties_batch.iter() {
1625 let mut and_parts = Vec::new();
1626 let mut all_present = true;
1627 for prop in unique_props {
1628 if let Some(val) = properties.get(prop) {
1629 let val_str = match val {
1630 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1631 Value::Int(n) => n.to_string(),
1632 Value::Float(f) => f.to_string(),
1633 Value::Bool(b) => b.to_string(),
1634 _ => {
1635 all_present = false;
1636 break;
1637 }
1638 };
1639 and_parts.push(format!("{} = {}", prop, val_str));
1640 } else {
1641 all_present = false;
1642 break;
1643 }
1644 }
1645 if all_present {
1646 or_filters.push(format!("({})", and_parts.join(" AND ")));
1647 }
1648 }
1649
1650 #[cfg(feature = "lance-backend")]
1651 if !or_filters.is_empty() {
1652 let vid_list: Vec<String> =
1653 vids.iter().map(|v| v.as_u64().to_string()).collect();
1654 let filter = format!(
1655 "({}) AND _deleted = false AND _vid NOT IN ({})",
1656 or_filters.join(" OR "),
1657 vid_list.join(", ")
1658 );
1659
1660 if let Ok(ds) = self.storage.vertex_dataset(label)
1661 && let Ok(lance_ds) = ds.open_raw().await
1662 {
1663 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1664 if count > 0 {
1665 return Err(anyhow!(
1666 "Constraint violation: Duplicate composite key for label '{}' in storage (constraint '{}')",
1667 label,
1668 constraint.name
1669 ));
1670 }
1671 }
1672 }
1673 }
1674 }
1675
1676 Ok(())
1677 }
1678
1679 /// Checks that ext_id is globally unique across all vertices.
1680 ///
1681 /// Searches L0 buffers (current, transaction, pending) and the main vertices table
1682 /// to ensure no other vertex uses this ext_id.
1683 ///
1684 /// # Errors
1685 ///
1686 /// Returns error if another vertex with the same ext_id exists.
1687 async fn check_extid_globally_unique(
1688 &self,
1689 ext_id: &str,
1690 current_vid: Vid,
1691 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1692 ) -> Result<()> {
1693 // Check L0 buffers: current, transaction, and pending flush
1694 let l0_buffers_to_check: Vec<Arc<RwLock<L0Buffer>>> = {
1695 let mut buffers = vec![self.l0_manager.get_current()];
1696 if let Some(tx_l0) = tx_l0 {
1697 buffers.push(tx_l0.clone());
1698 }
1699 buffers.extend(self.l0_manager.get_pending_flush());
1700 buffers
1701 };
1702
1703 for l0 in &l0_buffers_to_check {
1704 // O(1) per buffer via the maintained `extid_index` (the previous
1705 // full `vertex_properties` scan made constrained ingest O(n²)).
1706 if let Some(&vid) = l0.read().extid_index.get(ext_id)
1707 && vid != current_vid
1708 {
1709 return Err(anyhow!(
1710 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1711 ext_id,
1712 vid
1713 ));
1714 }
1715 }
1716
1717 // Check main vertices table (if it exists)
1718 // Pass None for global uniqueness check (not snapshot-isolated)
1719 let backend = self.storage.backend();
1720 if let Ok(Some(found_vid)) = MainVertexDataset::find_by_ext_id(backend, ext_id, None).await
1721 && found_vid != current_vid
1722 {
1723 return Err(anyhow!(
1724 "Constraint violation: ext_id '{}' already exists (vertex {:?})",
1725 ext_id,
1726 found_vid
1727 ));
1728 }
1729
1730 Ok(())
1731 }
1732
1733 /// Helper to get vertex labels from L0 buffer.
1734 fn get_vertex_labels_from_l0(&self, vid: Vid) -> Option<Vec<String>> {
1735 let l0 = self.l0_manager.get_current();
1736 let l0_guard = l0.read();
1737 // Check if vertex is tombstoned (deleted) - if so, return None
1738 if l0_guard.vertex_tombstones.contains(&vid) {
1739 return None;
1740 }
1741 l0_guard.get_vertex_labels(vid).map(|l| l.to_vec())
1742 }
1743
1744 /// Get vertex labels from all sources: current L0, pending L0s, and storage.
1745 /// This is the proper way to read vertex labels after a flush, as it checks both
1746 /// in-memory buffers and persisted storage.
1747 pub async fn get_vertex_labels(
1748 &self,
1749 vid: Vid,
1750 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1751 ) -> Option<Vec<String>> {
1752 // 1. Check current L0
1753 if let Some(labels) = self.get_vertex_labels_from_l0(vid) {
1754 return Some(labels);
1755 }
1756
1757 // 2. Check transaction L0 if present
1758 if let Some(tx_l0) = tx_l0 {
1759 let guard = tx_l0.read();
1760 if guard.vertex_tombstones.contains(&vid) {
1761 return None;
1762 }
1763 if let Some(labels) = guard.get_vertex_labels(vid) {
1764 return Some(labels.to_vec());
1765 }
1766 }
1767
1768 // 3. Check pending flush L0s
1769 for pending_l0 in self.l0_manager.get_pending_flush() {
1770 let guard = pending_l0.read();
1771 if guard.vertex_tombstones.contains(&vid) {
1772 return None;
1773 }
1774 if let Some(labels) = guard.get_vertex_labels(vid) {
1775 return Some(labels.to_vec());
1776 }
1777 }
1778
1779 // 4. Check storage
1780 self.find_vertex_labels_in_storage(vid).await.ok().flatten()
1781 }
1782
1783 /// Helper to get edge type from L0 buffer.
1784 fn get_edge_type_from_l0(&self, eid: Eid) -> Option<String> {
1785 let l0 = self.l0_manager.get_current();
1786 let l0_guard = l0.read();
1787 l0_guard.get_edge_type(eid).map(|s| s.to_string())
1788 }
1789
1790 /// Look up the edge type ID (u32) for an EID from the L0 buffer's edge endpoints.
1791 /// Falls back to the transaction L0 if available.
1792 pub fn get_edge_type_id_from_l0(
1793 &self,
1794 eid: Eid,
1795 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1796 ) -> Option<u32> {
1797 // Check transaction L0 first
1798 if let Some(tx_l0) = tx_l0 {
1799 let guard = tx_l0.read();
1800 if let Some((_, _, etype)) = guard.get_edge_endpoint_full(eid) {
1801 return Some(etype);
1802 }
1803 }
1804 // Fall back to main L0
1805 let l0 = self.l0_manager.get_current();
1806 let l0_guard = l0.read();
1807 l0_guard
1808 .get_edge_endpoint_full(eid)
1809 .map(|(_, _, etype)| etype)
1810 }
1811
1812 /// Set the type name for an edge (used for schemaless edge types).
1813 /// This is called during CREATE for edge types not found in the schema.
1814 pub fn set_edge_type(
1815 &self,
1816 eid: Eid,
1817 type_name: String,
1818 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1819 ) {
1820 self.resolve_l0(tx_l0).write().set_edge_type(eid, type_name);
1821 }
1822
1823 /// Evaluate a simple CHECK constraint expression.
1824 /// Supports: "property op value" (e.g., "age > 18", "status = 'active'")
1825 fn evaluate_check_constraint(&self, expression: &str, properties: &Properties) -> Result<bool> {
1826 let parts: Vec<&str> = expression.split_whitespace().collect();
1827 if parts.len() != 3 {
1828 // For now, only support "prop op val"
1829 // Fallback to true if too complex to avoid breaking, but warn
1830 log::warn!(
1831 "Complex CHECK constraint expression '{}' not fully supported yet; allowing write.",
1832 expression
1833 );
1834 return Ok(true);
1835 }
1836
1837 let prop_part = parts[0].trim_start_matches('(');
1838 // Handle "variable.property" format - take the part after the dot
1839 let prop_name = if let Some(idx) = prop_part.find('.') {
1840 &prop_part[idx + 1..]
1841 } else {
1842 prop_part
1843 };
1844
1845 let op = parts[1];
1846 let val_str = parts[2].trim_end_matches(')');
1847
1848 let prop_val = match properties.get(prop_name) {
1849 Some(v) => v,
1850 None => return Ok(true), // If property missing, CHECK usually passes (unless NOT NULL)
1851 };
1852
1853 // Parse value string (handle quotes for strings)
1854 let target_val = if (val_str.starts_with('\'') && val_str.ends_with('\''))
1855 || (val_str.starts_with('"') && val_str.ends_with('"'))
1856 {
1857 Value::String(val_str[1..val_str.len() - 1].to_string())
1858 } else if let Ok(n) = val_str.parse::<i64>() {
1859 Value::Int(n)
1860 } else if let Ok(n) = val_str.parse::<f64>() {
1861 Value::Float(n)
1862 } else if let Ok(b) = val_str.parse::<bool>() {
1863 Value::Bool(b)
1864 } else {
1865 // Check for internal format wrappers if they somehow leaked through
1866 if val_str.starts_with("Number(") && val_str.ends_with(')') {
1867 let n_str = &val_str[7..val_str.len() - 1];
1868 if let Ok(n) = n_str.parse::<i64>() {
1869 Value::Int(n)
1870 } else if let Ok(n) = n_str.parse::<f64>() {
1871 Value::Float(n)
1872 } else {
1873 Value::String(val_str.to_string())
1874 }
1875 } else {
1876 Value::String(val_str.to_string())
1877 }
1878 };
1879
1880 match op {
1881 "=" | "==" => Ok(prop_val == &target_val),
1882 "!=" | "<>" => Ok(prop_val != &target_val),
1883 ">" => self
1884 .compare_values(prop_val, &target_val)
1885 .map(|o| o.is_gt()),
1886 "<" => self
1887 .compare_values(prop_val, &target_val)
1888 .map(|o| o.is_lt()),
1889 ">=" => self
1890 .compare_values(prop_val, &target_val)
1891 .map(|o| o.is_ge()),
1892 "<=" => self
1893 .compare_values(prop_val, &target_val)
1894 .map(|o| o.is_le()),
1895 _ => {
1896 log::warn!("Unsupported operator '{}' in CHECK constraint", op);
1897 Ok(true)
1898 }
1899 }
1900 }
1901
1902 fn compare_values(&self, a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
1903 use std::cmp::Ordering;
1904
1905 fn cmp_f64(x: f64, y: f64) -> Ordering {
1906 x.partial_cmp(&y).unwrap_or(Ordering::Equal)
1907 }
1908
1909 match (a, b) {
1910 (Value::Int(n1), Value::Int(n2)) => Ok(n1.cmp(n2)),
1911 (Value::Float(f1), Value::Float(f2)) => Ok(cmp_f64(*f1, *f2)),
1912 (Value::Int(n), Value::Float(f)) => Ok(cmp_f64(*n as f64, *f)),
1913 (Value::Float(f), Value::Int(n)) => Ok(cmp_f64(*f, *n as f64)),
1914 (Value::String(s1), Value::String(s2)) => Ok(s1.cmp(s2)),
1915 _ => Err(anyhow!(
1916 "Cannot compare incompatible types: {:?} vs {:?}",
1917 a,
1918 b
1919 )),
1920 }
1921 }
1922
1923 async fn check_unique_constraint_multi(
1924 &self,
1925 label: &str,
1926 key_values: &[(String, Value)],
1927 current_vid: Vid,
1928 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
1929 ) -> Result<()> {
1930 // Serialize constraint key once for O(1) lookups
1931 let key = serialize_constraint_key(label, key_values);
1932
1933 // 1. Check L0 (in-memory) using O(1) constraint index
1934 {
1935 let l0 = self.l0_manager.get_current();
1936 let l0_guard = l0.read();
1937 if l0_guard.has_constraint_key(&key, current_vid) {
1938 return Err(anyhow!(
1939 "Constraint violation: Duplicate composite key for label '{}'",
1940 label
1941 ));
1942 }
1943 }
1944
1945 // 1b. Check pending-flush buffers (Bug #9A). A flush rotates a key's
1946 // buffer onto `pending_flush` and installs a fresh empty current
1947 // buffer; until the rotated rows reach Lance the key is invisible to
1948 // both the current-buffer check above and the storage check below, so
1949 // a duplicate could slip through that flush window. Mirror the read
1950 // paths (e.g. `check_extid_globally_unique`, `get_vertex_labels`) that
1951 // already consult `pending_flush`.
1952 for pending_l0 in self.l0_manager.get_pending_flush() {
1953 if pending_l0.read().has_constraint_key(&key, current_vid) {
1954 return Err(anyhow!(
1955 "Constraint violation: Duplicate composite key for label '{}' (in pending flush)",
1956 label
1957 ));
1958 }
1959 }
1960
1961 // Check Transaction L0
1962 if let Some(tx_l0) = tx_l0 {
1963 let tx_l0_guard = tx_l0.read();
1964 if tx_l0_guard.has_constraint_key(&key, current_vid) {
1965 return Err(anyhow!(
1966 "Constraint violation: Duplicate composite key for label '{}' (in tx)",
1967 label
1968 ));
1969 }
1970 }
1971
1972 // 2. Check Storage (L1/L2)
1973 let filters: Vec<String> = key_values
1974 .iter()
1975 .map(|(prop, val)| {
1976 let val_str = match val {
1977 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1978 Value::Int(n) => n.to_string(),
1979 Value::Float(f) => f.to_string(),
1980 Value::Bool(b) => b.to_string(),
1981 _ => "NULL".to_string(),
1982 };
1983 format!("{} = {}", prop, val_str)
1984 })
1985 .collect();
1986
1987 let mut filter = filters.join(" AND ");
1988 filter.push_str(&format!(
1989 " AND _deleted = false AND _vid != {}",
1990 current_vid.as_u64()
1991 ));
1992
1993 #[cfg(feature = "lance-backend")]
1994 if let Ok(ds) = self.storage.vertex_dataset(label)
1995 && let Ok(lance_ds) = ds.open_raw().await
1996 {
1997 let count = lance_ds.count_rows(Some(filter.clone())).await?;
1998 if count > 0 {
1999 return Err(anyhow!(
2000 "Constraint violation: Duplicate composite key for label '{}' (in storage). Filter: {}",
2001 label,
2002 filter
2003 ));
2004 }
2005 }
2006
2007 Ok(())
2008 }
2009
2010 async fn check_write_pressure(&self) -> Result<()> {
2011 let status = self
2012 .storage
2013 .compaction_status()
2014 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?;
2015 let l1_runs = status.l1_runs;
2016 let throttle = &self.config.throttle;
2017
2018 if l1_runs >= throttle.hard_limit {
2019 log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
2020 // Simple polling for now
2021 while self
2022 .storage
2023 .compaction_status()
2024 .map_err(|e| anyhow::anyhow!("Failed to get compaction status: {}", e))?
2025 .l1_runs
2026 >= throttle.hard_limit
2027 {
2028 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2029 }
2030 } else if l1_runs >= throttle.soft_limit {
2031 let excess = l1_runs - throttle.soft_limit;
2032 // Cap multiplier to avoid overflow
2033 let excess = std::cmp::min(excess, 31);
2034 let multiplier = 2_u32.pow(excess as u32);
2035 let delay = throttle.base_delay * multiplier;
2036 tokio::time::sleep(delay).await;
2037 }
2038 Ok(())
2039 }
2040
2041 /// Check transaction memory limit to prevent OOM.
2042 /// No-op when no transaction is active.
2043 fn check_transaction_memory(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) -> Result<()> {
2044 if let Some(tx_l0) = tx_l0 {
2045 let size = tx_l0.read().estimated_size;
2046 if size > self.config.max_transaction_memory {
2047 return Err(anyhow!(
2048 "Transaction memory limit exceeded: {} bytes used, limit is {} bytes. \
2049 Roll back or commit the current transaction.",
2050 size,
2051 self.config.max_transaction_memory
2052 ));
2053 }
2054 }
2055 Ok(())
2056 }
2057
2058 async fn get_query_context(
2059 &self,
2060 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2061 ) -> Option<QueryContext> {
2062 Some(QueryContext::new_with_pending(
2063 self.l0_manager.get_current(),
2064 tx_l0.cloned(),
2065 self.l0_manager.get_pending_flush(),
2066 ))
2067 }
2068
2069 /// Layer-1 CRDT variant enforcement, shared by the single-vertex and batch
2070 /// write paths.
2071 ///
2072 /// Rejects a declared CRDT property written as a parsed CRDT value
2073 /// (`Value::Map`) whose variant differs from the schema's declared variant.
2074 /// A mismatch would make the commit-time merge silently overwrite instead of
2075 /// merge, and the OCC CRDT carve-out (`occ::crdt_carveout_overwrite` /
2076 /// `WriteSet::from_l0`) would hide it as a lost update — so it must be caught
2077 /// at write time, on *every* write path. `try_as_crdt` is `Map`-gated, so the
2078 /// JSON-string (Cypher) form and non-CRDT values pass through untouched: they
2079 /// are never carved out and stay conflictable.
2080 fn enforce_crdt_variants(
2081 props_meta: &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2082 properties: &Properties,
2083 ) -> Result<()> {
2084 for (key, value) in properties {
2085 let Some(meta) = props_meta.get(key) else {
2086 continue;
2087 };
2088 let uni_common::core::schema::DataType::Crdt(expected) = &meta.r#type else {
2089 continue;
2090 };
2091 if let Some(crdt) = crate::runtime::l0::try_as_crdt(value)
2092 && crdt.type_name() != expected.type_name()
2093 {
2094 return Err(anyhow::Error::new(uni_common::UniError::Constraint {
2095 message: format!(
2096 "CRDT property '{key}' must be written as a {} value",
2097 expected.type_name()
2098 ),
2099 }));
2100 }
2101 }
2102 Ok(())
2103 }
2104
2105 /// Prepare a vertex for upsert by merging CRDT properties with existing values.
2106 ///
2107 /// When `label` is provided, uses it directly to look up property metadata.
2108 /// Otherwise falls back to discovering the label from L0 buffers and storage.
2109 ///
2110 /// # Errors
2111 ///
2112 /// Returns an error if CRDT property merging fails.
2113 async fn prepare_vertex_upsert(
2114 &self,
2115 vid: Vid,
2116 properties: &mut Properties,
2117 label: Option<&str>,
2118 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2119 ) -> Result<()> {
2120 let Some(pm) = &self.property_manager else {
2121 return Ok(());
2122 };
2123
2124 let schema = self.schema_manager.schema();
2125
2126 // Resolve label: use provided label or discover from L0/storage
2127 let discovered_labels;
2128 let label_name = if let Some(l) = label {
2129 Some(l)
2130 } else {
2131 discovered_labels = self.get_vertex_labels(vid, tx_l0).await;
2132 discovered_labels
2133 .as_ref()
2134 .and_then(|l| l.first().map(|s| s.as_str()))
2135 };
2136
2137 let Some(label_str) = label_name else {
2138 return Ok(());
2139 };
2140 let Some(props_meta) = schema.properties.get(label_str) else {
2141 return Ok(());
2142 };
2143
2144 // Identify CRDT properties in the insert data
2145 let crdt_keys: Vec<String> = properties
2146 .keys()
2147 .filter(|key| {
2148 props_meta.get(*key).is_some_and(|meta| {
2149 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2150 })
2151 })
2152 .cloned()
2153 .collect();
2154
2155 if crdt_keys.is_empty() {
2156 return Ok(());
2157 }
2158
2159 // Enforce that each declared CRDT property written as a parsed CRDT value
2160 // (`Value::Map`) carries its declared variant. A mismatched variant makes
2161 // `merge_crdt_properties` overwrite rather than merge at commit, and the
2162 // OCC carve-out (`occ::crdt_carveout_overwrite` / `WriteSet::from_l0`)
2163 // would hide that as a silent lost update — reject it at the source.
2164 //
2165 // Only the `Map` form is checked: it is exactly the form the carve-out
2166 // applies to (`try_as_crdt` is `Map`-gated). A CRDT written as a JSON
2167 // string (the Cypher form) or a non-CRDT value is never carved out — it
2168 // stays conflictable — so it poses no carve-out soundness risk and is left
2169 // to the existing merge/parse path. This is the declared-property half of
2170 // the layered fix; the commit-time check covers undeclared CRDT-shaped values.
2171 Self::enforce_crdt_variants(props_meta, properties)?;
2172
2173 let ctx = self.get_query_context(tx_l0).await;
2174 for key in crdt_keys {
2175 let existing = pm.get_vertex_prop_with_ctx(vid, &key, ctx.as_ref()).await?;
2176 if !existing.is_null()
2177 && let Some(val) = properties.get_mut(&key)
2178 {
2179 *val = pm.merge_crdt_values(&existing, val)?;
2180 }
2181 }
2182
2183 Ok(())
2184 }
2185
2186 async fn prepare_edge_upsert(
2187 &self,
2188 eid: Eid,
2189 properties: &mut Properties,
2190 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2191 ) -> Result<()> {
2192 if let Some(pm) = &self.property_manager {
2193 let schema = self.schema_manager.schema();
2194 // Get edge type from L0 buffer instead of from EID
2195 let type_name = self.get_edge_type_from_l0(eid);
2196
2197 if let Some(ref t_name) = type_name
2198 && let Some(props_meta) = schema.properties.get(t_name)
2199 {
2200 let mut crdt_keys = Vec::new();
2201 for (key, _) in properties.iter() {
2202 if let Some(meta) = props_meta.get(key)
2203 && matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2204 {
2205 crdt_keys.push(key.clone());
2206 }
2207 }
2208
2209 if !crdt_keys.is_empty() {
2210 let ctx = self.get_query_context(tx_l0).await;
2211 for key in crdt_keys {
2212 let existing = pm.get_edge_prop(eid, &key, ctx.as_ref()).await?;
2213
2214 if !existing.is_null()
2215 && let Some(val) = properties.get_mut(&key)
2216 {
2217 *val = pm.merge_crdt_values(&existing, val)?;
2218 }
2219 }
2220 }
2221 }
2222 }
2223 Ok(())
2224 }
2225
2226 #[instrument(skip(self, properties), level = "trace")]
2227 pub async fn insert_vertex(
2228 &self,
2229 vid: Vid,
2230 properties: Properties,
2231 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2232 ) -> Result<()> {
2233 self.insert_vertex_with_labels(vid, properties, &[], tx_l0)
2234 .await?;
2235 Ok(())
2236 }
2237
2238 /// Component C1 (G4): before a non-transactional mutation merges into main
2239 /// L0, if an outstanding snapshot pins the current generation, freeze it
2240 /// aside so snapshots taken *before* this write stay isolated from it.
2241 ///
2242 /// `flush_lock` (acquired and released here) serializes the freeze against
2243 /// concurrent commit-time freezes/merges, matching the atomicity the tx
2244 /// commit path gets. No-op for transactional writes (their freeze happens at
2245 /// commit) and — the common case — when nothing is pinned, where it costs one
2246 /// atomic load. Freezes at most once per pinned generation: the freeze
2247 /// installs a fresh unpinned `current`, so later writes in the same bulk
2248 /// import see no pin and merge in place, and the snapshot keeps reading the
2249 /// frozen pre-import buffer.
2250 async fn freeze_for_non_tx_write_if_pinned(&self, tx_l0: Option<&Arc<RwLock<L0Buffer>>>) {
2251 // Self-gates on the runtime SSI toggle: nothing pins a snapshot unless a
2252 // transaction began under `ssi_enabled`, so `is_current_pinned()` is
2253 // always false (one atomic load) when SSI is off.
2254 if tx_l0.is_none() && self.l0_manager.is_current_pinned() {
2255 let _flush_lock_guard = self.flush_lock.lock().await;
2256 // Re-check under the lock: a concurrent commit may have frozen first.
2257 if self.l0_manager.is_current_pinned() {
2258 self.l0_manager.freeze_current_for_snapshot();
2259 metrics::counter!("uni_l0_snapshot_freezes_total").increment(1);
2260 }
2261 }
2262 }
2263
2264 #[instrument(skip(self, properties, labels), level = "trace")]
2265 pub async fn insert_vertex_with_labels(
2266 &self,
2267 vid: Vid,
2268 mut properties: Properties,
2269 labels: &[String],
2270 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2271 ) -> Result<Properties> {
2272 let start = std::time::Instant::now();
2273 self.check_write_pressure().await?;
2274 self.check_transaction_memory(tx_l0)?;
2275
2276 // Component C1 (G4): a non-transactional write (`tx_l0 == None`, e.g. bulk
2277 // import / LOAD CSV) mutates main L0 directly, outside the commit-time
2278 // snapshot freeze. Freeze the pinned generation aside first so snapshots
2279 // taken before this write stay isolated from it.
2280 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2281
2282 if !self.try_defer_embedding(labels, &properties, vid, tx_l0) {
2283 self.process_embeddings_for_labels(labels, &mut properties)
2284 .await?;
2285 }
2286 self.validate_vertex_constraints(vid, &properties, labels, tx_l0)
2287 .await?;
2288 self.prepare_vertex_upsert(
2289 vid,
2290 &mut properties,
2291 labels.first().map(|s| s.as_str()),
2292 tx_l0,
2293 )
2294 .await?;
2295
2296 // Clone properties and labels before moving into L0 to return them and populate constraint index
2297 let properties_copy = properties.clone();
2298 let labels_copy = labels.to_vec();
2299
2300 {
2301 // For a non-tx write, re-resolve the live `current` buffer and hold
2302 // `flush_lock` across the (synchronous) write so a concurrent flush
2303 // rotate (which takes `flush_lock` via `begin_flush`) cannot install
2304 // a fresh `current` between resolve and write, dropping our write
2305 // (Bug #4). For a tx write `resolve_l0` returns the tx-private
2306 // buffer, never the rotating `current`, so no `flush_lock` is needed
2307 // (and taking it would risk re-entrancy with the commit path).
2308 let _flush_lock_guard = if tx_l0.is_none() {
2309 Some(self.flush_lock.lock().await)
2310 } else {
2311 None
2312 };
2313 let l0 = self.resolve_l0(tx_l0);
2314 let mut l0_guard = l0.write();
2315 // Generation chaining: a non-tx CRDT write into the (post-freeze,
2316 // possibly empty) current buffer must merge against the chained
2317 // committed state, not shadow it. No-op when the chain is empty
2318 // or this is a tx-private write.
2319 if tx_l0.is_none() {
2320 let pending = self.l0_manager.get_pending_flush();
2321 if !pending.is_empty() {
2322 Self::seed_crdt_props(&pending, vid, &properties, &mut l0_guard);
2323 }
2324 }
2325 l0_guard.insert_vertex_with_labels(vid, properties, labels);
2326
2327 // Populate constraint index for O(1) duplicate detection
2328 let schema = self.schema_manager.schema();
2329 for label in &labels_copy {
2330 if schema.get_label_case_insensitive(label).is_none() {
2331 if self.config.strict_schema {
2332 return Err(anyhow::anyhow!(
2333 "Label '{}' is not defined in the schema \
2334 (strict_schema is enabled).",
2335 label
2336 ));
2337 }
2338 continue; // Schemaless: skip unknown labels.
2339 }
2340
2341 // For each unique constraint on this label, insert into constraint index
2342 for constraint in &schema.constraints {
2343 if !constraint.enabled {
2344 continue;
2345 }
2346 if let ConstraintTarget::Label(l) = &constraint.target {
2347 if l != label {
2348 continue;
2349 }
2350 } else {
2351 continue;
2352 }
2353
2354 if let ConstraintType::Unique {
2355 properties: unique_props,
2356 } = &constraint.constraint_type
2357 {
2358 let mut key_values = Vec::new();
2359 let mut all_present = true;
2360 for prop in unique_props {
2361 if let Some(val) = properties_copy.get(prop) {
2362 key_values.push((prop.clone(), val.clone()));
2363 } else {
2364 all_present = false;
2365 break;
2366 }
2367 }
2368
2369 if all_present {
2370 let key = serialize_constraint_key(label, &key_values);
2371 l0_guard.insert_constraint_key(key, vid);
2372 }
2373 }
2374 }
2375 }
2376 }
2377
2378 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2379 self.update_metrics();
2380
2381 if tx_l0.is_none() {
2382 self.check_flush().await?;
2383 }
2384 if start.elapsed().as_millis() > 100 {
2385 log::warn!("Slow insert_vertex: {}ms", start.elapsed().as_millis());
2386 }
2387 Ok(properties_copy)
2388 }
2389
2390 /// True iff routing this partial write through MergeInsert would
2391 /// miss a constraint check. Specifically: a multi-key UNIQUE
2392 /// constraint where the touched-set doesn't cover all member keys
2393 /// requires the unchanged keys from the existing row to compute
2394 /// the composite. Conservative: also returns true if any touched
2395 /// key is `ext_id` (uniqueness checked globally — handled in the
2396 /// full-row path).
2397 fn touched_needs_full_read(&self, touched: &Properties, labels: &[String]) -> bool {
2398 if touched.contains_key("ext_id") {
2399 return true;
2400 }
2401 let schema = self.schema_manager.schema();
2402 for label in labels {
2403 if schema.get_label_case_insensitive(label).is_none() {
2404 continue;
2405 }
2406 for constraint in &schema.constraints {
2407 if !constraint.enabled {
2408 continue;
2409 }
2410 if let ConstraintTarget::Label(l) = &constraint.target {
2411 if !l.eq_ignore_ascii_case(label) {
2412 continue;
2413 }
2414 } else {
2415 continue;
2416 }
2417 if let ConstraintType::Unique {
2418 properties: unique_props,
2419 } = &constraint.constraint_type
2420 {
2421 if unique_props.len() < 2 {
2422 continue; // single-key UNIQUE — partial path sees the key
2423 }
2424 if unique_props.iter().any(|p| touched.contains_key(p)) {
2425 return true;
2426 }
2427 }
2428 }
2429 }
2430 false
2431 }
2432
2433 /// Insert a vertex's FULL property row plus a touched-keys hint so
2434 /// the flush emits ONLY those columns via Lance MergeInsert.
2435 ///
2436 /// Caller must have read the full row (via PropertyManager) and
2437 /// applied SET-touched values on top before calling — same input
2438 /// shape as `insert_vertex_with_labels`. The new arg `touched_keys`
2439 /// is the set of property keys this SET statement actually
2440 /// assigned; L0 records it in `vertex_partial_keys[vid]` and the
2441 /// flush filters the MergeInsert source schema down to those keys.
2442 /// When `UniConfig::partial_lance_writes == false`, falls through
2443 /// to `insert_vertex_with_labels` (Append) — preserving bit-for-bit
2444 /// equivalence with prior releases.
2445 #[instrument(skip(self, props, touched_keys, labels), level = "trace")]
2446 pub async fn insert_vertex_partial_full(
2447 &self,
2448 vid: Vid,
2449 mut props: Properties,
2450 touched_keys: HashSet<String>,
2451 labels: &[String],
2452 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2453 ) -> Result<()> {
2454 if !self.config.partial_lance_writes
2455 || self.touched_needs_full_read(&props_subset(&props, &touched_keys), labels)
2456 {
2457 self.insert_vertex_with_labels(vid, props, labels, tx_l0)
2458 .await?;
2459 return Ok(());
2460 }
2461
2462 self.check_write_pressure().await?;
2463 self.check_transaction_memory(tx_l0)?;
2464 if !self.try_defer_embedding(labels, &props, vid, tx_l0) {
2465 self.process_embeddings_for_labels(labels, &mut props)
2466 .await?;
2467 }
2468 // Full-row validation runs because we have the complete map;
2469 // no need for the partial-only validator.
2470 self.validate_vertex_constraints(vid, &props, labels, tx_l0)
2471 .await?;
2472 {
2473 let l0 = self.resolve_l0(tx_l0);
2474 let mut l0_guard = l0.write();
2475 l0_guard.insert_vertex_partial_full(vid, props, touched_keys, labels);
2476 }
2477 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2478 metrics::counter!("uni_partial_writes_total").increment(1);
2479 self.update_metrics();
2480 if tx_l0.is_none() {
2481 self.check_flush().await?;
2482 }
2483 Ok(())
2484 }
2485
2486 /// Insert a vertex's *partial* property set without first reading the
2487 /// full row.
2488 ///
2489 /// When `WriterConfig::partial_lance_writes` is `true`, the touched
2490 /// keys flow into `L0Buffer::vertex_partial_keys` so the next flush
2491 /// emits them via Lance `MergeInsertBuilder` against a subset-of-
2492 /// schema source — preserving untouched columns (e.g., embeddings)
2493 /// byte-equal in Lance with no read at the caller and no write of
2494 /// those columns.
2495 ///
2496 /// When the flag is `false`, this falls back to the existing
2497 /// `insert_vertex_with_labels` path after merging `touched` with
2498 /// the current properties from L0/storage. The caller can therefore
2499 /// use this entry point unconditionally; the optimization activates
2500 /// only when the flag is on.
2501 #[instrument(skip(self, touched, labels), level = "trace")]
2502 pub async fn insert_vertex_partial(
2503 &self,
2504 vid: Vid,
2505 touched: Properties,
2506 labels: &[String],
2507 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2508 ) -> Result<()> {
2509 let needs_full_read =
2510 !self.config.partial_lance_writes || self.touched_needs_full_read(&touched, labels);
2511 if needs_full_read {
2512 // Flag-off fallback (or constraint-driven fallback): merge
2513 // `touched` with the current full property snapshot from
2514 // L0/storage and route through the existing path. Preserves
2515 // bit-for-bit equivalence with the pre-Round-11 release.
2516 let existing = if let Some(pm) = &self.property_manager {
2517 pm.get_all_vertex_props_with_ctx(vid, None)
2518 .await
2519 .unwrap_or_default()
2520 .unwrap_or_default()
2521 } else {
2522 Properties::new()
2523 };
2524 let mut merged = existing;
2525 for (k, v) in touched {
2526 merged.insert(k, v);
2527 }
2528 self.insert_vertex_with_labels(vid, merged, labels, tx_l0)
2529 .await?;
2530 return Ok(());
2531 }
2532
2533 // Flag-on fast path: stage the partial update directly. Pressure
2534 // checks, embedding generation, constraint validation all still
2535 // run — but the validator is the partial-aware variant that
2536 // skips NOT NULL / multi-key UNIQUE / CHECK / EXISTS for
2537 // properties not present in `touched`. Multi-key UNIQUE that
2538 // overlaps the touched set forces a fallback above via
2539 // `touched_needs_full_read`.
2540 let mut touched = touched;
2541 self.check_write_pressure().await?;
2542 self.check_transaction_memory(tx_l0)?;
2543 if !self.try_defer_embedding(labels, &touched, vid, tx_l0) {
2544 self.process_embeddings_for_labels(labels, &mut touched)
2545 .await?;
2546 }
2547 self.validate_vertex_constraints_partial(vid, &touched, labels, tx_l0)
2548 .await?;
2549
2550 {
2551 let l0 = self.resolve_l0(tx_l0);
2552 let mut l0_guard = l0.write();
2553 l0_guard.insert_vertex_partial(vid, touched, labels);
2554 }
2555
2556 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2557 metrics::counter!("uni_partial_writes_total").increment(1);
2558 self.update_metrics();
2559 if tx_l0.is_none() {
2560 self.check_flush().await?;
2561 }
2562 Ok(())
2563 }
2564
2565 /// Insert multiple vertices with batched operations.
2566 ///
2567 /// This method uses batched operations to achieve O(N) complexity instead of O(N²)
2568 /// for bulk inserts with unique constraints.
2569 ///
2570 /// # Performance Improvements
2571 /// - Batch VID allocation: 1 call instead of N calls
2572 /// - Batch constraint validation: O(N) instead of O(N²)
2573 /// - Batch embedding generation: 1 API call per config instead of N calls
2574 /// - Transaction wrapping: Automatic flush deferral, atomicity
2575 ///
2576 /// # Arguments
2577 /// * `vids` - Pre-allocated VIDs for the vertices
2578 /// * `properties_batch` - Properties for each vertex
2579 /// * `labels` - Labels for all vertices (assumes single label for simplicity)
2580 ///
2581 /// # Errors
2582 /// Returns error if:
2583 /// - VID/properties length mismatch
2584 /// - Constraint violation detected
2585 /// - Embedding generation fails
2586 /// - Transaction commit fails
2587 ///
2588 /// # Atomicity
2589 /// If this method fails, all changes are rolled back (if transaction was started here).
2590 pub async fn insert_vertices_batch(
2591 &self,
2592 vids: Vec<Vid>,
2593 mut properties_batch: Vec<Properties>,
2594 labels: Vec<String>,
2595 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2596 ) -> Result<Vec<Properties>> {
2597 let start = std::time::Instant::now();
2598
2599 // Validate inputs
2600 if vids.len() != properties_batch.len() {
2601 return Err(anyhow!(
2602 "VID/properties size mismatch: {} vids, {} properties",
2603 vids.len(),
2604 properties_batch.len()
2605 ));
2606 }
2607
2608 if vids.is_empty() {
2609 return Ok(Vec::new());
2610 }
2611
2612 // Batch operations — writes go directly to the resolved L0.
2613 // Atomicity is guaranteed by the caller holding the writer lock.
2614 let result = async {
2615 self.check_write_pressure().await?;
2616 self.check_transaction_memory(tx_l0)?;
2617
2618 // Component C1 (G4): batch bulk-import is the canonical non-tx write —
2619 // freeze the pinned generation aside before merging so snapshot
2620 // readers stay isolated. No-op when unpinned or transactional.
2621 self.freeze_for_non_tx_write_if_pinned(tx_l0).await;
2622
2623 // Batch embedding generation (1 API call per config)
2624 self.process_embeddings_for_batch(&labels, &mut properties_batch)
2625 .await?;
2626
2627 // Batch constraint validation (O(N) instead of O(N²))
2628 let label = labels
2629 .first()
2630 .ok_or_else(|| anyhow!("No labels provided"))?;
2631 self.validate_vertex_batch_constraints(&vids, &properties_batch, label, tx_l0)
2632 .await?;
2633
2634 // Batch prepare (CRDT merging if needed)
2635 // Check schema once: skip entirely if no CRDT properties for this label.
2636 // For new vertices (freshly allocated VIDs), there are no existing CRDT
2637 // values to merge, so the per-vertex lookup is unnecessary in that case.
2638 let has_crdt_fields = {
2639 let schema = self.schema_manager.schema();
2640 schema
2641 .properties
2642 .get(label.as_str())
2643 .is_some_and(|props_meta| {
2644 props_meta.values().any(|meta| {
2645 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2646 })
2647 })
2648 };
2649
2650 if has_crdt_fields {
2651 // Layer-1 variant enforcement (G3): the batch path must reject a
2652 // declared-CRDT variant mismatch exactly as the single-vertex
2653 // `prepare_vertex_upsert` does. Without this, a wrong-variant CRDT
2654 // written via batch import slips past write-time validation and
2655 // the OCC carve-out then masks the overwrite as a lost update.
2656 {
2657 let schema = self.schema_manager.schema();
2658 if let Some(props_meta) = schema.properties.get(label.as_str()) {
2659 for props in &properties_batch {
2660 Self::enforce_crdt_variants(props_meta, props)?;
2661 }
2662 }
2663 }
2664
2665 // Batch fetch existing CRDT values: collect VIDs that need merging,
2666 // then query once via PropertyManager instead of per-vertex lookups.
2667 let schema = self.schema_manager.schema();
2668 let crdt_keys: Vec<String> = schema
2669 .properties
2670 .get(label.as_str())
2671 .map(|props_meta| {
2672 props_meta
2673 .iter()
2674 .filter(|(_, meta)| {
2675 matches!(meta.r#type, uni_common::core::schema::DataType::Crdt(_))
2676 })
2677 .map(|(key, _)| key.clone())
2678 .collect()
2679 })
2680 .unwrap_or_default();
2681
2682 if let Some(pm) = &self.property_manager {
2683 let ctx = self.get_query_context(tx_l0).await;
2684 for (vid, props) in vids.iter().zip(&mut properties_batch) {
2685 for key in &crdt_keys {
2686 if props.contains_key(key) {
2687 let existing =
2688 pm.get_vertex_prop_with_ctx(*vid, key, ctx.as_ref()).await?;
2689 if !existing.is_null()
2690 && let Some(val) = props.get_mut(key)
2691 {
2692 *val = pm.merge_crdt_values(&existing, val)?;
2693 }
2694 }
2695 }
2696 }
2697 }
2698 }
2699
2700 // Batch L0 writes — route to active L0 (transaction L0 if active, else current).
2701 let target_l0 = self.resolve_l0(tx_l0);
2702
2703 let properties_result = properties_batch.clone();
2704 {
2705 let mut l0_guard = target_l0.write();
2706 for (vid, props) in vids.iter().zip(properties_batch.iter()) {
2707 l0_guard.insert_vertex_with_labels(*vid, props.clone(), &labels);
2708 }
2709 }
2710
2711 // Update metrics (batch increment)
2712 metrics::counter!("uni_l0_buffer_mutations_total").increment(vids.len() as u64);
2713 self.update_metrics();
2714
2715 Ok::<Vec<Properties>, anyhow::Error>(properties_result)
2716 }
2717 .await;
2718
2719 let props = result?;
2720
2721 if start.elapsed().as_millis() > 100 {
2722 log::warn!(
2723 "Slow insert_vertices_batch ({} vertices): {}ms",
2724 vids.len(),
2725 start.elapsed().as_millis()
2726 );
2727 }
2728
2729 Ok(props)
2730 }
2731
2732 /// Delete a vertex by VID.
2733 ///
2734 /// When `labels` is provided, uses them directly to populate L0 for
2735 /// correct tombstone flushing. Otherwise discovers labels from L0
2736 /// buffers and storage (which can be slow for many vertices).
2737 ///
2738 /// # Errors
2739 ///
2740 /// Returns an error if write pressure stalls, label lookup fails, or
2741 /// the L0 delete operation fails.
2742 #[instrument(skip(self, labels), level = "trace")]
2743 pub async fn delete_vertex(
2744 &self,
2745 vid: Vid,
2746 labels: Option<Vec<String>>,
2747 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2748 ) -> Result<()> {
2749 let start = std::time::Instant::now();
2750 self.check_write_pressure().await?;
2751 self.check_transaction_memory(tx_l0)?;
2752 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2753
2754 // Before deleting, ensure we have the vertex's labels stored in L0 so
2755 // the tombstone can be flushed to the correct label datasets. Discover
2756 // them up front (this may await storage) WITHOUT pinning the buffer we
2757 // will eventually mutate — for non-tx writes the live `current` buffer
2758 // is re-resolved below under `flush_lock`, so a concurrent rotate can't
2759 // drop our write (Bug #4). `resolve_l0` here is only used for cheap
2760 // reads that tolerate a racing rotate.
2761 let has_labels = {
2762 let l0_guard = self.resolve_l0(tx_l0);
2763 let guard = l0_guard.read();
2764 guard.vertex_labels.contains_key(&vid)
2765 };
2766
2767 let backfill_labels = if has_labels {
2768 None
2769 } else if let Some(provided) = labels {
2770 // Caller provided labels — skip the lookup entirely
2771 Some(provided)
2772 } else {
2773 // Discover labels from pending flush L0s, then storage
2774 let mut found = None;
2775 for pending_l0 in self.l0_manager.get_pending_flush() {
2776 let pending_guard = pending_l0.read();
2777 if let Some(l) = pending_guard.get_vertex_labels(vid) {
2778 found = Some(l.to_vec());
2779 break;
2780 }
2781 }
2782 if found.is_none() {
2783 found = self.find_vertex_labels_in_storage(vid).await?;
2784 }
2785 found
2786 };
2787
2788 // Test-only seam (no-op without the `failpoints` feature): pause a
2789 // non-transactional delete AFTER the awaited label discovery but BEFORE
2790 // it re-resolves the live buffer and writes the tombstone. A concurrent
2791 // flush can rotate+complete a buffer in this window; the fix re-resolves
2792 // `get_current()` and mutates it under `flush_lock`, so the tombstone
2793 // always lands in the live buffer (Bug #4 — silent lost delete across
2794 // L0 rotation).
2795 fail::fail_point!("nontx::after-capture");
2796
2797 // Apply the label backfill and the tombstone together. For a non-tx
2798 // write, hold `flush_lock` across the (synchronous) re-resolve + write
2799 // so a concurrent flush rotate (which takes `flush_lock` via
2800 // `begin_flush`) cannot install a fresh `current` between our resolve
2801 // and our write. For a tx write `resolve_l0` returns the tx-private
2802 // buffer (never the rotating `current`), so no `flush_lock` is needed
2803 // — and taking it there would risk re-entrancy with the commit path.
2804 if tx_l0.is_none() {
2805 let _flush_lock_guard = self.flush_lock.lock().await;
2806 let l0 = self.l0_manager.get_current();
2807 let mut guard = l0.write();
2808 if let Some(found_labels) = backfill_labels {
2809 guard.vertex_labels.insert(vid, found_labels);
2810 }
2811 guard.delete_vertex(vid)?;
2812 } else {
2813 let l0 = self.resolve_l0(tx_l0);
2814 let mut guard = l0.write();
2815 if let Some(found_labels) = backfill_labels {
2816 guard.vertex_labels.insert(vid, found_labels);
2817 }
2818 guard.delete_vertex(vid)?;
2819 }
2820 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2821 self.update_metrics();
2822
2823 if tx_l0.is_none() {
2824 self.check_flush().await?;
2825 }
2826 if start.elapsed().as_millis() > 100 {
2827 log::warn!("Slow delete_vertex: {}ms", start.elapsed().as_millis());
2828 }
2829 Ok(())
2830 }
2831
2832 /// Find vertex labels from storage by querying the main vertices table.
2833 /// Returns the labels from the latest non-deleted version of the vertex.
2834 async fn find_vertex_labels_in_storage(&self, vid: Vid) -> Result<Option<Vec<String>>> {
2835 use crate::backend::types::ScanRequest;
2836 use arrow_array::Array;
2837 use arrow_array::cast::AsArray;
2838
2839 let backend = self.storage.backend();
2840 let table_name = MainVertexDataset::table_name();
2841
2842 // Check if table exists first; if not, vertex hasn't been flushed to storage yet
2843 if !backend.table_exists(table_name).await? {
2844 return Ok(None);
2845 }
2846
2847 // Query for this specific vid (don't filter by _deleted yet - we need to find the latest version first)
2848 let filter = format!("_vid = {}", vid.as_u64());
2849 let batches = backend
2850 .scan(
2851 ScanRequest::all(table_name)
2852 .with_filter(filter)
2853 .with_columns(vec![
2854 "_vid".to_string(),
2855 "labels".to_string(),
2856 "_version".to_string(),
2857 "_deleted".to_string(),
2858 ]),
2859 )
2860 .await
2861 .unwrap_or_default();
2862
2863 // Find the row with the highest version number
2864 let mut max_version: Option<u64> = None;
2865 let mut labels: Option<Vec<String>> = None;
2866 let mut is_deleted = false;
2867
2868 for batch in batches {
2869 if batch.num_rows() == 0 {
2870 continue;
2871 }
2872
2873 let version_array = batch
2874 .column_by_name("_version")
2875 .unwrap()
2876 .as_primitive::<arrow_array::types::UInt64Type>();
2877
2878 let deleted_array = batch.column_by_name("_deleted").unwrap().as_boolean();
2879
2880 let labels_array = batch.column_by_name("labels").unwrap().as_list::<i32>();
2881
2882 for row_idx in 0..batch.num_rows() {
2883 let version = version_array.value(row_idx);
2884
2885 if max_version.is_none_or(|mv| version > mv) {
2886 is_deleted = deleted_array.value(row_idx);
2887
2888 let labels_list = labels_array.value(row_idx);
2889 let string_array = labels_list.as_string::<i32>();
2890 let vertex_labels: Vec<String> = (0..string_array.len())
2891 .filter(|&i| !string_array.is_null(i))
2892 .map(|i| string_array.value(i).to_string())
2893 .collect();
2894
2895 max_version = Some(version);
2896 labels = Some(vertex_labels);
2897 }
2898 }
2899 }
2900
2901 // If the latest version is deleted, return None
2902 if is_deleted { Ok(None) } else { Ok(labels) }
2903 }
2904
2905 #[expect(clippy::too_many_arguments)]
2906 #[instrument(skip(self, props, touched_keys), level = "trace")]
2907 pub async fn insert_edge_partial_full(
2908 &self,
2909 src_vid: Vid,
2910 dst_vid: Vid,
2911 edge_type: u32,
2912 eid: Eid,
2913 props: Properties,
2914 edge_type_name: Option<String>,
2915 touched_keys: HashSet<String>,
2916 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2917 ) -> Result<()> {
2918 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2919 if !self.config.partial_lance_writes {
2920 return self
2921 .insert_edge(
2922 src_vid,
2923 dst_vid,
2924 edge_type,
2925 eid,
2926 props,
2927 edge_type_name,
2928 tx_l0,
2929 )
2930 .await;
2931 }
2932
2933 let start = std::time::Instant::now();
2934 self.check_write_pressure().await?;
2935 self.check_transaction_memory(tx_l0)?;
2936 let mut props = props;
2937 self.prepare_edge_upsert(eid, &mut props, tx_l0).await?;
2938
2939 let l0 = self.resolve_l0(tx_l0);
2940 l0.write().insert_edge_partial_full(
2941 src_vid,
2942 dst_vid,
2943 edge_type,
2944 eid,
2945 props,
2946 edge_type_name,
2947 touched_keys,
2948 )?;
2949
2950 if tx_l0.is_none() {
2951 let version = l0.read().current_version;
2952 self.adjacency_manager
2953 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2954 }
2955
2956 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
2957 metrics::counter!("uni_partial_writes_total").increment(1);
2958 self.update_metrics();
2959 if tx_l0.is_none() {
2960 self.check_flush().await?;
2961 }
2962 if start.elapsed().as_millis() > 100 {
2963 log::warn!(
2964 "Slow insert_edge_partial_full: {}ms",
2965 start.elapsed().as_millis()
2966 );
2967 }
2968 Ok(())
2969 }
2970
2971 #[expect(clippy::too_many_arguments)]
2972 pub async fn insert_edge(
2973 &self,
2974 src_vid: Vid,
2975 dst_vid: Vid,
2976 edge_type: u32,
2977 eid: Eid,
2978 mut properties: Properties,
2979 edge_type_name: Option<String>,
2980 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
2981 ) -> Result<()> {
2982 let start = std::time::Instant::now();
2983 self.check_write_pressure().await?;
2984 self.check_transaction_memory(tx_l0)?;
2985 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
2986 self.prepare_edge_upsert(eid, &mut properties, tx_l0)
2987 .await?;
2988
2989 let l0 = self.resolve_l0(tx_l0);
2990 l0.write()
2991 .insert_edge(src_vid, dst_vid, edge_type, eid, properties, edge_type_name)?;
2992
2993 // Dual-write to AdjacencyManager overlay (survives flush).
2994 // Skip for transaction-local L0 -- transaction edges are overlaid separately.
2995 if tx_l0.is_none() {
2996 let version = l0.read().current_version;
2997 self.adjacency_manager
2998 .insert_edge(src_vid, dst_vid, eid, edge_type, version);
2999 }
3000
3001 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3002 self.update_metrics();
3003
3004 if tx_l0.is_none() {
3005 self.check_flush().await?;
3006 }
3007 if start.elapsed().as_millis() > 100 {
3008 log::warn!("Slow insert_edge: {}ms", start.elapsed().as_millis());
3009 }
3010 Ok(())
3011 }
3012
3013 #[instrument(skip(self), level = "trace")]
3014 pub async fn delete_edge(
3015 &self,
3016 eid: Eid,
3017 src_vid: Vid,
3018 dst_vid: Vid,
3019 edge_type: u32,
3020 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3021 ) -> Result<()> {
3022 let start = std::time::Instant::now();
3023 self.check_write_pressure().await?;
3024 self.check_transaction_memory(tx_l0)?;
3025 self.freeze_for_non_tx_write_if_pinned(tx_l0).await; // C1 (G4)
3026 let l0 = self.resolve_l0(tx_l0);
3027
3028 l0.write().delete_edge(eid, src_vid, dst_vid, edge_type)?;
3029
3030 // Dual-write tombstone to AdjacencyManager overlay.
3031 if tx_l0.is_none() {
3032 let version = l0.read().current_version;
3033 self.adjacency_manager
3034 .add_tombstone(eid, src_vid, dst_vid, edge_type, version);
3035 }
3036 metrics::counter!("uni_l0_buffer_mutations_total").increment(1);
3037 self.update_metrics();
3038
3039 if tx_l0.is_none() {
3040 self.check_flush().await?;
3041 }
3042 if start.elapsed().as_millis() > 100 {
3043 log::warn!("Slow delete_edge: {}ms", start.elapsed().as_millis());
3044 }
3045 Ok(())
3046 }
3047
3048 /// Decide whether a flush should be triggered based on mutation count
3049 /// or elapsed time since the last flush.
3050 ///
3051 /// Extracted from [`Writer::check_flush`] so `commit_transaction_l0` can
3052 /// reuse the decision while bypassing the lock-acquiring entry point
3053 /// (it already holds `flush_lock`).
3054 fn should_flush(&self) -> bool {
3055 let count = self.l0_manager.get_current().read().mutation_count;
3056 if count == 0 {
3057 return false;
3058 }
3059 if count >= self.config.auto_flush_threshold {
3060 return true;
3061 }
3062 if let Some(interval) = self.config.auto_flush_interval
3063 && self.last_flush_time.lock().elapsed() >= interval
3064 && count >= self.config.auto_flush_min_mutations
3065 {
3066 return true;
3067 }
3068 false
3069 }
3070
3071 /// Check if flush should be triggered based on mutation count or time elapsed.
3072 /// This method is called after each write operation and can also be called
3073 /// by a background task for time-based flushing.
3074 pub async fn check_flush(&self) -> Result<()> {
3075 if self.should_flush() {
3076 self.flush_to_l1(None).await?;
3077 }
3078 Ok(())
3079 }
3080
3081 /// Process embeddings for a vertex using labels passed directly.
3082 /// Use this when labels haven't been stored to L0 yet.
3083 async fn process_embeddings_for_labels(
3084 &self,
3085 labels: &[String],
3086 properties: &mut Properties,
3087 ) -> Result<()> {
3088 let label_name = labels.first().map(|s| s.as_str());
3089 self.process_embeddings_impl(label_name, properties).await
3090 }
3091
3092 /// Phase B: if `defer_embeddings` is enabled in `UniConfig` and the
3093 /// vertex has an embedding config that hasn't been satisfied by the
3094 /// caller-provided properties, enqueue the VID in
3095 /// `L0Buffer::pending_embeddings` and return `true`. The caller then
3096 /// skips `process_embeddings_for_labels` and the embedding is computed
3097 /// in a single batched call at flush time via
3098 /// `drain_pending_embeddings`.
3099 ///
3100 /// Returns `false` (caller falls back to today's per-row eager embed)
3101 /// if any of:
3102 /// - the flag is off,
3103 /// - no label has an embedding config,
3104 /// - the user already provided the target property (matches the
3105 /// existing skip-if-present semantics at writer.rs:2727).
3106 ///
3107 /// Trade-off: when deferral is active, in-tx reads of the embedding
3108 /// column return only what was already in storage (or nothing for
3109 /// brand-new vertices). Existing tests that RETURN n.embedding in
3110 /// the same tx as a SET on the source column must run with the flag
3111 /// off; opt in only when no such reads happen between write and
3112 /// commit.
3113 fn try_defer_embedding(
3114 &self,
3115 labels: &[String],
3116 properties: &Properties,
3117 vid: Vid,
3118 tx_l0: Option<&Arc<RwLock<L0Buffer>>>,
3119 ) -> bool {
3120 if !self.config.defer_embeddings {
3121 return false;
3122 }
3123 let Some(label) = labels.first() else {
3124 return false;
3125 };
3126
3127 let schema = self.schema_manager.schema();
3128 let mut has_unsatisfied_cfg = false;
3129 for idx in &schema.indexes {
3130 if let IndexDefinition::Vector(v_cfg) = idx
3131 && v_cfg.label == *label
3132 && v_cfg.embedding_config.is_some()
3133 && !properties.contains_key(&v_cfg.property)
3134 {
3135 has_unsatisfied_cfg = true;
3136 break;
3137 }
3138 }
3139 if !has_unsatisfied_cfg {
3140 return false;
3141 }
3142
3143 let l0 = self.resolve_l0(tx_l0);
3144 let mut guard = l0.write();
3145 guard.pending_embeddings.insert(vid, label.clone());
3146 true
3147 }
3148
3149 /// Drain `pending_embeddings` from the rotated old-L0 right before
3150 /// `flush_stream_l1` reads it. Groups by label, issues one batched
3151 /// `process_embeddings_for_batch` call per label, and writes the
3152 /// resulting embedding vectors into each VID's `vertex_properties`
3153 /// map. After this returns, the flush proceeds against an L0 that
3154 /// looks no different from one whose embeddings were generated
3155 /// per-row at insert.
3156 ///
3157 /// Idempotent: a VID whose embedding was already materialized
3158 /// (e.g., by on-demand read paths in a future Phase B revision) is
3159 /// detected via `properties.contains_key(target_prop)` inside
3160 /// `process_embeddings_for_batch` (writer.rs:~2650), so re-running
3161 /// the drain is safe.
3162 async fn drain_pending_embeddings(&self, old_l0_arc: &Arc<RwLock<L0Buffer>>) -> Result<()> {
3163 let by_label: HashMap<String, Vec<Vid>> = {
3164 let guard = old_l0_arc.read();
3165 if guard.pending_embeddings.is_empty() {
3166 return Ok(());
3167 }
3168 let mut m: HashMap<String, Vec<Vid>> = HashMap::new();
3169 for (vid, label) in &guard.pending_embeddings {
3170 m.entry(label.clone()).or_default().push(*vid);
3171 }
3172 m
3173 };
3174
3175 for (label, vids) in by_label {
3176 let mut properties_batch: Vec<Properties> = {
3177 let guard = old_l0_arc.read();
3178 vids.iter()
3179 .map(|vid| {
3180 guard
3181 .vertex_properties
3182 .get(vid)
3183 .cloned()
3184 .unwrap_or_default()
3185 })
3186 .collect()
3187 };
3188
3189 self.process_embeddings_for_batch(std::slice::from_ref(&label), &mut properties_batch)
3190 .await?;
3191
3192 let mut guard = old_l0_arc.write();
3193 for (vid, props) in vids.iter().zip(properties_batch) {
3194 let target = guard.vertex_properties.entry(*vid).or_default();
3195 for (k, v) in props {
3196 target.insert(k, v);
3197 }
3198 guard.pending_embeddings.remove(vid);
3199 }
3200 }
3201 Ok(())
3202 }
3203
3204 /// Process embeddings for a batch of vertices efficiently.
3205 ///
3206 /// Groups vertices by embedding config and makes batched API calls to the
3207 /// embedding service instead of calling once per vertex.
3208 ///
3209 /// # Performance
3210 /// For N vertices with embedding config:
3211 /// - Old approach: N API calls to embedding service
3212 /// - New approach: 1 API call per embedding config (usually 1 total)
3213 async fn process_embeddings_for_batch(
3214 &self,
3215 labels: &[String],
3216 properties_batch: &mut [Properties],
3217 ) -> Result<()> {
3218 let label_name = labels.first().map(|s| s.as_str());
3219 let schema = self.schema_manager.schema();
3220
3221 if let Some(label) = label_name {
3222 // Find vector indexes with embedding config for this label
3223 let mut configs = Vec::new();
3224 for idx in &schema.indexes {
3225 if let IndexDefinition::Vector(v_config) = idx
3226 && v_config.label == label
3227 && let Some(emb_config) = &v_config.embedding_config
3228 {
3229 configs.push((v_config.property.clone(), emb_config.clone()));
3230 }
3231 }
3232
3233 if configs.is_empty() {
3234 return Ok(());
3235 }
3236
3237 for (target_prop, emb_config) in configs {
3238 // Collect input texts from all vertices that need embeddings
3239 let mut input_texts: Vec<String> = Vec::new();
3240 let mut needs_embedding: Vec<usize> = Vec::new();
3241
3242 for (idx, properties) in properties_batch.iter().enumerate() {
3243 // Skip if target property already exists
3244 if properties.contains_key(&target_prop) {
3245 continue;
3246 }
3247
3248 // Check if source properties exist
3249 let mut inputs = Vec::new();
3250 for src_prop in &emb_config.source_properties {
3251 if let Some(val) = properties.get(src_prop)
3252 && let Some(s) = val.as_str()
3253 {
3254 inputs.push(s.to_string());
3255 }
3256 }
3257
3258 if !inputs.is_empty() {
3259 let input_text = inputs.join(" ");
3260 let input_text = match &emb_config.document_prefix {
3261 Some(prefix) => format!("{prefix}{input_text}"),
3262 None => input_text,
3263 };
3264 input_texts.push(input_text);
3265 needs_embedding.push(idx);
3266 }
3267 }
3268
3269 if input_texts.is_empty() {
3270 continue;
3271 }
3272
3273 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3274 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3275 })?;
3276 let embedder = runtime.embedding(&emb_config.alias).await?;
3277
3278 // Batch generate embeddings (single API call)
3279 let input_refs: Vec<&str> = input_texts.iter().map(|s| s.as_str()).collect();
3280 let embeddings = embedder.embed(input_refs).await?;
3281
3282 // Distribute results back to properties
3283 for (embedding_idx, &prop_idx) in needs_embedding.iter().enumerate() {
3284 if let Some(vec) = embeddings.get(embedding_idx) {
3285 let vals: Vec<Value> =
3286 vec.iter().map(|f| Value::Float(*f as f64)).collect();
3287 properties_batch[prop_idx].insert(target_prop.clone(), Value::List(vals));
3288 }
3289 }
3290 }
3291 }
3292
3293 Ok(())
3294 }
3295
3296 async fn process_embeddings_impl(
3297 &self,
3298 label_name: Option<&str>,
3299 properties: &mut Properties,
3300 ) -> Result<()> {
3301 let schema = self.schema_manager.schema();
3302
3303 if let Some(label) = label_name {
3304 // Find vector indexes with embedding config for this label
3305 let mut configs = Vec::new();
3306 for idx in &schema.indexes {
3307 if let IndexDefinition::Vector(v_config) = idx
3308 && v_config.label == label
3309 && let Some(emb_config) = &v_config.embedding_config
3310 {
3311 configs.push((v_config.property.clone(), emb_config.clone()));
3312 }
3313 }
3314
3315 if configs.is_empty() {
3316 log::info!("No embedding config found for label {}", label);
3317 }
3318
3319 for (target_prop, emb_config) in configs {
3320 // If target property already exists, skip (assume user provided it)
3321 if properties.contains_key(&target_prop) {
3322 continue;
3323 }
3324
3325 // Check if source properties exist
3326 let mut inputs = Vec::new();
3327 for src_prop in &emb_config.source_properties {
3328 if let Some(val) = properties.get(src_prop)
3329 && let Some(s) = val.as_str()
3330 {
3331 inputs.push(s.to_string());
3332 }
3333 }
3334
3335 if inputs.is_empty() {
3336 continue;
3337 }
3338
3339 let input_text = inputs.join(" ");
3340 let input_text = match &emb_config.document_prefix {
3341 Some(prefix) => format!("{prefix}{input_text}"),
3342 None => input_text,
3343 };
3344
3345 let runtime = self.xervo_runtime.get().ok_or_else(|| {
3346 anyhow!("Uni-Xervo runtime not configured for auto-embedding")
3347 })?;
3348 let embedder = runtime.embedding(&emb_config.alias).await?;
3349
3350 // Generate
3351 let embeddings = embedder.embed(vec![input_text.as_str()]).await?;
3352 if let Some(vec) = embeddings.first() {
3353 // Store as array of floats
3354 let vals: Vec<Value> = vec.iter().map(|f| Value::Float(*f as f64)).collect();
3355 properties.insert(target_prop.clone(), Value::List(vals));
3356 }
3357 }
3358 }
3359 Ok(())
3360 }
3361
3362 /// Flushes the current in-memory L0 buffer to L1 storage.
3363 ///
3364 /// # Lock Ordering
3365 ///
3366 /// To prevent deadlocks, locks must be acquired in the following order:
3367 /// 1. `Writer` lock (held by caller via outer `Arc<RwLock<Writer>>`; removed in Phase 4)
3368 /// 2. `flush_lock` (acquired by this entry point; held across the whole flush)
3369 /// 3. `L0Manager` lock (via `begin_flush` / `get_current`)
3370 /// 4. `L0Buffer` lock (individual buffer RWLocks)
3371 /// 5. `Index` / `Storage` locks (during actual flush)
3372 ///
3373 /// Callers that already hold `flush_lock` (today only `commit_transaction_l0`)
3374 /// must call `flush_inline_under_lock` (private) directly to avoid a re-entrant
3375 /// `tokio::sync::Mutex` deadlock — see concurrent_writer.md §5.5.
3376 pub async fn flush_to_l1(&self, name: Option<String>) -> Result<String> {
3377 // Drain any in-flight async flushes first. `flush_to_l1` is a
3378 // SYNCHRONIZATION BARRIER — callers (test fixtures, fork
3379 // setup, shutdown paths) rely on it as "all writes are now
3380 // durably in Lance". Without the drain, an async stream from
3381 // a recent commit might still be writing to Lance when
3382 // `flush_to_l1` returns, leaving a window where forks branch
3383 // off pre-write Lance state and lose data.
3384 if let Some(coord) = self.flush_coordinator.as_ref() {
3385 let _ = coord.drain(self.config.drop_fork_drain_timeout).await;
3386 }
3387 let _flush_lock_guard = self.flush_lock.lock().await;
3388 self.flush_inline_under_lock(name).await
3389 }
3390
3391 /// Async-flush entry point: rotate under `flush_lock`, release the
3392 /// lock, then submit the stream phase to the [`FlushCoordinator`].
3393 /// Returns a [`FlushTicket`](crate::runtime::flush_coordinator::FlushTicket)
3394 /// that resolves when finalize completes.
3395 ///
3396 /// Errors if `config.async_flush_enabled = false` (the coordinator
3397 /// is `None` in that case — see `flush_coordinator` field doc).
3398 pub async fn flush_to_l1_async(
3399 self: &Arc<Self>,
3400 name: Option<String>,
3401 ) -> Result<crate::runtime::flush_coordinator::FlushTicket> {
3402 let coord = self
3403 .flush_coordinator
3404 .as_ref()
3405 .ok_or_else(|| anyhow!("async flush not enabled (config.async_flush_enabled=false)"))?
3406 .clone();
3407 // 1. Acquire permit FIRST (outside flush_lock) so we don't
3408 // introduce a permit-while-holding-flush-lock convoy.
3409 let permit = coord.acquire_permit().await?;
3410 // 2. Rotate under flush_lock (µs work), then allocate the rotate seq
3411 // and bump pending ONLY after the rotate succeeds (Bug #3). A failed
3412 // rotate (the `?` below) must consume neither: the finalizer
3413 // advances in strictly consecutive seq order and only decrements
3414 // pending on finalize, so a leaked seq/pending would wedge it
3415 // forever. The seq is allocated under `flush_lock`, immediately
3416 // after the rotate and before the guard drops, so concurrent rotates
3417 // keep seq order == rotation order, and the seq is unused until
3418 // submit. On the `?` error path the permit drops, freeing the slot.
3419 let (
3420 RotateOutput {
3421 old_l0_arc,
3422 wal_lsn,
3423 current_version,
3424 flush_in_progress_guard,
3425 },
3426 seq,
3427 ) = {
3428 let _flush_lock_guard = self.flush_lock.lock().await;
3429 let rotate_out = self.flush_l0_rotate().await?;
3430 let seq = coord.next_rotate_seq();
3431 coord.note_pending();
3432 (rotate_out, seq)
3433 };
3434 // 3. Build the coordinator's RotatedFlush. parent_manifest is the
3435 // cached_manifest snapshot at this moment.
3436 let parent_manifest = self.cached_manifest.lock().clone();
3437 let rotated = RotatedFlush {
3438 seq,
3439 old_l0_arc: old_l0_arc.clone(),
3440 wal_lsn,
3441 current_version,
3442 name: name.clone(),
3443 parent_manifest,
3444 permit,
3445 flush_in_progress_guard,
3446 };
3447 // 4. Spawn the stream phase via the coordinator. The closure
3448 // captures Arc<Writer> transiently — drops when stream
3449 // completes (bounded, ~50-500 ms).
3450 let writer = self.clone();
3451 let ticket = coord.submit_for_stream(rotated, move |old_l0, wal, ver, n| async move {
3452 let outcome = writer.flush_stream_l1(old_l0, wal, ver, n).await?;
3453 Ok(crate::runtime::flush_coordinator::FlushOutcome {
3454 new_manifest: outcome.manifest,
3455 snapshot_id: outcome.snapshot_id,
3456 })
3457 });
3458 Ok(ticket)
3459 }
3460
3461 /// Phase A+B+C of the flush: flush the WAL, rotate L0 (so the
3462 /// to-be-flushed buffer moves to `pending_flush` and a fresh L0 takes
3463 /// its place), and hand off the WAL to the new L0.
3464 ///
3465 /// Runs in microseconds. Must be called under `flush_lock` (the caller
3466 /// is responsible). The returned [`RotateOutput`] carries everything
3467 /// the subsequent stream + finalize phases need; in particular the
3468 /// [`FlushInProgressGuard`] is bound to the return value so it stays
3469 /// alive for the full flush lifetime — including any future async
3470 /// path where stream runs on a spawned task.
3471 async fn flush_l0_rotate(&self) -> Result<RotateOutput> {
3472 // Acquire the in-progress counter BEFORE any heavy work. The
3473 // guard lives on RotateOutput; dropping RotateOutput drops the
3474 // guard, so the counter goes back to zero exactly when the flush
3475 // is fully done.
3476 let flush_in_progress_guard = FlushInProgressGuard::new(&self.storage);
3477
3478 // A. Flush WAL BEFORE rotating L0. If WAL flush fails, the
3479 // current L0 is still active and mutations are retained in
3480 // memory until restart/retry.
3481 let wal_for_truncate = {
3482 let current_l0 = self.l0_manager.get_current();
3483 let l0_guard = current_l0.read();
3484 l0_guard.wal.clone()
3485 };
3486 // Test-only seam (no-op without the `failpoints` feature): inject a
3487 // WAL-flush failure here to drive the "failed async rotate wedges the
3488 // finalizer" regression (Bug #3). When configured to "return" it makes
3489 // `flush_l0_rotate` return Err exactly as a real WAL-flush failure would.
3490 fail::fail_point!("flush::rotate-fail", |_| {
3491 Err(anyhow!("flush::rotate-fail injected WAL-flush failure"))
3492 });
3493 let wal_lsn = if let Some(ref w) = wal_for_truncate {
3494 w.flush().await?
3495 } else {
3496 0
3497 };
3498
3499 // B. Begin flush: rotate L0 and keep old L0 visible to reads via
3500 // pending_flush until complete_flush is called by finalize.
3501 let old_l0_arc = self.l0_manager.begin_flush(0, None);
3502 metrics::counter!("uni_l0_buffer_rotations_total").increment(1);
3503
3504 // C. WAL handoff: record wal_lsn on old L0, transfer WAL handle
3505 // and current_version to the new L0.
3506 let current_version;
3507 {
3508 let mut old_l0_guard = old_l0_arc.write();
3509 current_version = old_l0_guard.current_version;
3510 old_l0_guard.wal_lsn_at_flush = wal_lsn;
3511 let wal = old_l0_guard.wal.take();
3512 let new_l0_arc = self.l0_manager.get_current();
3513 let mut new_l0_guard = new_l0_arc.write();
3514 new_l0_guard.wal = wal;
3515 new_l0_guard.current_version = current_version;
3516 }
3517
3518 Ok(RotateOutput {
3519 old_l0_arc,
3520 wal_lsn,
3521 current_version,
3522 flush_in_progress_guard,
3523 })
3524 }
3525
3526 /// Phases D, E, F, G of the flush: L1 collect, orphan resolve,
3527 /// manifest seed, Lance writes. Reads from `old_l0_arc` (kept in
3528 /// pending_flush by Phase B); writes append-only Lance datasets; does
3529 /// NOT call save_snapshot / set_latest_snapshot — those are
3530 /// finalize's job, so the manifest doesn't get published until the
3531 /// next phase.
3532 ///
3533 /// Today takes `&self`; in a follow-up commit this becomes a
3534 /// static `Send + 'static` function over `SharedFlushCtx` so it can
3535 /// run on a spawned task while concurrent commits proceed.
3536 async fn flush_stream_l1(
3537 &self,
3538 old_l0_arc: Arc<RwLock<L0Buffer>>,
3539 wal_lsn: u64,
3540 current_version: u64,
3541 name: Option<String>,
3542 ) -> Result<FlushOutcome> {
3543 // Test-only seam (no-op without the `failpoints` feature): the rotate
3544 // (begin_flush) already moved the to-be-flushed buffer onto
3545 // pending_flush and installed a fresh empty current buffer, but the
3546 // rotated rows are NOT yet durable in Lance. Pausing here holds that
3547 // window open to drive the unique-constraint-hole regression
3548 // (Bug #9 Mechanism A).
3549 fail::fail_point!("flush::after-rotate-before-lance");
3550
3551 // Phase B: materialize any deferred embeddings before column
3552 // extraction. No-op when `defer_embeddings` is off (the set will
3553 // be empty). On-demand reads of the embedding column are a TODO
3554 // for a future revision (see UniConfig::defer_embeddings docs).
3555 self.drain_pending_embeddings(&old_l0_arc).await?;
3556
3557 let schema = self.schema_manager.schema();
3558 // 2. Acquire Read lock on Old L0 for flushing
3559 let mut entries_by_type: HashMap<u32, Vec<L1Entry>> = HashMap::new();
3560 // (Vid, labels, properties, deleted, version)
3561 type VertexEntry = (Vid, Vec<String>, Properties, bool, u64);
3562 let mut vertices_by_label: HashMap<u16, Vec<VertexEntry>> = HashMap::new();
3563 // Partial-column updates (Lance MergeInsert path). Per-VID tuple:
3564 // (vid, full L0 properties map, version, set of keys to update).
3565 // Only the keys in the HashSet are emitted to the partial source;
3566 // the full props map is retained so the per-row column extractor
3567 // can read each touched key's value.
3568 type PartialEntry = (Vid, Properties, u64, std::collections::HashSet<String>);
3569 let mut partial_by_label: HashMap<u16, Vec<PartialEntry>> = HashMap::new();
3570 // DELETE-via-MergeInsert (Round-12 §B): tombstones flush as a
3571 // partial source with just `_vid`, `_deleted=true`, `_version`,
3572 // `_updated_at`. Skips the wide-row Append payload that adds
3573 // nothing on a soft-delete.
3574 let mut tombstones_by_label: HashMap<u16, Vec<(Vid, u64)>> = HashMap::new();
3575 let mut main_vertex_tombstones: Vec<(Vid, u64)> = Vec::new();
3576 // Collect vertex timestamps from L0 for flushing to storage
3577 let mut vertex_created_at: HashMap<Vid, i64> = HashMap::new();
3578 let mut vertex_updated_at: HashMap<Vid, i64> = HashMap::new();
3579 // Track tombstones missing labels for storage query fallback
3580 let mut orphaned_tombstones: Vec<(Vid, u64)> = Vec::new();
3581
3582 {
3583 let old_l0 = old_l0_arc.read();
3584
3585 // 1. Collect all edges and tombstones from L0
3586 for edge in old_l0.graph.edges() {
3587 let properties = old_l0
3588 .edge_properties
3589 .get(&edge.eid)
3590 .cloned()
3591 .unwrap_or_default();
3592 let version = old_l0.edge_versions.get(&edge.eid).copied().unwrap_or(0);
3593
3594 // Get timestamps from L0 buffer (populated during insert)
3595 let created_at = old_l0.edge_created_at.get(&edge.eid).copied();
3596 let updated_at = old_l0.edge_updated_at.get(&edge.eid).copied();
3597
3598 entries_by_type
3599 .entry(edge.edge_type)
3600 .or_default()
3601 .push(L1Entry {
3602 src_vid: edge.src_vid,
3603 dst_vid: edge.dst_vid,
3604 eid: edge.eid,
3605 op: Op::Insert,
3606 version,
3607 properties,
3608 created_at,
3609 updated_at,
3610 });
3611 }
3612
3613 // From tombstones
3614 for tombstone in old_l0.tombstones.values() {
3615 let version = old_l0
3616 .edge_versions
3617 .get(&tombstone.eid)
3618 .copied()
3619 .unwrap_or(0);
3620 // Get timestamps - for deletes, updated_at reflects deletion time
3621 let created_at = old_l0.edge_created_at.get(&tombstone.eid).copied();
3622 let updated_at = old_l0.edge_updated_at.get(&tombstone.eid).copied();
3623
3624 entries_by_type
3625 .entry(tombstone.edge_type)
3626 .or_default()
3627 .push(L1Entry {
3628 src_vid: tombstone.src_vid,
3629 dst_vid: tombstone.dst_vid,
3630 eid: tombstone.eid,
3631 op: Op::Delete,
3632 version,
3633 properties: HashMap::new(),
3634 created_at,
3635 updated_at,
3636 });
3637 }
3638
3639 // 1b. Collect vertices by label (using vertex_labels from L0)
3640 //
3641 // Helper: fan-out a single vertex entry into per-label buckets.
3642 // Each per-label table row carries the full label set so multi-label
3643 // info is preserved after flush.
3644 let push_vertex_to_labels =
3645 |vid: Vid,
3646 all_labels: &[String],
3647 props: Properties,
3648 deleted: bool,
3649 version: u64,
3650 out: &mut HashMap<u16, Vec<VertexEntry>>| {
3651 for label in all_labels {
3652 if let Some(label_id) = schema.label_id_by_name(label) {
3653 out.entry(label_id).or_default().push((
3654 vid,
3655 all_labels.to_vec(),
3656 props.clone(),
3657 deleted,
3658 version,
3659 ));
3660 }
3661 }
3662 };
3663
3664 for (vid, props) in &old_l0.vertex_properties {
3665 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3666 // Collect timestamps for this vertex
3667 if let Some(&ts) = old_l0.vertex_created_at.get(vid) {
3668 vertex_created_at.insert(*vid, ts);
3669 }
3670 if let Some(&ts) = old_l0.vertex_updated_at.get(vid) {
3671 vertex_updated_at.insert(*vid, ts);
3672 }
3673 if let Some(labels) = old_l0.vertex_labels.get(vid) {
3674 // Partial-write routing: when this VID was last
3675 // touched via `insert_vertex_partial` AND the
3676 // partial_lance_writes flag is on, send only the
3677 // touched columns to a MergeInsert batch. Otherwise
3678 // (CREATE, MERGE-ON-CREATE, full-replace SET, DELETE
3679 // — or flag off) use the existing full-row Append.
3680 let is_partial = self.config.partial_lance_writes
3681 && old_l0.vertex_partial_keys.contains_key(vid);
3682 if is_partial {
3683 if let Some(touched) = old_l0.vertex_partial_keys.get(vid) {
3684 for label in labels {
3685 if let Some(label_id) = schema.label_id_by_name(label) {
3686 partial_by_label.entry(label_id).or_default().push((
3687 *vid,
3688 props.clone(),
3689 version,
3690 touched.clone(),
3691 ));
3692 }
3693 }
3694 }
3695 } else {
3696 push_vertex_to_labels(
3697 *vid,
3698 labels,
3699 props.clone(),
3700 false,
3701 version,
3702 &mut vertices_by_label,
3703 );
3704 }
3705 }
3706 }
3707 for &vid in &old_l0.vertex_tombstones {
3708 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3709 if let Some(&ts) = old_l0.vertex_updated_at.get(&vid) {
3710 vertex_updated_at.insert(vid, ts);
3711 }
3712 if let Some(labels) = old_l0.vertex_labels.get(&vid) {
3713 // Round-12 §B: tombstones flush via Lance MergeInsert
3714 // (just `_vid`, `_deleted=true`, `_version`,
3715 // `_updated_at`) — skipping the wide-row Append.
3716 // Unconditional (no `partial_lance_writes` gating);
3717 // tombstone Append carries no useful payload.
3718 for label in labels {
3719 if let Some(label_id) = schema.label_id_by_name(label) {
3720 tombstones_by_label
3721 .entry(label_id)
3722 .or_default()
3723 .push((vid, version));
3724 }
3725 }
3726 } else {
3727 // Tombstone missing labels (old WAL format) - collect for storage query fallback
3728 orphaned_tombstones.push((vid, version));
3729 }
3730 }
3731 } // Drop read lock
3732
3733 // Resolve orphaned tombstones (missing labels) from storage
3734 if !orphaned_tombstones.is_empty() {
3735 tracing::warn!(
3736 count = orphaned_tombstones.len(),
3737 "Tombstones missing labels in L0, querying storage as fallback"
3738 );
3739 for (vid, version) in orphaned_tombstones {
3740 if let Ok(Some(labels)) = self.find_vertex_labels_in_storage(vid).await
3741 && !labels.is_empty()
3742 {
3743 for label in &labels {
3744 if let Some(label_id) = schema.label_id_by_name(label) {
3745 // Round-12 §B: route through partial tombstone too.
3746 tombstones_by_label
3747 .entry(label_id)
3748 .or_default()
3749 .push((vid, version));
3750 }
3751 }
3752 }
3753 }
3754 }
3755
3756 // 1. Load previous snapshot from cache, or fall back to storage.
3757 //
3758 // Use clone() not take(): for the async path, multiple
3759 // concurrent streams may run; if we take() here, a sibling
3760 // stream sees cached_manifest = None and seeds from
3761 // load_latest_snapshot (stale), losing the chain. clone()
3762 // preserves the parent. Finalize writes back the new manifest
3763 // unconditionally.
3764 let mut manifest = if let Some(cached) = self.cached_manifest.lock().clone() {
3765 cached
3766 } else {
3767 self.storage
3768 .snapshot_manager()
3769 .load_latest_snapshot()
3770 .await?
3771 .unwrap_or_else(|| {
3772 SnapshotManifest::new(Uuid::new_v4().to_string(), schema.schema_version)
3773 })
3774 };
3775
3776 // Update snapshot metadata
3777 // Save parent snapshot ID before generating new one (for lineage tracking)
3778 let parent_id = manifest.snapshot_id.clone();
3779 manifest.parent_snapshot = Some(parent_id);
3780 manifest.snapshot_id = Uuid::new_v4().to_string();
3781 manifest.name = name;
3782 manifest.created_at = Utc::now();
3783 manifest.version_high_water_mark = current_version;
3784 manifest.wal_high_water_mark = wal_lsn;
3785 let snapshot_id = manifest.snapshot_id.clone();
3786
3787 tracing::Span::current().record("snapshot_id", &snapshot_id);
3788
3789 // 2. Write main unified tables FIRST (before deltas).
3790 // Ensures the dual-write invariant: by the time an EID appears in a
3791 // delta table, it already exists in main_edges. This prevents the
3792 // compaction debug_assert from firing when compaction interleaves
3793 // with flush at async yield points.
3794 //
3795 // 2.1 Main edges table
3796 let (main_edges, edge_created_at_map, edge_updated_at_map) = {
3797 let _old_l0 = old_l0_arc.read();
3798 let mut main_edges: Vec<(
3799 uni_common::core::id::Eid,
3800 Vid,
3801 Vid,
3802 String,
3803 Properties,
3804 bool,
3805 u64,
3806 )> = Vec::new();
3807 let mut edge_created_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3808 let mut edge_updated_at_map: HashMap<uni_common::core::id::Eid, i64> = HashMap::new();
3809
3810 for (&edge_type_id, entries) in entries_by_type.iter() {
3811 for entry in entries {
3812 let edge_type_name = self
3813 .storage
3814 .schema_manager()
3815 .edge_type_name_by_id_unified(edge_type_id)
3816 .unwrap_or_else(|| "unknown".to_string());
3817
3818 let deleted = matches!(entry.op, Op::Delete);
3819 main_edges.push((
3820 entry.eid,
3821 entry.src_vid,
3822 entry.dst_vid,
3823 edge_type_name,
3824 entry.properties.clone(),
3825 deleted,
3826 entry.version,
3827 ));
3828
3829 if let Some(ts) = entry.created_at {
3830 edge_created_at_map.insert(entry.eid, ts);
3831 }
3832 if let Some(ts) = entry.updated_at {
3833 edge_updated_at_map.insert(entry.eid, ts);
3834 }
3835 }
3836 }
3837
3838 (main_edges, edge_created_at_map, edge_updated_at_map)
3839 };
3840
3841 if !main_edges.is_empty() {
3842 let main_edge_batch = MainEdgeDataset::build_record_batch(
3843 &main_edges,
3844 Some(&edge_created_at_map),
3845 Some(&edge_updated_at_map),
3846 )?;
3847 MainEdgeDataset::write_batch(self.storage.backend(), main_edge_batch).await?;
3848 MainEdgeDataset::ensure_default_indexes(self.storage.backend()).await?;
3849 }
3850
3851 // 2.2 Main vertices table
3852 let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> = {
3853 let old_l0 = old_l0_arc.read();
3854 let mut vertices = Vec::new();
3855
3856 // Live vertices: full-row Append on the main table (the
3857 // props_json blob is required for global ID lookups). For
3858 // partial-row VIDs (vertex_partial_keys non-empty), the
3859 // main table still needs the full props for the
3860 // ext_id-uniqueness path; we keep the Append here. The
3861 // per-label Lance write IS partial via MergeInsert.
3862 for (vid, props) in &old_l0.vertex_properties {
3863 let version = old_l0.vertex_versions.get(vid).copied().unwrap_or(0);
3864 let labels = old_l0.vertex_labels.get(vid).cloned().unwrap_or_default();
3865 vertices.push((*vid, labels, props.clone(), false, version));
3866 }
3867
3868 // Tombstones: collected into `main_vertex_tombstones` for
3869 // the MergeInsert path below; skipping the wide-row Append.
3870 for &vid in &old_l0.vertex_tombstones {
3871 let version = old_l0.vertex_versions.get(&vid).copied().unwrap_or(0);
3872 main_vertex_tombstones.push((vid, version));
3873 }
3874
3875 vertices
3876 };
3877
3878 if !main_vertices.is_empty() {
3879 let main_vertex_batch = MainVertexDataset::build_record_batch(
3880 &main_vertices,
3881 Some(&vertex_created_at),
3882 Some(&vertex_updated_at),
3883 )?;
3884 MainVertexDataset::write_batch(self.storage.backend(), main_vertex_batch).await?;
3885 }
3886 // Round-12 §B: tombstones via MergeInsert on the main vertices
3887 // table. Independent of `vertex_properties` length.
3888 if !main_vertex_tombstones.is_empty() {
3889 let tomb_batch = MainVertexDataset::build_tombstone_partial_batch(
3890 &main_vertex_tombstones,
3891 Some(&vertex_updated_at),
3892 )?;
3893 MainVertexDataset::merge_insert_tombstone_batch(self.storage.backend(), tomb_batch)
3894 .await?;
3895 }
3896 if !main_vertices.is_empty() || !main_vertex_tombstones.is_empty() {
3897 MainVertexDataset::ensure_default_indexes(self.storage.backend()).await?;
3898 }
3899
3900 // 3. For each edge type, write FWD and BWD delta runs
3901 for (&edge_type_id, entries) in entries_by_type.iter() {
3902 // Get edge type name from unified lookup (handles both schema'd and schemaless)
3903 let edge_type_name = self
3904 .storage
3905 .schema_manager()
3906 .edge_type_name_by_id_unified(edge_type_id)
3907 .ok_or_else(|| anyhow!("Edge type ID {} not found", edge_type_id))?;
3908
3909 // FWD Run (sorted by src_vid)
3910 // Round-12 §A: split entries into full-row Append and
3911 // partial MergeInsert routes based on `edge_partial_keys`.
3912 // Edges in `edge_partial_keys` were last written via
3913 // `insert_edge_partial_full`; the per-edge-type delta
3914 // tables receive only the touched schema columns plus
3915 // (when any overflow key was touched) the regenerated
3916 // `overflow_json` blob. Untouched columns retain their
3917 // previous-version value via Lance MergeInsert.
3918 let partial_eids: std::collections::HashSet<Eid> = {
3919 let old_l0 = old_l0_arc.read();
3920 entries
3921 .iter()
3922 .filter(|e| {
3923 self.config.partial_lance_writes
3924 && old_l0.edge_partial_keys.contains_key(&e.eid)
3925 })
3926 .map(|e| e.eid)
3927 .collect()
3928 };
3929 let touched_union_by_eid: HashMap<Eid, std::collections::HashSet<String>> = {
3930 let old_l0 = old_l0_arc.read();
3931 partial_eids
3932 .iter()
3933 .filter_map(|eid| old_l0.edge_partial_keys.get(eid).map(|s| (*eid, s.clone())))
3934 .collect()
3935 };
3936 let (full_entries, partial_entries): (Vec<L1Entry>, Vec<L1Entry>) = entries
3937 .clone()
3938 .into_iter()
3939 .partition(|e| !partial_eids.contains(&e.eid));
3940
3941 let backend = self.storage.backend();
3942
3943 // FWD run (sorted by src_vid)
3944 let mut fwd_full = full_entries.clone();
3945 fwd_full.sort_by_key(|e| e.src_vid);
3946 let mut fwd_partial = partial_entries.clone();
3947 fwd_partial.sort_by_key(|e| e.src_vid);
3948 let fwd_ds = self.storage.delta_dataset(&edge_type_name, "fwd")?;
3949 if !fwd_full.is_empty() {
3950 let fwd_batch = fwd_ds.build_record_batch(&fwd_full, &schema)?;
3951 fwd_ds.write_run(backend, fwd_batch).await?;
3952 }
3953 if !fwd_partial.is_empty() {
3954 let touched_union: std::collections::HashSet<String> = fwd_partial
3955 .iter()
3956 .flat_map(|e| {
3957 touched_union_by_eid
3958 .get(&e.eid)
3959 .cloned()
3960 .unwrap_or_default()
3961 .into_iter()
3962 })
3963 .collect();
3964 let fwd_partial_batch =
3965 fwd_ds.build_partial_record_batch(&fwd_partial, &touched_union, &schema)?;
3966 fwd_ds
3967 .merge_insert_partial_run(backend, fwd_partial_batch)
3968 .await?;
3969 }
3970 fwd_ds.ensure_eid_index(backend).await?;
3971
3972 // BWD Run (sorted by dst_vid)
3973 let mut bwd_full = full_entries.clone();
3974 bwd_full.sort_by_key(|e| e.dst_vid);
3975 let mut bwd_partial = partial_entries.clone();
3976 bwd_partial.sort_by_key(|e| e.dst_vid);
3977 let bwd_ds = self.storage.delta_dataset(&edge_type_name, "bwd")?;
3978 if !bwd_full.is_empty() {
3979 let bwd_batch = bwd_ds.build_record_batch(&bwd_full, &schema)?;
3980 bwd_ds.write_run(backend, bwd_batch).await?;
3981 }
3982 if !bwd_partial.is_empty() {
3983 let touched_union: std::collections::HashSet<String> = bwd_partial
3984 .iter()
3985 .flat_map(|e| {
3986 touched_union_by_eid
3987 .get(&e.eid)
3988 .cloned()
3989 .unwrap_or_default()
3990 .into_iter()
3991 })
3992 .collect();
3993 let bwd_partial_batch =
3994 bwd_ds.build_partial_record_batch(&bwd_partial, &touched_union, &schema)?;
3995 bwd_ds
3996 .merge_insert_partial_run(backend, bwd_partial_batch)
3997 .await?;
3998 }
3999 bwd_ds.ensure_eid_index(backend).await?;
4000
4001 // Update Manifest
4002 let current_snap =
4003 manifest
4004 .edges
4005 .entry(edge_type_name.to_string())
4006 .or_insert(EdgeSnapshot {
4007 version: 0,
4008 count: 0,
4009 lance_version: 0,
4010 });
4011 current_snap.version += 1;
4012 current_snap.count += entries.len() as u64;
4013 // LanceDB tables don't expose Lance version directly
4014 current_snap.lance_version = 0;
4015
4016 // Note: No CSR invalidation needed. AdjacencyManager's overlay
4017 // already has these edges via dual-write in insert_edge/delete_edge.
4018 }
4019
4020 // 4. Per-label vertex table writes
4021 // Iterate all labels that have either full-row OR partial-write
4022 // data pending. A label may appear in only one of the two maps
4023 // (e.g., all updates on this label were partial-only).
4024 let all_label_ids: std::collections::HashSet<u16> = vertices_by_label
4025 .keys()
4026 .chain(partial_by_label.keys())
4027 .chain(tombstones_by_label.keys())
4028 .copied()
4029 .collect();
4030 for label_id in all_label_ids {
4031 let vertices = vertices_by_label.remove(&label_id).unwrap_or_default();
4032 let label_name = schema
4033 .label_name_by_id(label_id)
4034 .ok_or_else(|| anyhow!("Label ID {} not found", label_id))?;
4035
4036 let ds = self.storage.vertex_dataset(label_name)?;
4037
4038 // Collect inverted index updates before consuming vertices
4039 // Maps: cfg.property -> (added, removed)
4040 type InvertedUpdateMap = HashMap<String, (HashMap<Vid, Vec<String>>, HashSet<Vid>)>;
4041 let mut inverted_updates: InvertedUpdateMap = HashMap::new();
4042
4043 for idx in &schema.indexes {
4044 if let IndexDefinition::Inverted(cfg) = idx
4045 && cfg.label == label_name
4046 {
4047 let mut added: HashMap<Vid, Vec<String>> = HashMap::new();
4048 let mut removed: HashSet<Vid> = HashSet::new();
4049
4050 for (vid, _labels, props, deleted, _version) in &vertices {
4051 if *deleted {
4052 removed.insert(*vid);
4053 } else if let Some(prop_value) = props.get(&cfg.property) {
4054 // Extract terms from the property value (List<String>)
4055 if let Some(arr) = prop_value.as_array() {
4056 let terms: Vec<String> = arr
4057 .iter()
4058 .filter_map(|v| v.as_str().map(ToString::to_string))
4059 .collect();
4060 if !terms.is_empty() {
4061 added.insert(*vid, terms);
4062 }
4063 }
4064 }
4065 }
4066 // Round-12 §B: tombstones no longer in `vertices`;
4067 // pull them from `tombstones_by_label` for inverted
4068 // index removal.
4069 if let Some(tomb_rows) = tombstones_by_label.get(&label_id) {
4070 for (vid, _) in tomb_rows {
4071 removed.insert(*vid);
4072 }
4073 }
4074
4075 if !added.is_empty() || !removed.is_empty() {
4076 inverted_updates.insert(cfg.property.clone(), (added, removed));
4077 }
4078 }
4079 }
4080
4081 let mut v_data = Vec::new();
4082 let mut d_data = Vec::new();
4083 let mut ver_data = Vec::new();
4084 for (vid, labels, props, deleted, version) in vertices {
4085 v_data.push((vid, labels, props));
4086 d_data.push(deleted);
4087 ver_data.push(version);
4088 }
4089
4090 let backend = self.storage.backend();
4091
4092 // Skip the full-row Append entirely if this label only has
4093 // partial-write rows pending.
4094 if !v_data.is_empty() {
4095 let batch = ds.build_record_batch_with_timestamps(
4096 &v_data,
4097 &d_data,
4098 &ver_data,
4099 &schema,
4100 Some(&vertex_created_at),
4101 Some(&vertex_updated_at),
4102 )?;
4103 ds.write_batch(backend, batch, &schema).await?;
4104 }
4105
4106 // Partial-column batch (Lance MergeInsert path). The flag
4107 // gates whether the routing classified any VIDs as partial;
4108 // outside the flag this collection is always empty so the
4109 // call below is a cheap no-op.
4110 if let Some(partial_rows) = partial_by_label.remove(&label_id)
4111 && !partial_rows.is_empty()
4112 {
4113 let touched_union: std::collections::HashSet<String> = partial_rows
4114 .iter()
4115 .flat_map(|(_, _, _, keys)| keys.iter().cloned())
4116 .collect();
4117 let pairs: Vec<(Vid, Properties)> = partial_rows
4118 .iter()
4119 .map(|(vid, props, _, _)| (*vid, props.clone()))
4120 .collect();
4121 let versions: Vec<u64> = partial_rows.iter().map(|(_, _, v, _)| *v).collect();
4122 let partial_batch = ds.build_partial_record_batch(
4123 &pairs,
4124 &versions,
4125 &touched_union,
4126 &schema,
4127 Some(&vertex_updated_at),
4128 )?;
4129 if partial_batch.num_rows() > 0 {
4130 ds.merge_insert_batch(backend, partial_batch).await?;
4131 }
4132 }
4133
4134 // Tombstone batch (Round-12 §B): always MergeInsert with
4135 // just `_vid`, `_deleted=true`, `_version`, `_updated_at`.
4136 // No partial_lance_writes gating — tombstones never carry
4137 // useful property payload to write. Captured tombstone vids
4138 // also drive `remove_from_vid_labels_index` below.
4139 let tombstone_rows = tombstones_by_label.remove(&label_id).unwrap_or_default();
4140 if !tombstone_rows.is_empty() {
4141 let tomb_batch =
4142 ds.build_tombstone_partial_batch(&tombstone_rows, Some(&vertex_updated_at))?;
4143 if tomb_batch.num_rows() > 0 {
4144 ds.merge_insert_batch(backend, tomb_batch).await?;
4145 }
4146 }
4147
4148 ds.ensure_default_indexes(backend).await?;
4149
4150 // Update VidLabelsIndex (if enabled). v_data carries live
4151 // vertices; tombstone removals come from the
4152 // `tombstone_rows` captured above the MergeInsert call.
4153 for ((vid, labels, _props), &deleted) in v_data.iter().zip(d_data.iter()) {
4154 if deleted {
4155 self.storage.remove_from_vid_labels_index(*vid);
4156 } else {
4157 self.storage.update_vid_labels_index(*vid, labels.clone());
4158 }
4159 }
4160 for (vid, _) in &tombstone_rows {
4161 self.storage.remove_from_vid_labels_index(*vid);
4162 }
4163
4164 // Update Manifest
4165 let current_snap =
4166 manifest
4167 .vertices
4168 .entry(label_name.to_string())
4169 .or_insert(LabelSnapshot {
4170 version: 0,
4171 count: 0,
4172 lance_version: 0,
4173 });
4174 current_snap.version += 1;
4175 current_snap.count += v_data.len() as u64;
4176 // LanceDB tables don't expose Lance version directly
4177 current_snap.lance_version = 0;
4178
4179 // Invalidate table cache to ensure next read picks up new version
4180 self.storage.invalidate_table_cache(label_name);
4181
4182 // Apply inverted index updates incrementally
4183 #[cfg(feature = "lance-backend")]
4184 for idx in &schema.indexes {
4185 if let IndexDefinition::Inverted(cfg) = idx
4186 && cfg.label == label_name
4187 && let Some((added, removed)) = inverted_updates.get(&cfg.property)
4188 {
4189 self.storage
4190 .index_manager()
4191 .update_inverted_index_incremental(cfg, added, removed)
4192 .await?;
4193 }
4194 }
4195
4196 // Update UID index with new vertex mappings
4197 // Collect (UniId, Vid) mappings from non-deleted vertices
4198 #[cfg(feature = "lance-backend")]
4199 {
4200 let mut uid_mappings: Vec<(uni_common::core::id::UniId, Vid)> = Vec::new();
4201 for (vid, _labels, props) in &v_data {
4202 let ext_id = props.get("ext_id").and_then(|v| v.as_str());
4203 let uid = crate::storage::vertex::VertexDataset::compute_vertex_uid(
4204 label_name, ext_id, props,
4205 );
4206 uid_mappings.push((uid, *vid));
4207 }
4208
4209 if !uid_mappings.is_empty()
4210 && let Ok(uid_index) = self.storage.uid_index(label_name)
4211 {
4212 uid_index.write_mapping(&uid_mappings).await?;
4213 }
4214 }
4215 }
4216 Ok(FlushOutcome {
4217 manifest,
4218 snapshot_id,
4219 })
4220 }
4221
4222 /// Composition entry that assumes the caller already holds `flush_lock`.
4223 /// Runs rotate + stream + finalize_locked in sequence. Used by
4224 /// [`Writer::flush_to_l1`] (acquires the lock first) and by
4225 /// `commit_transaction_l0`'s post-merge auto-flush branch (which already
4226 /// holds the lock from the commit critical section).
4227 #[instrument(
4228 skip(self),
4229 fields(snapshot_id, mutations_count, size_bytes),
4230 level = "info"
4231 )]
4232 async fn flush_inline_under_lock(&self, name: Option<String>) -> Result<String> {
4233 let start = std::time::Instant::now();
4234
4235 let (initial_size, initial_count) = {
4236 let l0_arc = self.l0_manager.get_current();
4237 let l0 = l0_arc.read();
4238 (l0.estimated_size, l0.mutation_count)
4239 };
4240 tracing::Span::current().record("size_bytes", initial_size);
4241 tracing::Span::current().record("mutations_count", initial_count);
4242
4243 debug!("Starting L0 flush to L1");
4244
4245 // Phases A (WAL pre-flush), B (rotate), C (WAL handoff).
4246 // FlushInProgressGuard lives on RotateOutput and stays alive for
4247 // the full flush — including the finalize_locked call below.
4248 let RotateOutput {
4249 old_l0_arc,
4250 wal_lsn,
4251 current_version,
4252 flush_in_progress_guard: _flush_guard,
4253 } = self.flush_l0_rotate().await?;
4254
4255 // Phases D (L1 collect), E (orphan resolve), F (manifest seed),
4256 // G (Lance writes). Builds the manifest but does NOT publish it.
4257 let FlushOutcome {
4258 manifest,
4259 snapshot_id,
4260 } = self
4261 .flush_stream_l1(old_l0_arc.clone(), wal_lsn, current_version, name)
4262 .await?;
4263
4264 // Phases H..S: publish manifest, complete_flush, WAL truncate,
4265 // property cache clear, last_flush_time, metrics, l1_runs++,
4266 // compaction trigger, index-rebuild scheduling, fork tick.
4267 self.flush_finalize_locked(
4268 old_l0_arc,
4269 wal_lsn,
4270 manifest,
4271 snapshot_id,
4272 initial_size,
4273 initial_count,
4274 start,
4275 )
4276 .await
4277 }
4278
4279 /// Phases H..S of the flush: publish the manifest and run all
4280 /// post-publish bookkeeping. Assumes the caller already holds
4281 /// `flush_lock` — see [`Writer::flush_finalize_now`] for the
4282 /// lock-acquiring variant used by the async finalize path.
4283 #[allow(clippy::too_many_arguments)]
4284 async fn flush_finalize_locked(
4285 &self,
4286 old_l0_arc: Arc<RwLock<L0Buffer>>,
4287 wal_lsn: u64,
4288 manifest: SnapshotManifest,
4289 snapshot_id: String,
4290 initial_size: usize,
4291 initial_count: usize,
4292 start: std::time::Instant,
4293 ) -> Result<String> {
4294 Self::flush_finalize_body(
4295 &self.shared_ctx(),
4296 old_l0_arc,
4297 wal_lsn,
4298 manifest,
4299 snapshot_id,
4300 initial_size,
4301 initial_count,
4302 start,
4303 )
4304 .await
4305 }
4306
4307 /// Phases H..S of the flush, lock-acquiring variant. Used by the
4308 /// async-flush finalizer task (running on a spawned tokio task),
4309 /// which holds neither `&self` nor `flush_lock`. Briefly re-acquires
4310 /// `flush_lock` to serialize the publish boundary, then runs the
4311 /// same body as `flush_finalize_locked` but over a SharedFlushCtx.
4312 #[allow(clippy::too_many_arguments)]
4313 pub(crate) async fn flush_finalize_now(
4314 shared: SharedFlushCtx,
4315 old_l0_arc: Arc<RwLock<L0Buffer>>,
4316 wal_lsn: u64,
4317 manifest: SnapshotManifest,
4318 snapshot_id: String,
4319 initial_size: usize,
4320 initial_count: usize,
4321 start: std::time::Instant,
4322 ) -> Result<String> {
4323 let _flush_lock_guard = shared.flush_lock.clone().lock_owned().await;
4324 Self::flush_finalize_body(
4325 &shared,
4326 old_l0_arc,
4327 wal_lsn,
4328 manifest,
4329 snapshot_id,
4330 initial_size,
4331 initial_count,
4332 start,
4333 )
4334 .await
4335 }
4336
4337 /// Shared body of `flush_finalize_locked` and `flush_finalize_now`.
4338 /// Static over `SharedFlushCtx`; the caller is responsible for
4339 /// holding `flush_lock`.
4340 #[allow(clippy::too_many_arguments)]
4341 async fn flush_finalize_body(
4342 shared: &SharedFlushCtx,
4343 old_l0_arc: Arc<RwLock<L0Buffer>>,
4344 wal_lsn: u64,
4345 mut manifest: SnapshotManifest,
4346 snapshot_id: String,
4347 initial_size: usize,
4348 initial_count: usize,
4349 start: std::time::Instant,
4350 ) -> Result<String> {
4351 // Parent-snapshot fixup. The stream phase built `manifest` with
4352 // parent_snapshot set from cached_manifest at stream time. If
4353 // OTHER flushes (sync or async) have finalized since then,
4354 // cached_manifest has advanced. Re-link this manifest to the
4355 // current cached chain so we don't orphan their data when we
4356 // overwrite cached_manifest below.
4357 let current_parent_id = shared
4358 .cached_manifest
4359 .lock()
4360 .as_ref()
4361 .map(|m| m.snapshot_id.clone());
4362 if current_parent_id.is_some() && manifest.parent_snapshot != current_parent_id {
4363 manifest.parent_snapshot = current_parent_id;
4364 metrics::counter!("uni_flush_parent_chain_fixups_total").increment(1);
4365 }
4366
4367 // H. Publish manifest (body first, then pointer — recovery is
4368 // idempotent if we crash between the two).
4369 shared
4370 .storage
4371 .snapshot_manager()
4372 .save_snapshot(&manifest)
4373 .await?;
4374 shared
4375 .storage
4376 .snapshot_manager()
4377 .set_latest_snapshot(&manifest.snapshot_id)
4378 .await?;
4379
4380 // I. Cache manifest for next flush to avoid re-reading from object store.
4381 *shared.cached_manifest.lock() = Some(manifest.clone());
4382
4383 // L. Invalidate the property cache BEFORE removing the flushed buffer
4384 // from the L0 chain (Bug #10). `clear_cache` has no dependency on the
4385 // complete_flush (J) / WAL-truncate (K) steps below, so clearing it
4386 // first closes the non-monotonic-read window: once the buffer leaves
4387 // the L0 chain at J a freshly-written value would otherwise miss the
4388 // chain and fall through to a stale cache entry. By the time finalize
4389 // runs the streamed rows are already durable in L1, so a post-clear
4390 // read falls through to fresh storage instead. The finalizer holds
4391 // `flush_lock` throughout, so reordering L ahead of J is safe.
4392 if let Some(ref pm) = shared.property_manager {
4393 pm.clear_cache().await;
4394 }
4395
4396 // J. Complete flush: remove old L0 from pending_flush. MUST happen
4397 // BEFORE WAL truncation so min_pending_wal_lsn is accurate.
4398 shared.l0_manager.complete_flush(&old_l0_arc);
4399
4400 // Test-only seam (no-op without the `failpoints` feature): pause AFTER
4401 // complete_flush removed the buffer from the L0 chain (J) but BEFORE
4402 // WAL truncation (K). The property cache is already cleared (L moved
4403 // ahead of J above), so a read in this window falls through to fresh
4404 // L1 storage rather than a stale cache entry (Bug #10 — non-monotonic
4405 // read after flush finalize).
4406 fail::fail_point!("flush::after-complete-before-cache-clear");
4407
4408 // K. Truncate WAL up to the safe LSN.
4409 let wal_handle = shared.l0_manager.get_current().read().wal.clone();
4410 if let Some(w) = wal_handle {
4411 let safe_lsn = shared
4412 .l0_manager
4413 .min_pending_wal_lsn()
4414 .map(|min_pending| min_pending.min(wal_lsn))
4415 .unwrap_or(wal_lsn);
4416 w.truncate_before(safe_lsn).await?;
4417 }
4418
4419 // M. Reset last flush time for time-based auto-flush.
4420 *shared.last_flush_time.lock() = std::time::Instant::now();
4421
4422 info!(
4423 snapshot_id,
4424 mutations_count = initial_count,
4425 size_bytes = initial_size,
4426 "L0 flush to L1 completed successfully"
4427 );
4428 metrics::histogram!("uni_flush_duration_seconds").record(start.elapsed().as_secs_f64());
4429 metrics::counter!("uni_flush_bytes_total").increment(initial_size as u64);
4430 metrics::counter!("uni_flush_rows_total").increment(initial_count as u64);
4431
4432 // P. Increment flush generation counter for write throttling.
4433 {
4434 let mut status = uni_common::sync::acquire_mutex(
4435 &shared.storage.compaction_status,
4436 "compaction_status",
4437 )?;
4438 status.l1_runs += 1;
4439 }
4440
4441 // Q. Trigger CSR compaction if enough frozen segments have accumulated.
4442 let am = shared.adjacency_manager.clone();
4443 if am.should_compact(shared.compaction_config.frozen_segments_compact_threshold) {
4444 let previous_still_running = {
4445 let guard = shared.compaction_handle.read();
4446 guard.as_ref().is_some_and(|h| !h.is_finished())
4447 };
4448 if previous_still_running {
4449 info!("Skipping compaction: previous compaction still in progress");
4450 } else {
4451 let handle = tokio::spawn(async move {
4452 am.compact();
4453 });
4454 *shared.compaction_handle.write() = Some(handle);
4455 }
4456 }
4457
4458 // R. Post-flush: check if any indexes need rebuilding based on thresholds.
4459 if shared.auto_rebuild_enabled
4460 && let Some(rebuild_mgr) = shared.index_rebuild_manager.get()
4461 {
4462 Self::schedule_index_rebuilds_if_needed_static(
4463 &manifest,
4464 rebuild_mgr.clone(),
4465 shared.schema_manager.clone(),
4466 shared.index_rebuild_config.clone(),
4467 );
4468 }
4469
4470 // S. Emit fork-fragment observability after a successful forked flush.
4471 Self::tick_fork_fragment_observability_static(
4472 shared.fork_id,
4473 shared.fork_flush_count.clone(),
4474 shared.fork_fragment_warn_fired.clone(),
4475 shared.fork_fragment_warn_threshold,
4476 );
4477
4478 Ok(snapshot_id)
4479 }
4480
4481 /// Increment fork-flush bookkeeping and fire the fragment warn
4482 /// once if the threshold is crossed.
4483 ///
4484 /// Each flush typically appends ~1 fragment per touched dataset on
4485 /// the fork's branches; without compaction (deferred to Phase 5)
4486 /// long-lived heavy-write forks degrade. The flush count is a
4487 /// proxy for actual fragment growth — reading
4488 /// `Dataset::manifest().fragments.len()` per dataset would add a
4489 /// per-flush object-store roundtrip on the hot commit path, which
4490 /// is too costly for a purely observational guard rail.
4491 ///
4492 /// No-op for primary writers (`fork_id == None`).
4493 #[allow(dead_code)] // called by tests; production path uses _static
4494 pub(crate) fn tick_fork_fragment_observability(&self) {
4495 Self::tick_fork_fragment_observability_static(
4496 self.fork_id,
4497 self.fork_flush_count.clone(),
4498 self.fork_fragment_warn_fired.clone(),
4499 self.config.fork_fragment_warn_threshold,
4500 );
4501 }
4502
4503 /// Static variant of [`Writer::tick_fork_fragment_observability`].
4504 /// Used by the async-flush finalize path, where we hold a
4505 /// [`SharedFlushCtx`] bundle of Arcs rather than `&Writer`.
4506 pub(crate) fn tick_fork_fragment_observability_static(
4507 fork_id: Option<ForkId>,
4508 fork_flush_count: Arc<AtomicU64>,
4509 fork_fragment_warn_fired: Arc<AtomicBool>,
4510 warn_threshold: usize,
4511 ) {
4512 let Some(fork_id) = fork_id else { return };
4513 // `Relaxed` is sufficient: observational counter, no synchronizes-with.
4514 let new_count = fork_flush_count.fetch_add(1, Ordering::Relaxed) + 1;
4515 let fork_label = fork_id.to_string();
4516 metrics::gauge!(
4517 "uni_fork_l1_flushes",
4518 "fork" => fork_label.clone(),
4519 )
4520 .set(new_count as f64);
4521 let threshold = warn_threshold as u64;
4522 if !fork_fragment_warn_fired.load(Ordering::Relaxed)
4523 && threshold > 0
4524 && new_count >= threshold
4525 {
4526 fork_fragment_warn_fired.store(true, Ordering::Relaxed);
4527 tracing::warn!(
4528 fork = %fork_label,
4529 flush_count = new_count,
4530 threshold,
4531 "fork has exceeded the L1 flush-count threshold; \
4532 fork compaction is deferred to Phase 5 — consider \
4533 drop+recreate or promotion to bound fragment growth"
4534 );
4535 }
4536 }
4537
4538 /// Check rebuild thresholds and schedule background index rebuilds for
4539 /// labels that exceed growth or age limits. Marks affected indexes as
4540 /// `Stale` and spawns an async task to schedule the rebuild.
4541 #[allow(dead_code)] // production path uses _static; kept as the
4542 // documented instance entry point.
4543 fn schedule_index_rebuilds_if_needed(
4544 &self,
4545 manifest: &SnapshotManifest,
4546 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4547 ) {
4548 Self::schedule_index_rebuilds_if_needed_static(
4549 manifest,
4550 rebuild_mgr,
4551 self.schema_manager.clone(),
4552 self.config.index_rebuild.clone(),
4553 );
4554 }
4555
4556 /// Static variant of [`Writer::schedule_index_rebuilds_if_needed`].
4557 /// Used by the async-flush finalize path, where we hold the
4558 /// [`SchemaManager`] via `SharedFlushCtx` rather than `&Writer`.
4559 pub(crate) fn schedule_index_rebuilds_if_needed_static(
4560 manifest: &SnapshotManifest,
4561 rebuild_mgr: Arc<crate::storage::index_rebuild::IndexRebuildManager>,
4562 schema_manager: Arc<uni_common::core::schema::SchemaManager>,
4563 index_rebuild_config: uni_common::config::IndexRebuildConfig,
4564 ) {
4565 let checker =
4566 crate::storage::index_rebuild::RebuildTriggerChecker::new(index_rebuild_config);
4567 let schema = schema_manager.schema();
4568 let labels = checker.labels_needing_rebuild(manifest, &schema.indexes);
4569
4570 if labels.is_empty() {
4571 return;
4572 }
4573
4574 // Mark affected indexes as Stale
4575 for label in &labels {
4576 for idx in &schema.indexes {
4577 if idx.label() == label {
4578 let _ = schema_manager.update_index_metadata(idx.name(), |m| {
4579 m.status = uni_common::core::schema::IndexStatus::Stale;
4580 });
4581 }
4582 }
4583 }
4584
4585 tokio::spawn(async move {
4586 if let Err(e) = rebuild_mgr.schedule(labels).await {
4587 tracing::warn!("Failed to schedule index rebuild: {e}");
4588 }
4589 });
4590 }
4591}
4592
/// `FinalizeFn` implementation that the `FlushCoordinator` invokes from
/// its single-task finalizer loop.
///
/// On success it delegates to `Writer::flush_finalize_now`. On failure it
/// logs the error, bumps `uni_flush_failures_total`, and returns the error.
/// Both paths release the rotated flush's back-pressure permit.
///
/// Unit struct on purpose: it must NOT hold `Arc<Writer>`, because that
/// would create a reference cycle Writer -> FlushCoordinator ->
/// Arc<dyn FinalizeFn> -> Writer. All state needed for finalize travels in
/// via `SharedFlushCtx`.
pub(crate) struct WriterFinalizer;
4599
impl FinalizeFn for WriterFinalizer {
    /// Success path: finalize one rotated L0 after its rows have been
    /// streamed to L1.
    ///
    /// Returns whatever `Writer::flush_finalize_now` returns. The
    /// back-pressure permit carried in `rotated` is released only after
    /// finalize has finished, whether it succeeded or not.
    fn finalize<'a>(
        &'a self,
        rotated: RotatedFlush,
        outcome: AsyncFlushOutcome,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
        Box::pin(async move {
            // Read initial_size / initial_count from the rotated L0 so
            // we don't have to plumb them through the coordinator
            // submission. The buffer is still alive in pending_flush
            // until `complete_flush` (J) below pops it. The read guard is
            // scoped to this block, so it is released before the `.await`.
            let (initial_size, initial_count) = {
                let l0 = rotated.old_l0_arc.read();
                (l0.estimated_size, l0.mutation_count)
            };
            // NOTE(review): the `Instant` passed as the last argument is
            // taken here, at finalize time. If `flush_finalize_now` uses it
            // for `uni_flush_duration_seconds`, that metric excludes the
            // earlier stream-to-L1 phase. Confirm this is intended.
            let result = Writer::flush_finalize_now(
                shared,
                rotated.old_l0_arc.clone(),
                rotated.wal_lsn,
                outcome.new_manifest,
                outcome.snapshot_id,
                initial_size,
                initial_count,
                std::time::Instant::now(),
            )
            .await;
            // Release the back-pressure permit explicitly, now that finalize
            // is done. The remaining fields of `rotated` (the original note
            // names a flush_in_progress_guard) drop when this future completes.
            drop(rotated.permit);
            result
        })
    }

    /// Failure path: the stream-to-L1 phase for `rotated` failed.
    ///
    /// Logs the error, bumps `uni_flush_failures_total`, releases the
    /// back-pressure permit, and returns the same error to the caller. No
    /// cleanup is done here. As the log message states, the old L0 is left
    /// in pending_flush and its data is recovered by WAL replay on restart.
    fn finalize_failure<'a>(
        &'a self,
        rotated: RotatedFlush,
        err: anyhow::Error,
        _shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>> {
        Box::pin(async move {
            tracing::warn!(
                error = %err,
                seq = rotated.seq,
                "async flush stream failed; old L0 remains in pending_flush, \
                 WAL retains its data, recovery via WAL replay on restart"
            );
            metrics::counter!("uni_flush_failures_total").increment(1);
            // Permit + guard drop here so back-pressure releases even on
            // failure (the permit explicitly; the rest of `rotated` when
            // this future completes).
            drop(rotated.permit);
            err
        })
    }
}
4654
4655#[cfg(test)]
4656mod tests {
4657 use super::*;
4658 use tempfile::tempdir;
4659
4660 /// Test that commit_transaction writes mutations to WAL before merging to main L0.
4661 /// This verifies fix for issue #137 (transaction commit atomicity).
4662 #[tokio::test]
4663 async fn test_commit_transaction_wal_before_merge() -> Result<()> {
4664 use crate::runtime::wal::WriteAheadLog;
4665 use crate::storage::manager::StorageManager;
4666 use object_store::local::LocalFileSystem;
4667 use object_store::path::Path as ObjectStorePath;
4668 use uni_common::core::schema::SchemaManager;
4669
4670 let dir = tempdir()?;
4671 let path = dir.path().to_str().unwrap();
4672 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4673 let schema_path = ObjectStorePath::from("schema.json");
4674
4675 let schema_manager =
4676 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4677 let _label_id = schema_manager.add_label("Test")?;
4678 schema_manager.save().await?;
4679
4680 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4681
4682 // Create WAL for main L0
4683 let wal_path = ObjectStorePath::from("wal");
4684 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4685
4686 let writer = Writer::new_with_config(
4687 storage.clone(),
4688 schema_manager.clone(),
4689 1,
4690 UniConfig::default(),
4691 Some(wal),
4692 None,
4693 )
4694 .await?;
4695
4696 // Begin transaction — create a transaction L0
4697 let tx_l0 = writer.create_transaction_l0();
4698
4699 // Insert data in transaction
4700 let vid_a = writer.next_vid().await?;
4701 let vid_b = writer.next_vid().await?;
4702
4703 let mut props = std::collections::HashMap::new();
4704 props.insert("test".to_string(), Value::String("data".to_string()));
4705
4706 writer
4707 .insert_vertex_with_labels(vid_a, props.clone(), &["Test".to_string()], Some(&tx_l0))
4708 .await?;
4709 writer
4710 .insert_vertex_with_labels(
4711 vid_b,
4712 std::collections::HashMap::new(),
4713 &["Test".to_string()],
4714 Some(&tx_l0),
4715 )
4716 .await?;
4717
4718 let eid = writer.next_eid(1).await?;
4719 writer
4720 .insert_edge(
4721 vid_a,
4722 vid_b,
4723 1,
4724 eid,
4725 std::collections::HashMap::new(),
4726 None,
4727 Some(&tx_l0),
4728 )
4729 .await?;
4730
4731 // Get WAL before commit
4732 let l0 = writer.l0_manager.get_current();
4733 let wal = l0.read().wal.clone().expect("Main L0 should have WAL");
4734 let mutations_before = wal.replay().await?;
4735 let count_before = mutations_before.len();
4736
4737 // Commit transaction - this should write to WAL first
4738 let writer = Arc::new(writer);
4739 writer.commit_transaction_l0(tx_l0).await?;
4740
4741 // Verify WAL has the new mutations
4742 let mutations_after = wal.replay().await?;
4743 assert!(
4744 mutations_after.len() > count_before,
4745 "WAL should contain transaction mutations after commit"
4746 );
4747
4748 // Verify mutations are in correct order: vertices first, then edges
4749 let new_mutations: Vec<_> = mutations_after.into_iter().skip(count_before).collect();
4750
4751 let mut saw_vertex_a = false;
4752 let mut saw_vertex_b = false;
4753 let mut saw_edge = false;
4754
4755 for mutation in &new_mutations {
4756 match mutation {
4757 crate::runtime::wal::Mutation::InsertVertex { vid, .. } => {
4758 if *vid == vid_a {
4759 saw_vertex_a = true;
4760 }
4761 if *vid == vid_b {
4762 saw_vertex_b = true;
4763 }
4764 // Vertices should come before edges
4765 assert!(!saw_edge, "Vertices should be logged to WAL before edges");
4766 }
4767 crate::runtime::wal::Mutation::InsertEdge { eid: e, .. } => {
4768 if *e == eid {
4769 saw_edge = true;
4770 }
4771 // Edges should come after vertices
4772 assert!(
4773 saw_vertex_a && saw_vertex_b,
4774 "Edge should be logged after both vertices"
4775 );
4776 }
4777 _ => {}
4778 }
4779 }
4780
4781 assert!(saw_vertex_a, "Vertex A should be in WAL");
4782 assert!(saw_vertex_b, "Vertex B should be in WAL");
4783 assert!(saw_edge, "Edge should be in WAL");
4784
4785 // Verify data is also in main L0
4786 let l0_read = l0.read();
4787 assert!(
4788 l0_read.vertex_properties.contains_key(&vid_a),
4789 "Vertex A should be in main L0"
4790 );
4791 assert!(
4792 l0_read.vertex_properties.contains_key(&vid_b),
4793 "Vertex B should be in main L0"
4794 );
4795 assert!(
4796 l0_read.edge_endpoints.contains_key(&eid),
4797 "Edge should be in main L0"
4798 );
4799
4800 Ok(())
4801 }
4802
4803 /// Test that failed WAL flush leaves transaction intact for retry or rollback.
4804 #[tokio::test]
4805 async fn test_commit_transaction_wal_failure_rollback() -> Result<()> {
4806 use crate::runtime::wal::WriteAheadLog;
4807 use crate::storage::manager::StorageManager;
4808 use object_store::local::LocalFileSystem;
4809 use object_store::path::Path as ObjectStorePath;
4810 use uni_common::core::schema::SchemaManager;
4811
4812 let dir = tempdir()?;
4813 let path = dir.path().to_str().unwrap();
4814 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4815 let schema_path = ObjectStorePath::from("schema.json");
4816
4817 let schema_manager =
4818 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4819 let _label_id = schema_manager.add_label("Test")?;
4820 let _baseline_label_id = schema_manager.add_label("Baseline")?;
4821 let _txdata_label_id = schema_manager.add_label("TxData")?;
4822 schema_manager.save().await?;
4823
4824 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4825
4826 // Create WAL for main L0
4827 let wal_path = ObjectStorePath::from("wal");
4828 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
4829
4830 let writer = Writer::new_with_config(
4831 storage.clone(),
4832 schema_manager.clone(),
4833 1,
4834 UniConfig::default(),
4835 Some(wal),
4836 None,
4837 )
4838 .await?;
4839
4840 // Insert baseline data (outside transaction)
4841 let baseline_vid = writer.next_vid().await?;
4842 writer
4843 .insert_vertex_with_labels(
4844 baseline_vid,
4845 [("baseline".to_string(), Value::Bool(true))]
4846 .into_iter()
4847 .collect(),
4848 &["Baseline".to_string()],
4849 None,
4850 )
4851 .await?;
4852
4853 // Begin transaction — create a transaction L0
4854 let tx_l0 = writer.create_transaction_l0();
4855
4856 // Insert data in transaction
4857 let tx_vid = writer.next_vid().await?;
4858 writer
4859 .insert_vertex_with_labels(
4860 tx_vid,
4861 [("tx_data".to_string(), Value::Bool(true))]
4862 .into_iter()
4863 .collect(),
4864 &["TxData".to_string()],
4865 Some(&tx_l0),
4866 )
4867 .await?;
4868
4869 // Capture main L0 state before rollback
4870 let l0 = writer.l0_manager.get_current();
4871 let vertex_count_before = l0.read().vertex_properties.len();
4872
4873 // Rollback transaction (simulating what would happen after WAL flush failure)
4874 drop(tx_l0);
4875
4876 // Verify main L0 is unchanged
4877 let vertex_count_after = l0.read().vertex_properties.len();
4878 assert_eq!(
4879 vertex_count_before, vertex_count_after,
4880 "Main L0 should not change after rollback"
4881 );
4882
4883 // Baseline should still be present
4884 assert!(
4885 l0.read().vertex_properties.contains_key(&baseline_vid),
4886 "Baseline data should remain"
4887 );
4888
4889 // Transaction data should NOT be in main L0
4890 assert!(
4891 !l0.read().vertex_properties.contains_key(&tx_vid),
4892 "Transaction data should not be in main L0 after rollback"
4893 );
4894
4895 Ok(())
4896 }
4897
4898 /// Test that batch insert with shared labels does not clone labels per vertex.
4899 /// This verifies fix for issue #161 (redundant label cloning).
4900 #[tokio::test]
4901 async fn test_batch_insert_shared_labels() -> Result<()> {
4902 use crate::storage::manager::StorageManager;
4903 use object_store::local::LocalFileSystem;
4904 use object_store::path::Path as ObjectStorePath;
4905 use uni_common::core::schema::SchemaManager;
4906
4907 let dir = tempdir()?;
4908 let path = dir.path().to_str().unwrap();
4909 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4910 let schema_path = ObjectStorePath::from("schema.json");
4911
4912 let schema_manager =
4913 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4914 let _label_id = schema_manager.add_label("Person")?;
4915 schema_manager.save().await?;
4916
4917 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4918
4919 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4920
4921 // Shared labels - should not be cloned per vertex
4922 let labels = &["Person".to_string()];
4923
4924 // Insert batch of vertices with same labels
4925 let mut vids = Vec::new();
4926 for i in 0..100 {
4927 let vid = writer.next_vid().await?;
4928 let mut props = std::collections::HashMap::new();
4929 props.insert("id".to_string(), Value::Int(i));
4930 writer
4931 .insert_vertex_with_labels(vid, props, labels, None)
4932 .await?;
4933 vids.push(vid);
4934 }
4935
4936 // Verify all vertices have the correct labels
4937 let l0 = writer.l0_manager.get_current();
4938 for vid in vids {
4939 let l0_guard = l0.read();
4940 let vertex_labels = l0_guard.vertex_labels.get(&vid);
4941 assert!(vertex_labels.is_some(), "Vertex should have labels");
4942 assert_eq!(
4943 vertex_labels.unwrap(),
4944 &vec!["Person".to_string()],
4945 "Labels should match"
4946 );
4947 }
4948
4949 Ok(())
4950 }
4951
4952 /// Test that estimated_size tracks mutations correctly and approximates size_bytes().
4953 /// This verifies fix for issue #147 (O(V+E) size_bytes() in metrics).
4954 #[tokio::test]
4955 async fn test_estimated_size_tracks_mutations() -> Result<()> {
4956 use crate::storage::manager::StorageManager;
4957 use object_store::local::LocalFileSystem;
4958 use object_store::path::Path as ObjectStorePath;
4959 use uni_common::core::schema::SchemaManager;
4960
4961 let dir = tempdir()?;
4962 let path = dir.path().to_str().unwrap();
4963 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
4964 let schema_path = ObjectStorePath::from("schema.json");
4965
4966 let schema_manager =
4967 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
4968 let _label_id = schema_manager.add_label("Test")?;
4969 schema_manager.save().await?;
4970
4971 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
4972
4973 let writer = Writer::new(storage.clone(), schema_manager.clone(), 1).await?;
4974
4975 let l0 = writer.l0_manager.get_current();
4976
4977 // Initial state should be empty
4978 let initial_estimated = l0.read().estimated_size;
4979 let initial_actual = l0.read().size_bytes();
4980 assert_eq!(initial_estimated, 0, "Initial estimated_size should be 0");
4981 assert_eq!(initial_actual, 0, "Initial size_bytes should be 0");
4982
4983 // Insert vertices with properties
4984 let mut vids = Vec::new();
4985 for i in 0..10 {
4986 let vid = writer.next_vid().await?;
4987 let mut props = std::collections::HashMap::new();
4988 props.insert("name".to_string(), Value::String(format!("vertex_{}", i)));
4989 props.insert("index".to_string(), Value::Int(i));
4990 writer
4991 .insert_vertex_with_labels(vid, props, &[], None)
4992 .await?;
4993 vids.push(vid);
4994 }
4995
4996 // Verify estimated_size grew
4997 let after_vertices_estimated = l0.read().estimated_size;
4998 let after_vertices_actual = l0.read().size_bytes();
4999 assert!(
5000 after_vertices_estimated > 0,
5001 "estimated_size should grow after insertions"
5002 );
5003
5004 // Verify estimated_size is within reasonable bounds of actual size (within 2x)
5005 let ratio = after_vertices_estimated as f64 / after_vertices_actual as f64;
5006 assert!(
5007 (0.5..=2.0).contains(&ratio),
5008 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5009 after_vertices_estimated,
5010 after_vertices_actual,
5011 ratio
5012 );
5013
5014 // Insert edges with a simple edge type
5015 let edge_type = 1u32;
5016 for i in 0..9 {
5017 let eid = writer.next_eid(edge_type).await?;
5018 writer
5019 .insert_edge(
5020 vids[i],
5021 vids[i + 1],
5022 edge_type,
5023 eid,
5024 std::collections::HashMap::new(),
5025 Some("NEXT".to_string()),
5026 None,
5027 )
5028 .await?;
5029 }
5030
5031 // Verify estimated_size grew further
5032 let after_edges_estimated = l0.read().estimated_size;
5033 let after_edges_actual = l0.read().size_bytes();
5034 assert!(
5035 after_edges_estimated > after_vertices_estimated,
5036 "estimated_size should grow after edge insertions"
5037 );
5038
5039 // Verify still within reasonable bounds
5040 let ratio = after_edges_estimated as f64 / after_edges_actual as f64;
5041 assert!(
5042 (0.5..=2.0).contains(&ratio),
5043 "estimated_size ({}) should be within 2x of size_bytes ({}), ratio: {}",
5044 after_edges_estimated,
5045 after_edges_actual,
5046 ratio
5047 );
5048
5049 Ok(())
5050 }
5051
5052 /// Test that flushing WAL on a writer with no mutations succeeds cleanly.
5053 #[tokio::test]
5054 async fn test_flush_wal_empty_l0_is_noop() -> Result<()> {
5055 use crate::runtime::wal::WriteAheadLog;
5056 use crate::storage::manager::StorageManager;
5057 use object_store::local::LocalFileSystem;
5058 use object_store::path::Path as ObjectStorePath;
5059 use uni_common::core::schema::SchemaManager;
5060
5061 let dir = tempdir()?;
5062 let path = dir.path().to_str().unwrap();
5063 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5064 let schema_path = ObjectStorePath::from("schema.json");
5065
5066 let schema_manager =
5067 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5068 schema_manager.save().await?;
5069
5070 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5071
5072 let wal_path = ObjectStorePath::from("wal");
5073 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5074
5075 let writer = Writer::new_with_config(
5076 storage.clone(),
5077 schema_manager.clone(),
5078 1,
5079 UniConfig::default(),
5080 Some(wal.clone()),
5081 None,
5082 )
5083 .await?;
5084
5085 // Flush with no mutations — should succeed cleanly
5086 let lsn = writer.flush_wal().await?;
5087 // LSN should be 0 or 1 (no real mutations flushed)
5088 assert!(lsn <= 1, "Empty flush should produce low LSN, got {}", lsn);
5089
5090 Ok(())
5091 }
5092
5093 /// Test that transaction data does not leak into main L0 without commit.
5094 #[tokio::test]
5095 async fn test_transaction_isolation_without_commit() -> Result<()> {
5096 use crate::runtime::wal::WriteAheadLog;
5097 use crate::storage::manager::StorageManager;
5098 use object_store::local::LocalFileSystem;
5099 use object_store::path::Path as ObjectStorePath;
5100 use uni_common::core::schema::SchemaManager;
5101
5102 let dir = tempdir()?;
5103 let path = dir.path().to_str().unwrap();
5104 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5105 let schema_path = ObjectStorePath::from("schema.json");
5106
5107 let schema_manager =
5108 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5109 let _label_id = schema_manager.add_label("Person")?;
5110 schema_manager.save().await?;
5111
5112 let storage = Arc::new(StorageManager::new(path, schema_manager.clone()).await?);
5113
5114 let wal_path = ObjectStorePath::from("wal");
5115 let wal = Arc::new(WriteAheadLog::new(store.clone(), wal_path));
5116
5117 let writer = Writer::new_with_config(
5118 storage.clone(),
5119 schema_manager.clone(),
5120 1,
5121 UniConfig::default(),
5122 Some(wal),
5123 None,
5124 )
5125 .await?;
5126
5127 // Create transaction L0
5128 let tx_l0 = writer.create_transaction_l0();
5129
5130 // Insert vertex into transaction L0
5131 let vid = writer.next_vid().await?;
5132 writer
5133 .insert_vertex_with_labels(
5134 vid,
5135 [("name".to_string(), Value::String("Ghost".to_string()))]
5136 .into_iter()
5137 .collect(),
5138 &["Person".to_string()],
5139 Some(&tx_l0),
5140 )
5141 .await?;
5142
5143 // Verify data is in transaction L0
5144 assert!(
5145 tx_l0.read().vertex_properties.contains_key(&vid),
5146 "Transaction L0 should contain the vertex"
5147 );
5148
5149 // Verify data is NOT in main L0
5150 let main_l0 = writer.l0_manager.get_current();
5151 assert!(
5152 !main_l0.read().vertex_properties.contains_key(&vid),
5153 "Main L0 should NOT contain uncommitted transaction data"
5154 );
5155
5156 // Drop transaction without committing — data should be lost
5157 drop(tx_l0);
5158
5159 // Main L0 still should not have it
5160 assert!(
5161 !main_l0.read().vertex_properties.contains_key(&vid),
5162 "Main L0 should remain clean after dropped transaction"
5163 );
5164
5165 Ok(())
5166 }
5167
5168 /// Phase 2 Day 12: the fork-fragment warn fires exactly once when
5169 /// the flush count crosses the configured threshold and stays
5170 /// silent on subsequent flushes for the lifetime of the writer.
5171 /// Primary writers (`fork_id == None`) never fire it.
5172 ///
5173 /// Tested directly against `tick_fork_fragment_observability` so
5174 /// the contract is locked in independently of the broader
5175 /// `flush_to_l1` path (the end-to-end fork-flush path is blocked
5176 /// on Day 10's on-the-fly schema overlay growth).
5177 #[tokio::test]
5178 async fn fork_fragment_warn_fires_once_then_silences() -> Result<()> {
5179 use crate::storage::manager::StorageManager;
5180 use object_store::local::LocalFileSystem;
5181 use object_store::path::Path as ObjectStorePath;
5182 use uni_common::core::fork::ForkId;
5183 use uni_common::core::schema::SchemaManager;
5184
5185 let dir = tempdir()?;
5186 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5187 let schema_path = ObjectStorePath::from("schema.json");
5188 let schema_manager =
5189 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5190 let storage = Arc::new(
5191 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5192 );
5193
5194 let config = UniConfig {
5195 fork_fragment_warn_threshold: 3,
5196 ..Default::default()
5197 };
5198 let mut writer =
5199 Writer::new_with_config(storage, schema_manager, 1, config, None, None).await?;
5200
5201 // Primary path: never fires.
5202 for _ in 0..10 {
5203 writer.tick_fork_fragment_observability();
5204 }
5205 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5206 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 0);
5207
5208 // Fork path: tag and tick. Below threshold → no fire.
5209 writer.fork_id = Some(ForkId::new());
5210 writer.tick_fork_fragment_observability();
5211 writer.tick_fork_fragment_observability();
5212 assert!(!writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5213 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 2);
5214
5215 // Crossing threshold → fires once.
5216 writer.tick_fork_fragment_observability();
5217 assert!(writer.fork_fragment_warn_fired.load(Ordering::Relaxed));
5218 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 3);
5219
5220 // Subsequent ticks bump the gauge but do not re-fire.
5221 let fired_after = writer.fork_fragment_warn_fired.load(Ordering::Relaxed);
5222 for _ in 0..5 {
5223 writer.tick_fork_fragment_observability();
5224 }
5225 assert_eq!(writer.fork_flush_count.load(Ordering::Relaxed), 8);
5226 assert_eq!(
5227 writer.fork_fragment_warn_fired.load(Ordering::Relaxed),
5228 fired_after
5229 );
5230
5231 Ok(())
5232 }
5233
5234 /// The hot-path mutators must not write to any `Writer` struct field.
5235 /// Phase 2 of the refactor
5236 /// gave them `&self` receivers, which the compiler enforces against
5237 /// direct `self.x = y` assignment — but interior-mutable writes
5238 /// (Mutex/Atomic/OnceLock) still compile. This regression test snapshots
5239 /// every potentially-writable field, calls each hot-path mutator, and
5240 /// asserts no field changed.
5241 ///
5242 /// Cold-path methods (`flush_to_l1`, `commit_transaction_l0`,
5243 /// `tick_fork_fragment_observability`) DO mutate fields by design and
5244 /// are intentionally out of scope here.
5245 #[tokio::test]
5246 async fn hot_path_mutators_do_not_change_writer_fields() -> Result<()> {
5247 use crate::storage::manager::StorageManager;
5248 use object_store::local::LocalFileSystem;
5249 use object_store::path::Path as ObjectStorePath;
5250 use uni_common::core::schema::SchemaManager;
5251
5252 let dir = tempdir()?;
5253 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
5254 let schema_path = ObjectStorePath::from("schema.json");
5255 let schema_manager =
5256 Arc::new(SchemaManager::load_from_store(store.clone(), &schema_path).await?);
5257 schema_manager.add_label("Person")?;
5258 schema_manager.save().await?;
5259 let storage = Arc::new(
5260 StorageManager::new(dir.path().to_str().unwrap(), schema_manager.clone()).await?,
5261 );
5262
5263 let writer =
5264 Writer::new_with_config(storage, schema_manager, 1, UniConfig::default(), None, None)
5265 .await?;
5266
5267 /// Captures every `Writer` field that *could* be written by a
5268 /// hot-path mutator (i.e., every non-Arc, non-immutable-after-
5269 /// construction field). Arc'd substructures (`l0_manager`,
5270 /// `storage`, etc.) are intentionally not checked — they are
5271 /// re-pointed only at construction.
5272 #[derive(Debug, PartialEq)]
5273 struct Snapshot {
5274 last_flush_time: std::time::Instant,
5275 cached_manifest_some: bool,
5276 fork_flush_count: u64,
5277 fork_fragment_warn_fired: bool,
5278 xervo_runtime_some: bool,
5279 index_rebuild_manager_some: bool,
5280 fork_id: Option<ForkId>,
5281 }
5282
5283 fn snap(w: &Writer) -> Snapshot {
5284 Snapshot {
5285 last_flush_time: *w.last_flush_time.lock(),
5286 cached_manifest_some: w.cached_manifest.lock().is_some(),
5287 fork_flush_count: w.fork_flush_count.load(Ordering::Relaxed),
5288 fork_fragment_warn_fired: w.fork_fragment_warn_fired.load(Ordering::Relaxed),
5289 xervo_runtime_some: w.xervo_runtime.get().is_some(),
5290 index_rebuild_manager_some: w.index_rebuild_manager.get().is_some(),
5291 fork_id: w.fork_id,
5292 }
5293 }
5294
5295 // 1. insert_vertex_with_labels
5296 let before = snap(&writer);
5297 let vid = writer.next_vid().await?;
5298 writer
5299 .insert_vertex_with_labels(vid, Properties::new(), &["Person".to_string()], None)
5300 .await?;
5301 assert_eq!(
5302 snap(&writer),
5303 before,
5304 "insert_vertex_with_labels mutated a Writer field"
5305 );
5306
5307 // 2. insert_vertices_batch
5308 let before = snap(&writer);
5309 let vids = writer.allocate_vids(2).await?;
5310 writer
5311 .insert_vertices_batch(
5312 vids,
5313 vec![Properties::new(), Properties::new()],
5314 vec!["Person".into()],
5315 None,
5316 )
5317 .await?;
5318 assert_eq!(
5319 snap(&writer),
5320 before,
5321 "insert_vertices_batch mutated a Writer field"
5322 );
5323
5324 // 3. delete_vertex
5325 let before = snap(&writer);
5326 writer.delete_vertex(vid, None, None).await?;
5327 assert_eq!(
5328 snap(&writer),
5329 before,
5330 "delete_vertex mutated a Writer field"
5331 );
5332
5333 // (insert_edge / delete_edge are skipped here: their fixture cost is
5334 // disproportionate to the audit's marginal value, and the same
5335 // structural argument plus the compiler-enforced `&self` covers them.)
5336
5337 Ok(())
5338 }
5339}