uni-store 2.0.3

Storage layer for Uni graph database - Lance datasets, LSM deltas, and WAL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Async-flush coordination.
//!
//! Bounds the number of in-flight L0→L1 flushes (via a semaphore),
//! assigns rotate-order sequence numbers, and serializes finalize so
//! the manifest parent-chain stays consistent.
//!
//! ## Architecture
//!
//! ```text
//! Writer
//!   ├── flush_lock              (brief: rotate + finalize)
//!   └── flush_coordinator
//!         ├── permits: Semaphore(max_pending_flushes)
//!         ├── next_seq: AtomicU64
//!         └── submit_tx → finalizer task
//!                          ├─ mpsc<FlushSubmit>
//!                          └─ BinaryHeap reorder by seq
//! ```

use crate::storage::manager::{FlushInProgressGuard, StorageManager};
use parking_lot::RwLock as PlRwLock;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::{Semaphore, mpsc, oneshot};
use uni_common::core::snapshot::SnapshotManifest;

/// Result of a rotate phase: the snapshot of state needed to stream and
/// finalize. Send + 'static so it can travel into a spawned task.
///
/// `FlushCoordinator::submit_for_stream` copies `old_l0_arc`, `wal_lsn`,
/// `current_version` and `name` out of this value as the arguments of the
/// stream closure, then moves the whole value on to the finalizer.
pub struct RotatedFlush {
    /// Rotate-order sequence number. `finalizer_loop` finalizes submissions
    /// in ascending `seq` order and expects the first one to be 0.
    pub seq: u64,
    /// Shared handle to an `L0Buffer`; first argument of the stream closure.
    /// NOTE(review): presumably the buffer that was rotated out — confirm
    /// against the rotate code in `Writer`.
    pub old_l0_arc: Arc<PlRwLock<crate::runtime::l0::L0Buffer>>,
    /// Second argument of the stream closure.
    /// NOTE(review): presumably the WAL LSN captured at rotate time — confirm.
    pub wal_lsn: u64,
    /// Third argument of the stream closure.
    /// NOTE(review): presumably the version being flushed — confirm.
    pub current_version: u64,
    /// Optional name, forwarded to the stream closure as its last argument.
    pub name: Option<String>,
    /// Snapshot of `cached_manifest` taken at rotate time. Stream uses this
    /// as a tentative parent; finalize may rewrite it if predecessors
    /// finalized in between.
    pub parent_manifest: Option<SnapshotManifest>,
    /// Permit holding the back-pressure slot. Released on finalize drop.
    pub permit: tokio::sync::OwnedSemaphorePermit,
    /// Acquired during rotate; dropped when this `RotatedFlush` is consumed
    /// by finalize (success or failure). Keeps `flush_in_progress` accurate
    /// for the full async pipeline duration.
    pub flush_in_progress_guard: FlushInProgressGuard,
}

/// Result of a stream phase: the manifest to publish.
///
/// Produced by the stream closure passed to
/// `FlushCoordinator::submit_for_stream` and handed to
/// `FinalizeFn::finalize` by the finalizer task.
pub struct FlushOutcome {
    /// Manifest built by the stream phase; not yet published at this point.
    pub new_manifest: SnapshotManifest,
    /// NOTE(review): presumably the id of the snapshot described by
    /// `new_manifest` — confirm against the stream implementation.
    pub snapshot_id: String,
}

/// Carried across the spawn boundary so a finalize step can run without
/// touching `Writer` (which is `&self` and lives in the caller).
///
/// The finalizer task owns one instance and clones it into every
/// `FinalizeFn::finalize` / `FinalizeFn::finalize_failure` call. Most
/// fields are `Arc` handles; the two config structs and the scalar fields
/// are cloned by value.
#[derive(Clone)]
pub struct SharedFlushCtx {
    pub storage: Arc<StorageManager>,
    pub l0_manager: Arc<crate::runtime::l0_manager::L0Manager>,
    pub adjacency_manager: Arc<crate::storage::adjacency_manager::AdjacencyManager>,
    /// Optional; `None` when no property manager is available to finalize.
    pub property_manager: Option<Arc<crate::runtime::property_manager::PropertyManager>>,
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Shared slot for the cached manifest. `RotatedFlush::parent_manifest`
    /// is documented as a snapshot of this value taken at rotate time.
    pub cached_manifest: Arc<parking_lot::Mutex<Option<SnapshotManifest>>>,
    /// NOTE(review): presumably updated by finalize on success — confirm.
    pub last_flush_time: Arc<parking_lot::Mutex<std::time::Instant>>,
    /// NOTE(review): presumably `Some` only for a fork-scoped writer — confirm.
    pub fork_id: Option<uni_common::core::fork::ForkId>,
    pub fork_flush_count: Arc<std::sync::atomic::AtomicU64>,
    pub fork_fragment_warn_fired: Arc<std::sync::atomic::AtomicBool>,
    pub fork_fragment_warn_threshold: usize,
    /// Re-acquired by the static `flush_finalize_now` running on the
    /// finalizer task. NOT held during stream — that's the whole point.
    pub flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Late-initialised (`OnceLock`) handle to the index rebuild manager.
    pub index_rebuild_manager:
        Arc<std::sync::OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Slot holding the join handle of a compaction task, if one exists.
    pub compaction_handle: Arc<parking_lot::RwLock<Option<tokio::task::JoinHandle<()>>>>,
    pub compaction_config: uni_common::config::CompactionConfig,
    pub index_rebuild_config: uni_common::config::IndexRebuildConfig,
    pub auto_rebuild_enabled: bool,
}

/// A submission to the ordered finalizer.
///
/// Built by `FlushCoordinator::submit`, sent over the unbounded mpsc
/// channel and reordered by `seq` inside `finalizer_loop`. The
/// `PartialEq`/`Ord` impls in this file compare `seq` only.
struct FlushSubmit {
    /// Rotate-order position; the finalizer processes submissions in
    /// ascending order of this value.
    seq: u64,
    /// Rotate-phase state, moved into `FinalizeFn::finalize` or
    /// `FinalizeFn::finalize_failure`.
    rotated: RotatedFlush,
    /// Stream-phase result: `Ok` is routed to `finalize`, `Err` to
    /// `finalize_failure`.
    result: anyhow::Result<FlushOutcome>,
    /// Optional notification when finalize completes (for `FlushTicket`).
    ack: Option<oneshot::Sender<anyhow::Result<String>>>,
}

/// User-facing handle to wait on an async-flush completion (proposal §5.6).
///
/// Created with `FlushTicket::ready` (result already known) or
/// `FlushTicket::pending` (result delivered later by the finalizer).
pub struct FlushTicket {
    /// Completion channel. Both constructors in this module store `Some`
    /// (`ready` pre-fills the channel for the sync path). If the field is
    /// ever `None`, `await_finalize` returns an error instead of a
    /// snapshot id.
    rx: Option<oneshot::Receiver<anyhow::Result<String>>>,
}

impl FlushTicket {
    /// Build a ticket whose outcome is already known (sync flush paths).
    ///
    /// The outcome is parked in a fresh oneshot channel so that
    /// `await_finalize` resolves immediately with `snapshot_id`.
    pub fn ready(snapshot_id: anyhow::Result<String>) -> Self {
        let (done_tx, done_rx) = oneshot::channel();
        // The receiver is still alive right here, so the send result
        // carries no information worth acting on.
        let _ = done_tx.send(snapshot_id);
        Self::pending(done_rx)
    }

    /// Build a ticket that resolves once the other end of `rx` reports the
    /// finalize outcome.
    pub fn pending(rx: oneshot::Receiver<anyhow::Result<String>>) -> Self {
        Self { rx: Some(rx) }
    }

    /// Wait for the flush to finalize. Returns the snapshot id on success.
    ///
    /// # Errors
    ///
    /// Returns the finalize error as reported by the sender, or a
    /// synthetic error if the sender was dropped without reporting or the
    /// ticket carries no channel at all.
    pub async fn await_finalize(self) -> anyhow::Result<String> {
        let Some(done_rx) = self.rx else {
            return Err(anyhow::anyhow!("flush ticket has no completion channel"));
        };
        match done_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(anyhow::anyhow!("flush ticket dropped before completion")),
        }
    }
}

/// Coordinates asynchronous flushes: hands out back-pressure permits and
/// rotate-order sequence numbers, spawns stream-phase tasks, and feeds
/// their results to a single finalizer task that finalizes in sequence
/// order.
pub struct FlushCoordinator {
    /// Back-pressure pool. Sized in `new` as `max_pending_flushes.max(1)`,
    /// so there is always at least one permit.
    permits: Arc<Semaphore>,
    /// Next rotate-order sequence number to hand out; starts at 0.
    next_seq: AtomicU64,
    /// Wrapped in Mutex<Option<...>> so `shutdown()` can take and drop
    /// it explicitly, which closes the mpsc and lets the finalizer task
    /// exit. `submit()` reads through the option; if absent, the
    /// submission is silently dropped (coordinator is shutting down).
    submit_tx: parking_lot::Mutex<Option<mpsc::UnboundedSender<FlushSubmit>>>,
    /// Counter exposed for `drop_fork` to wait on. Incremented at rotate,
    /// decremented after finalize.
    pending_count: Arc<std::sync::atomic::AtomicUsize>,
    /// Signalled with `notify_waiters()` every time `pending_count` is
    /// decremented; `drain()` waits on it.
    drain_notify: Arc<tokio::sync::Notify>,
    /// The value passed to `new`, stored unclamped (it may be 0 even
    /// though `permits` then holds one permit).
    max_pending_flushes: usize,
    /// Tracked for `ShutdownHandle::track_task` registration AND for
    /// `shutdown()`'s await. Set to None after either takes it.
    finalizer_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Every spawned stream-phase task. `shutdown()` awaits each so
    /// the closure-captured `Arc<Writer>` (and through it
    /// `Arc<StorageManager>` + `Arc<ForkScope>` on a fork-scoped
    /// writer) actually drops before `shutdown` returns. Without this,
    /// `drop_fork` sees a transient `ForkInUse` because the stream
    /// task's destructor is still on tokio's scheduler queue after
    /// `drain()` returned. Opportunistically pruned in
    /// `submit_for_stream` to keep the vec bounded.
    stream_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
}

impl FlushCoordinator {
    /// Create a coordinator and start its finalizer task.
    ///
    /// * `max_pending_flushes` — size of the back-pressure permit pool.
    ///   The pool always holds at least one permit, but the value is
    ///   stored (and reported by `max_pending_flushes()`) as given.
    /// * `shared` — context cloned into every finalize call.
    /// * `finalize_fn` — finalize implementation driven by the task.
    ///
    /// Calls `tokio::spawn`, so it must run inside a Tokio runtime.
    pub fn new(
        max_pending_flushes: usize,
        shared: SharedFlushCtx,
        finalize_fn: Arc<dyn FinalizeFn>,
    ) -> Self {
        // Never build an empty pool: zero permits could never be acquired.
        let permit_pool = Arc::new(Semaphore::new(max_pending_flushes.max(1)));
        let (submit_tx, submit_rx) = mpsc::unbounded_channel::<FlushSubmit>();
        let pending_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let drain_notify = Arc::new(tokio::sync::Notify::new());

        // The finalizer shares the counter and the notifier with the
        // coordinator so `drain()` can observe its progress.
        let finalizer_task = tokio::spawn(finalizer_loop(
            submit_rx,
            shared,
            finalize_fn,
            Arc::clone(&pending_count),
            Arc::clone(&drain_notify),
        ));

        Self {
            permits: permit_pool,
            next_seq: AtomicU64::new(0),
            submit_tx: parking_lot::Mutex::new(Some(submit_tx)),
            pending_count,
            drain_notify,
            max_pending_flushes,
            finalizer_handle: parking_lot::Mutex::new(Some(finalizer_task)),
            stream_handles: parking_lot::Mutex::new(Vec::new()),
        }
    }

    /// Stop the coordinator: wait for stream tasks, close the submit
    /// channel, then wait for the finalizer task to exit.
    ///
    /// Once this returns, the spawned tasks are gone and the Arcs they
    /// held are released — including the writer's `Arc<StorageManager>`
    /// inside `SharedFlushCtx`, which on a fork-scoped writer pins
    /// `Arc<ForkScope>`. `drop_fork` relies on this so the
    /// ForkHolderGuard can finally drop. Idempotent: every step takes its
    /// state out of the coordinator, so repeated calls find nothing left
    /// to do.
    pub async fn shutdown(&self) {
        // Phase 1 — stream tasks. Awaiting each handle guarantees that the
        // task's closure-captured `Arc<Writer>` (and through it
        // `Arc<StorageManager>` / `Arc<ForkScope>`) has been dropped
        // before we return, which closes the L8 fork-drop race documented
        // in the plan. The handles are moved out first so the
        // parking_lot guard is not held across an await.
        let stream_tasks = std::mem::take(&mut *self.stream_handles.lock());
        for task in stream_tasks {
            let _ = task.await;
        }

        // Phase 2 — close the mpsc by dropping the sender; the finalizer
        // then receives `None` and leaves its loop.
        let sender = self.submit_tx.lock().take();
        drop(sender);

        // Phase 3 — wait for the finalizer. The handle may already have
        // been handed out (e.g. to ShutdownHandle::track_task); then there
        // is nothing to await here, but the task is still on its way out
        // because the channel is closed. The handle is taken in its own
        // statement so the lock guard is released before the await.
        let finalizer = self.finalizer_handle.lock().take();
        if let Some(task) = finalizer {
            let _ = task.await;
        }
    }

    /// Give up ownership of the finalizer task's `JoinHandle` so that
    /// `ShutdownHandle` can track it.
    ///
    /// Returns `None` when the handle was already taken, either by an
    /// earlier call or by `shutdown()`.
    pub fn take_finalizer_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
        let mut slot = self.finalizer_handle.lock();
        slot.take()
    }

    /// Returns the `max_pending_flushes` value exactly as it was passed to
    /// `new`.
    ///
    /// `new` sizes the permit pool with `max_pending_flushes.max(1)`, so a
    /// reported value of 0 still corresponds to a pool of one permit.
    pub fn max_pending_flushes(&self) -> usize {
        self.max_pending_flushes
    }

    pub async fn acquire_permit(&self) -> anyhow::Result<tokio::sync::OwnedSemaphorePermit> {
        self.permits
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| anyhow::anyhow!("flush coordinator permit semaphore closed"))
    }

    /// Non-blocking variant of [`Self::acquire_permit`]. Returns `None`
    /// if the permit pool is at capacity. Used on the commit hot path
    /// to avoid awaiting under `flush_lock`.
    pub fn try_acquire_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
        self.permits.clone().try_acquire_owned().ok()
    }

    /// Allocate the next rotate-order sequence number.
    ///
    /// Numbers start at 0 and grow by one per call. `finalizer_loop`
    /// finalizes submissions strictly in this order, so a number that is
    /// allocated but never submitted holds back every later flush.
    pub fn next_rotate_seq(&self) -> u64 {
        self.next_seq.fetch_add(1, Ordering::AcqRel)
    }

    /// Record one more in-flight flush.
    ///
    /// The matching decrement happens in `finalizer_loop` after the flush
    /// is finalized, or in `submit` when the coordinator is shutting down.
    pub fn note_pending(&self) {
        self.pending_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Current number of flushes recorded by `note_pending` that have not
    /// been accounted for as finished yet. `drain()` waits for this to
    /// reach zero.
    pub fn pending_flush_count(&self) -> usize {
        self.pending_count.load(Ordering::Acquire)
    }

    /// Submit a completed-stream flush for ordered finalization.
    /// Silently drops the submission if the coordinator has been shut
    /// down (submit_tx taken).
    pub fn submit(
        &self,
        seq: u64,
        rotated: RotatedFlush,
        result: anyhow::Result<FlushOutcome>,
        ack: Option<oneshot::Sender<anyhow::Result<String>>>,
    ) {
        let submit_msg = FlushSubmit {
            seq,
            rotated,
            result,
            ack,
        };
        if let Some(tx) = self.submit_tx.lock().as_ref() {
            let _ = tx.send(submit_msg);
        }
        // else: coordinator is shutting down; pending_count will be
        // decremented by the matching drop of submit_msg (RotatedFlush
        // contains the FlushInProgressGuard which already adjusts
        // flush_in_progress on drop). We must also decrement
        // pending_count manually because the finalizer won't see this.
        else {
            self.pending_count
                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
            self.drain_notify.notify_waiters();
        }
    }

    /// Spawn the stream phase on a tokio task and return a [`FlushTicket`].
    ///
    /// `run_stream` is the closure that actually performs the L1 stream
    /// work — it takes the rotate snapshot (`old_l0_arc`, `wal_lsn`,
    /// `current_version`, `name`) and returns the built (but not yet
    /// published) manifest as a `FlushOutcome`. The closure typically
    /// captures `Arc<Writer>` so it can call `writer.flush_stream_l1`.
    ///
    /// On stream completion, the result and the consumed `RotatedFlush`
    /// are sent through the coordinator's mpsc to the single-task
    /// finalizer, which preserves rotate-order via a BinaryHeap.
    ///
    /// The stream work runs in its own nested task. If it panics (or is
    /// cancelled), the failure is converted into an `Err` result and still
    /// submitted, so the finalizer sees every sequence number and later
    /// flushes are not held back behind a gap.
    ///
    /// The returned `FlushTicket` resolves when finalize completes
    /// (or fails). Dropping the ticket does NOT cancel the flush — the
    /// pipeline runs to completion either way.
    pub fn submit_for_stream<F, Fut>(
        self: &Arc<Self>,
        rotated: RotatedFlush,
        run_stream: F,
    ) -> FlushTicket
    where
        F: FnOnce(Arc<PlRwLock<crate::runtime::l0::L0Buffer>>, u64, u64, Option<String>) -> Fut
            + Send
            + 'static,
        Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>> + Send + 'static,
    {
        let (ack_tx, ack_rx) = oneshot::channel();
        let coord = Arc::clone(self);
        let seq = rotated.seq;
        let old_l0 = Arc::clone(&rotated.old_l0_arc);
        let wal_lsn = rotated.wal_lsn;
        let current_version = rotated.current_version;
        let name = rotated.name.clone();
        let handle = tokio::spawn(async move {
            // Isolate the stream work so a panic surfaces as a JoinError
            // here instead of tearing down this task before `submit`.
            let stream_task = tokio::spawn(async move {
                run_stream(old_l0, wal_lsn, current_version, name).await
            });
            let result = match stream_task.await {
                Ok(stream_result) => stream_result,
                Err(join_err) => Err(anyhow::anyhow!(
                    "flush stream task did not complete: {}",
                    join_err
                )),
            };
            coord.submit(seq, rotated, result, Some(ack_tx));
        });
        // Track the handle so `shutdown()` can await all stream tasks'
        // destructors. Opportunistically prune finished handles to keep
        // the vec bounded under high flush rates.
        let mut handles = self.stream_handles.lock();
        handles.retain(|h| !h.is_finished());
        handles.push(handle);
        FlushTicket::pending(ack_rx)
    }

    /// Wait until pending_count drops to zero. Used by `drop_fork`.
    ///
    /// `timeout` bounds the total wait, measured from the moment of the
    /// call.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a static message if flushes are still pending
    /// when the deadline passes.
    pub async fn drain(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Create the `Notified` future BEFORE reading the counter. It
            // receives `notify_waiters()` calls made after its creation,
            // so a decrement that lands between the check below and the
            // `select!` still wakes us. Checking first would open a window
            // in which the wakeup is lost and we sleep until the deadline.
            let notified = self.drain_notify.notified();
            if self.pending_flush_count() == 0 {
                return Ok(());
            }
            tokio::select! {
                _ = notified => continue,
                _ = tokio::time::sleep_until(deadline) => {
                    // One last look: the counter may have reached zero
                    // right at the deadline.
                    return if self.pending_flush_count() == 0 {
                        Ok(())
                    } else {
                        Err("pending flushes did not drain before deadline")
                    };
                }
            }
        }
    }
}

/// Closure run by the finalizer task. Captures the parts of Writer that
/// finalize touches; runs without holding any Writer reference.
///
/// `Writer::flush_finalize_now` implements this and is bound to the
/// concrete WAL/storage state.
///
/// Both methods return boxed futures so the trait can be used as
/// `Arc<dyn FinalizeFn>`; the futures must be `Send` because they are
/// awaited on the spawned finalizer task.
pub trait FinalizeFn: Send + Sync {
    /// Finalize a flush whose stream phase succeeded.
    ///
    /// Called by `finalizer_loop` in sequence order. `rotated` and
    /// `outcome` are consumed; `shared` is a fresh clone of the
    /// finalizer's context. The returned result is forwarded as-is to the
    /// flush's ack channel, if it has one.
    fn finalize<'a>(
        &'a self,
        rotated: RotatedFlush,
        outcome: FlushOutcome,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<String>> + Send + 'a>>;

    /// Handle a flush whose stream phase failed with `err`.
    ///
    /// Called by `finalizer_loop` in sequence order. The returned error is
    /// formatted into the `"flush stream failed: ..."` error that is sent
    /// to the flush's ack channel, if it has one.
    fn finalize_failure<'a>(
        &'a self,
        rotated: RotatedFlush,
        err: anyhow::Error,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>>;
}

/// Body of the single finalizer task.
///
/// Receives stream-phase results from `submit_rx`, parks them in a
/// min-heap keyed by `seq`, and finalizes them strictly in ascending
/// sequence order starting at 0. For each finalized submission it reports
/// the result on the submission's `ack` channel (if any), decrements
/// `pending_count` and wakes `drain_notify` waiters.
///
/// The loop ends when every sender of the channel has been dropped.
/// Submissions still parked at that point (their predecessors never
/// arrived) can no longer be finalized in order; they are dropped without
/// calling `finalize_fn`, their acks are resolved with an error, and
/// `pending_count` is decremented for each so `drain()` does not wait for
/// flushes that will never complete.
async fn finalizer_loop(
    mut submit_rx: mpsc::UnboundedReceiver<FlushSubmit>,
    shared: SharedFlushCtx,
    finalize_fn: Arc<dyn FinalizeFn>,
    pending_count: Arc<std::sync::atomic::AtomicUsize>,
    drain_notify: Arc<tokio::sync::Notify>,
) {
    // Reorder-by-seq using a min-heap; finalize strictly in seq order.
    let mut pending: BinaryHeap<Reverse<(u64, FlushSubmit)>> = BinaryHeap::new();
    let mut expected: u64 = 0;
    while let Some(submit) = submit_rx.recv().await {
        pending.push(Reverse((submit.seq, submit)));
        // Finalize every submission that is now next in line.
        while pending
            .peek()
            .is_some_and(|Reverse((seq, _))| *seq == expected)
        {
            let Some(Reverse((_, ready))) = pending.pop() else {
                break;
            };
            let FlushSubmit {
                rotated,
                result,
                ack,
                ..
            } = ready;
            let ack_result = match result {
                Ok(outcome) => finalize_fn.finalize(rotated, outcome, shared.clone()).await,
                Err(stream_err) => {
                    let final_err = finalize_fn
                        .finalize_failure(rotated, stream_err, shared.clone())
                        .await;
                    Err(anyhow::anyhow!("flush stream failed: {}", final_err))
                }
            };
            if let Some(ack) = ack {
                // The ticket may have been dropped; that is fine.
                let _ = ack.send(ack_result);
            }
            pending_count.fetch_sub(1, Ordering::AcqRel);
            drain_notify.notify_waiters();
            expected += 1;
        }
    }

    // Channel closed. Anything left in the heap is stranded behind a
    // sequence number that never arrived; account for it so waiters are
    // not left hanging.
    while let Some(Reverse((seq, stranded))) = pending.pop() {
        let FlushSubmit { rotated, ack, .. } = stranded;
        // Release the permit and the flush-in-progress guard first.
        drop(rotated);
        if let Some(ack) = ack {
            let _ = ack.send(Err(anyhow::anyhow!(
                "flush coordinator shut down before flush seq {} could be finalized (waiting for seq {})",
                seq,
                expected
            )));
        }
        pending_count.fetch_sub(1, Ordering::AcqRel);
        drain_notify.notify_waiters();
    }
}

// Ordering glue for the finalizer's reorder heap.
//
// The heap stores `Reverse<(u64, FlushSubmit)>`, and `BinaryHeap` requires
// its element type to be `Ord`. Tuples order lexicographically, so
// `FlushSubmit` must be `Ord` as well; it is only consulted when two
// entries carry the same leading seq. All four impls look at `seq` alone
// and ignore the payload.
impl Ord for FlushSubmit {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.seq, &other.seq)
    }
}
impl PartialOrd for FlushSubmit {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for FlushSubmit {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}
impl Eq for FlushSubmit {}