Skip to main content

uni_store/runtime/
flush_coordinator.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Async-flush coordination.
//!
//! Bounds the number of in-flight L0→L1 flushes (via a semaphore),
//! assigns rotate-order sequence numbers, and finalizes flushes one at a
//! time in rotate order so the manifest parent chain stays consistent.
//!
//! ## Architecture
//!
//! ```text
//! Writer
//!   ├── flush_lock              (held briefly: rotate + finalize)
//!   └── flush_coordinator
//!         ├── permits: Semaphore(max_pending_flushes)
//!         ├── next_seq: AtomicU64
//!         └── submit_tx → finalizer task
//!                          ├─ mpsc<FlushSubmit>
//!                          └─ BinaryHeap reorder by seq
//! ```
22
23use crate::storage::manager::{FlushInProgressGuard, StorageManager};
24use parking_lot::RwLock as PlRwLock;
25use std::cmp::Reverse;
26use std::collections::BinaryHeap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tokio::sync::{Semaphore, mpsc, oneshot};
30use uni_common::core::snapshot::SnapshotManifest;
31
32/// Result of a rotate phase: the snapshot of state needed to stream and
33/// finalize. Send + 'static so it can travel into a spawned task.
34pub struct RotatedFlush {
35    pub seq: u64,
36    pub old_l0_arc: Arc<PlRwLock<crate::runtime::l0::L0Buffer>>,
37    pub wal_lsn: u64,
38    pub current_version: u64,
39    pub name: Option<String>,
40    /// Snapshot of `cached_manifest` taken at rotate time. Stream uses this
41    /// as a tentative parent; finalize may rewrite it if predecessors
42    /// finalized in between.
43    pub parent_manifest: Option<SnapshotManifest>,
44    /// Permit holding the back-pressure slot. Released on finalize drop.
45    pub permit: tokio::sync::OwnedSemaphorePermit,
46    /// Acquired during rotate; dropped when this `RotatedFlush` is consumed
47    /// by finalize (success or failure). Keeps `flush_in_progress` accurate
48    /// for the full async pipeline duration.
49    pub flush_in_progress_guard: FlushInProgressGuard,
50}
51
52/// Result of a stream phase: the manifest to publish.
53pub struct FlushOutcome {
54    pub new_manifest: SnapshotManifest,
55    pub snapshot_id: String,
56}
57
58/// Carried across the spawn boundary so a finalize step can run without
59/// touching `Writer` (which is `&self` and lives in the caller).
60#[derive(Clone)]
61pub struct SharedFlushCtx {
62    pub storage: Arc<StorageManager>,
63    pub l0_manager: Arc<crate::runtime::l0_manager::L0Manager>,
64    pub adjacency_manager: Arc<crate::storage::adjacency_manager::AdjacencyManager>,
65    pub property_manager: Option<Arc<crate::runtime::property_manager::PropertyManager>>,
66    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
67    pub cached_manifest: Arc<parking_lot::Mutex<Option<SnapshotManifest>>>,
68    pub last_flush_time: Arc<parking_lot::Mutex<std::time::Instant>>,
69    pub fork_id: Option<uni_common::core::fork::ForkId>,
70    pub fork_flush_count: Arc<std::sync::atomic::AtomicU64>,
71    pub fork_fragment_warn_fired: Arc<std::sync::atomic::AtomicBool>,
72    pub fork_fragment_warn_threshold: usize,
73    /// Re-acquired by the static `flush_finalize_now` running on the
74    /// finalizer task. NOT held during stream — that's the whole point.
75    pub flush_lock: Arc<tokio::sync::Mutex<()>>,
76    pub index_rebuild_manager:
77        Arc<std::sync::OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
78    pub compaction_handle: Arc<parking_lot::RwLock<Option<tokio::task::JoinHandle<()>>>>,
79    pub compaction_config: uni_common::config::CompactionConfig,
80    pub index_rebuild_config: uni_common::config::IndexRebuildConfig,
81    pub auto_rebuild_enabled: bool,
82}
83
84/// A submission to the ordered finalizer.
85struct FlushSubmit {
86    seq: u64,
87    rotated: RotatedFlush,
88    result: anyhow::Result<FlushOutcome>,
89    /// Optional notification when finalize completes (for `FlushTicket`).
90    ack: Option<oneshot::Sender<anyhow::Result<String>>>,
91}
92
93/// User-facing handle to wait on an async-flush completion (proposal §5.6).
94pub struct FlushTicket {
95    /// `None` means the flush completed inline (sync path).
96    rx: Option<oneshot::Receiver<anyhow::Result<String>>>,
97}
98
99impl FlushTicket {
100    pub fn ready(snapshot_id: anyhow::Result<String>) -> Self {
101        // For sync paths: pre-resolved.
102        let (tx, rx) = oneshot::channel();
103        let _ = tx.send(snapshot_id);
104        Self { rx: Some(rx) }
105    }
106
107    pub fn pending(rx: oneshot::Receiver<anyhow::Result<String>>) -> Self {
108        Self { rx: Some(rx) }
109    }
110
111    /// Wait for the flush to finalize. Returns the snapshot id on success.
112    pub async fn await_finalize(self) -> anyhow::Result<String> {
113        match self.rx {
114            Some(rx) => rx
115                .await
116                .unwrap_or_else(|_| Err(anyhow::anyhow!("flush ticket dropped before completion"))),
117            None => Err(anyhow::anyhow!("flush ticket has no completion channel")),
118        }
119    }
120}
121
122pub struct FlushCoordinator {
123    permits: Arc<Semaphore>,
124    next_seq: AtomicU64,
125    /// Wrapped in Mutex<Option<...>> so `shutdown()` can take and drop
126    /// it explicitly, which closes the mpsc and lets the finalizer task
127    /// exit. `submit()` reads through the option; if absent, the
128    /// submission is silently dropped (coordinator is shutting down).
129    submit_tx: parking_lot::Mutex<Option<mpsc::UnboundedSender<FlushSubmit>>>,
130    /// Counter exposed for `drop_fork` to wait on. Incremented at rotate,
131    /// decremented after finalize.
132    pending_count: Arc<std::sync::atomic::AtomicUsize>,
133    drain_notify: Arc<tokio::sync::Notify>,
134    max_pending_flushes: usize,
135    /// Tracked for `ShutdownHandle::track_task` registration AND for
136    /// `shutdown()`'s await. Set to None after either takes it.
137    finalizer_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
138    /// Every spawned stream-phase task. `shutdown()` awaits each so
139    /// the closure-captured `Arc<Writer>` (and through it
140    /// `Arc<StorageManager>` + `Arc<ForkScope>` on a fork-scoped
141    /// writer) actually drops before `shutdown` returns. Without this,
142    /// `drop_fork` sees a transient `ForkInUse` because the stream
143    /// task's destructor is still on tokio's scheduler queue after
144    /// `drain()` returned. Opportunistically pruned in
145    /// `submit_for_stream` to keep the vec bounded.
146    stream_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
147}
148
149impl FlushCoordinator {
150    pub fn new(
151        max_pending_flushes: usize,
152        shared: SharedFlushCtx,
153        finalize_fn: Arc<dyn FinalizeFn>,
154    ) -> Self {
155        let permits = Arc::new(Semaphore::new(max_pending_flushes.max(1)));
156        let next_seq = AtomicU64::new(0);
157        let (submit_tx, submit_rx) = mpsc::unbounded_channel::<FlushSubmit>();
158        let pending_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
159        let drain_notify = Arc::new(tokio::sync::Notify::new());
160
161        let pending_count_for_task = pending_count.clone();
162        let drain_notify_for_task = drain_notify.clone();
163        let handle = tokio::spawn(finalizer_loop(
164            submit_rx,
165            shared,
166            finalize_fn,
167            pending_count_for_task,
168            drain_notify_for_task,
169        ));
170
171        Self {
172            permits,
173            next_seq,
174            submit_tx: parking_lot::Mutex::new(Some(submit_tx)),
175            pending_count,
176            drain_notify,
177            max_pending_flushes,
178            finalizer_handle: parking_lot::Mutex::new(Some(handle)),
179            stream_handles: parking_lot::Mutex::new(Vec::new()),
180        }
181    }
182
183    /// Drop the submit channel and await the finalizer task to exit.
184    /// After this returns, the coordinator's spawned task is gone and
185    /// any Arcs it held (including the writer's `Arc<StorageManager>`
186    /// inside `SharedFlushCtx`, which on a fork-scoped writer pins
187    /// `Arc<ForkScope>`) are released. Used by `drop_fork` so the
188    /// ForkHolderGuard can finally drop. Idempotent: safe to call
189    /// repeatedly.
190    pub async fn shutdown(&self) {
191        // 1. Drain every spawned stream task. Each task's destructor
192        //    drops the closure-captured `Arc<Writer>` (and through it
193        //    `Arc<StorageManager>` / `Arc<ForkScope>`). Awaiting forces
194        //    those drops to happen before we return — closing the L8
195        //    fork-drop race documented in the plan.
196        let stream_handles: Vec<_> = self.stream_handles.lock().drain(..).collect();
197        for h in stream_handles {
198            let _ = h.await;
199        }
200        // 2. Drop submit_tx — closes the mpsc; the finalizer task will
201        //    receive None and exit its loop.
202        drop(self.submit_tx.lock().take());
203        // 3. Await the finalizer task. If already taken (e.g. by
204        //    ShutdownHandle::track_task), the JoinHandle is None and we
205        //    have no way to await — accept that and return; the task
206        //    is still on its way to exit because submit_tx is closed.
207        let handle = self.finalizer_handle.lock().take();
208        if let Some(h) = handle {
209            let _ = h.await;
210        }
211    }
212
213    /// Hand off the finalizer task's JoinHandle for tracking by
214    /// `ShutdownHandle`. Returns `None` if already taken.
215    pub fn take_finalizer_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
216        self.finalizer_handle.lock().take()
217    }
218
219    pub fn max_pending_flushes(&self) -> usize {
220        self.max_pending_flushes
221    }
222
223    pub async fn acquire_permit(&self) -> anyhow::Result<tokio::sync::OwnedSemaphorePermit> {
224        self.permits
225            .clone()
226            .acquire_owned()
227            .await
228            .map_err(|_| anyhow::anyhow!("flush coordinator permit semaphore closed"))
229    }
230
231    /// Non-blocking variant of [`Self::acquire_permit`]. Returns `None`
232    /// if the permit pool is at capacity. Used on the commit hot path
233    /// to avoid awaiting under `flush_lock`.
234    pub fn try_acquire_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
235        self.permits.clone().try_acquire_owned().ok()
236    }
237
238    pub fn next_rotate_seq(&self) -> u64 {
239        self.next_seq.fetch_add(1, Ordering::AcqRel)
240    }
241
242    pub fn note_pending(&self) {
243        self.pending_count.fetch_add(1, Ordering::AcqRel);
244    }
245
246    pub fn pending_flush_count(&self) -> usize {
247        self.pending_count.load(Ordering::Acquire)
248    }
249
250    /// Submit a completed-stream flush for ordered finalization.
251    /// Silently drops the submission if the coordinator has been shut
252    /// down (submit_tx taken).
253    pub fn submit(
254        &self,
255        seq: u64,
256        rotated: RotatedFlush,
257        result: anyhow::Result<FlushOutcome>,
258        ack: Option<oneshot::Sender<anyhow::Result<String>>>,
259    ) {
260        let submit_msg = FlushSubmit {
261            seq,
262            rotated,
263            result,
264            ack,
265        };
266        if let Some(tx) = self.submit_tx.lock().as_ref() {
267            let _ = tx.send(submit_msg);
268        }
269        // else: coordinator is shutting down; pending_count will be
270        // decremented by the matching drop of submit_msg (RotatedFlush
271        // contains the FlushInProgressGuard which already adjusts
272        // flush_in_progress on drop). We must also decrement
273        // pending_count manually because the finalizer won't see this.
274        else {
275            self.pending_count
276                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
277            self.drain_notify.notify_waiters();
278        }
279    }
280
281    /// Spawn the stream phase on a tokio task and return a [`FlushTicket`].
282    ///
283    /// `run_stream` is the closure that actually performs the L1 stream
284    /// work — it takes the rotate snapshot (`old_l0_arc`, `wal_lsn`,
285    /// `current_version`, `name`) and returns the built (but not yet
286    /// published) manifest as a `FlushOutcome`. The closure typically
287    /// captures `Arc<Writer>` so it can call `writer.flush_stream_l1`.
288    ///
289    /// On stream completion, the result and the consumed `RotatedFlush`
290    /// are sent through the coordinator's mpsc to the single-task
291    /// finalizer, which preserves rotate-order via a BinaryHeap.
292    ///
293    /// The returned `FlushTicket` resolves when finalize completes
294    /// (or fails). Dropping the ticket does NOT cancel the flush — the
295    /// pipeline runs to completion either way.
296    pub fn submit_for_stream<F, Fut>(
297        self: &Arc<Self>,
298        rotated: RotatedFlush,
299        run_stream: F,
300    ) -> FlushTicket
301    where
302        F: FnOnce(Arc<PlRwLock<crate::runtime::l0::L0Buffer>>, u64, u64, Option<String>) -> Fut
303            + Send
304            + 'static,
305        Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>> + Send + 'static,
306    {
307        let (ack_tx, ack_rx) = oneshot::channel();
308        let coord = self.clone();
309        let seq = rotated.seq;
310        let old_l0 = rotated.old_l0_arc.clone();
311        let wal_lsn = rotated.wal_lsn;
312        let current_version = rotated.current_version;
313        let name = rotated.name.clone();
314        let handle = tokio::spawn(async move {
315            let result = run_stream(old_l0, wal_lsn, current_version, name).await;
316            coord.submit(seq, rotated, result, Some(ack_tx));
317        });
318        // Track the handle so `shutdown()` can await all stream tasks'
319        // destructors. Opportunistically prune finished handles to keep
320        // the vec bounded under high flush rates.
321        let mut handles = self.stream_handles.lock();
322        handles.retain(|h| !h.is_finished());
323        handles.push(handle);
324        FlushTicket::pending(ack_rx)
325    }
326
327    /// Wait until pending_count drops to zero. Used by `drop_fork`.
328    pub async fn drain(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
329        let deadline = tokio::time::Instant::now() + timeout;
330        loop {
331            if self.pending_flush_count() == 0 {
332                return Ok(());
333            }
334            let notified = self.drain_notify.notified();
335            tokio::select! {
336                _ = notified => continue,
337                _ = tokio::time::sleep_until(deadline) => {
338                    return if self.pending_flush_count() == 0 {
339                        Ok(())
340                    } else {
341                        Err("pending flushes did not drain before deadline")
342                    };
343                }
344            }
345        }
346    }
347}
348
349/// Closure run by the finalizer task. Captures the parts of Writer that
350/// finalize touches; runs without holding any Writer reference.
351///
352/// `Writer::flush_finalize_now` implements this and is bound to the
353/// concrete WAL/storage state.
354pub trait FinalizeFn: Send + Sync {
355    fn finalize<'a>(
356        &'a self,
357        rotated: RotatedFlush,
358        outcome: FlushOutcome,
359        shared: SharedFlushCtx,
360    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<String>> + Send + 'a>>;
361
362    fn finalize_failure<'a>(
363        &'a self,
364        rotated: RotatedFlush,
365        err: anyhow::Error,
366        shared: SharedFlushCtx,
367    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>>;
368}
369
370async fn finalizer_loop(
371    mut submit_rx: mpsc::UnboundedReceiver<FlushSubmit>,
372    shared: SharedFlushCtx,
373    finalize_fn: Arc<dyn FinalizeFn>,
374    pending_count: Arc<std::sync::atomic::AtomicUsize>,
375    drain_notify: Arc<tokio::sync::Notify>,
376) {
377    // Reorder-by-seq using a min-heap; finalize strictly in seq order.
378    let mut pending: BinaryHeap<Reverse<(u64, FlushSubmit)>> = BinaryHeap::new();
379    let mut expected: u64 = 0;
380    while let Some(submit) = submit_rx.recv().await {
381        pending.push(Reverse((submit.seq, submit)));
382        while let Some(Reverse((seq, _))) = pending.peek() {
383            if *seq != expected {
384                break;
385            }
386            let Reverse((_, s)) = pending.pop().unwrap();
387            let FlushSubmit {
388                rotated,
389                result,
390                ack,
391                ..
392            } = s;
393            let ack_result = match result {
394                Ok(outcome) => finalize_fn.finalize(rotated, outcome, shared.clone()).await,
395                Err(e) => {
396                    let _err = finalize_fn
397                        .finalize_failure(rotated, e, shared.clone())
398                        .await;
399                    Err(anyhow::anyhow!("flush stream failed: {}", _err))
400                }
401            };
402            if let Some(ack) = ack {
403                let _ = ack.send(ack_result);
404            }
405            pending_count.fetch_sub(1, Ordering::AcqRel);
406            drain_notify.notify_waiters();
407            expected += 1;
408        }
409    }
410}
411
412// We need a wrapper allowing FlushSubmit to be ordered by seq for the heap.
413// Default Ord on tuples uses the first element so (u64, FlushSubmit) needs
414// FlushSubmit to be Ord/PartialOrd. We don't actually compare FlushSubmits;
415// the seq is at position 0 of the tuple and the heap is keyed off it. To
416// avoid trait headaches we wrap manually:
417impl PartialEq for FlushSubmit {
418    fn eq(&self, other: &Self) -> bool {
419        self.seq == other.seq
420    }
421}
422impl Eq for FlushSubmit {}
423impl PartialOrd for FlushSubmit {
424    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
425        Some(self.cmp(other))
426    }
427}
428impl Ord for FlushSubmit {
429    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
430        self.seq.cmp(&other.seq)
431    }
432}