1use crate::storage::manager::{FlushInProgressGuard, StorageManager};
24use parking_lot::RwLock as PlRwLock;
25use std::cmp::Reverse;
26use std::collections::BinaryHeap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tokio::sync::{Semaphore, mpsc, oneshot};
30use uni_common::core::snapshot::SnapshotManifest;
31
/// A rotated-out L0 buffer together with everything needed to flush and finalize it.
///
/// Handed to `FlushCoordinator::submit` / `submit_for_stream` and finally consumed by value by
/// `FinalizeFn::finalize` or `FinalizeFn::finalize_failure`.
///
/// NOTE: struct fields are dropped in declaration order, so when a `RotatedFlush` is dropped
/// whole, `permit` is released before `flush_in_progress_guard`. Keep that in mind before
/// reordering fields.
pub struct RotatedFlush {
    /// Finalization order key: the finalizer completes flushes strictly in ascending `seq`,
    /// starting at 0. Presumably obtained from `FlushCoordinator::next_rotate_seq` — confirm.
    pub seq: u64,
    /// The frozen L0 buffer to flush; passed to the stream closure by `submit_for_stream`.
    pub old_l0_arc: Arc<PlRwLock<crate::runtime::l0::L0Buffer>>,
    /// Passed through to the stream closure. Presumably the WAL LSN covered by this buffer —
    /// TODO confirm.
    pub wal_lsn: u64,
    /// Passed through to the stream closure as its version argument.
    pub current_version: u64,
    /// Optional name passed through to the stream closure (presumably a snapshot name — confirm).
    pub name: Option<String>,
    /// Not read in this file; presumably the manifest this flush builds on — confirm with
    /// `FinalizeFn` implementations.
    pub parent_manifest: Option<SnapshotManifest>,
    /// Semaphore permit bounding the number of outstanding flushes (presumably from
    /// `FlushCoordinator::acquire_permit`); released when this field is dropped.
    pub permit: tokio::sync::OwnedSemaphorePermit,
    /// Guard from the storage manager; its semantics are defined in `storage::manager`
    /// (presumably marks a flush as in progress until dropped — confirm).
    pub flush_in_progress_guard: FlushInProgressGuard,
}
51
52pub struct FlushOutcome {
54 pub new_manifest: SnapshotManifest,
55 pub snapshot_id: String,
56}
57
/// Handles shared by every flush finalization.
///
/// The coordinator moves one instance into its finalizer task, which passes a fresh clone to
/// each `FinalizeFn` call. The derived `Clone` shares every `Arc`-wrapped field between clones;
/// `fork_id`, the config values, the threshold and the flag are copied.
#[derive(Clone)]
pub struct SharedFlushCtx {
    pub storage: Arc<StorageManager>,
    pub l0_manager: Arc<crate::runtime::l0_manager::L0Manager>,
    pub adjacency_manager: Arc<crate::storage::adjacency_manager::AdjacencyManager>,
    pub property_manager: Option<Arc<crate::runtime::property_manager::PropertyManager>>,
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    // Presumably the most recently committed manifest, cached across flushes — confirm.
    pub cached_manifest: Arc<parking_lot::Mutex<Option<SnapshotManifest>>>,
    pub last_flush_time: Arc<parking_lot::Mutex<std::time::Instant>>,
    // The fork_* fields appear to be fork-specific bookkeeping (flush counter, one-shot
    // warning flag and its threshold); they are not read in this file — TODO confirm.
    pub fork_id: Option<uni_common::core::fork::ForkId>,
    pub fork_flush_count: Arc<std::sync::atomic::AtomicU64>,
    pub fork_fragment_warn_fired: Arc<std::sync::atomic::AtomicBool>,
    pub fork_fragment_warn_threshold: usize,
    // An async (tokio) mutex, so it can be held across `.await`; presumably serializes flush
    // work — confirm with its users.
    pub flush_lock: Arc<tokio::sync::Mutex<()>>,
    // `OnceLock`: set at most once, so it may be empty when a flush runs.
    pub index_rebuild_manager:
        Arc<std::sync::OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    pub compaction_handle: Arc<parking_lot::RwLock<Option<tokio::task::JoinHandle<()>>>>,
    pub compaction_config: uni_common::config::CompactionConfig,
    pub index_rebuild_config: uni_common::config::IndexRebuildConfig,
    pub auto_rebuild_enabled: bool,
}
83
/// Message carrying a finished (or failed) flush from its stream to the finalizer task.
///
/// Ordering and equality consider only `seq`, so the finalizer can keep these in a heap
/// keyed on finalization order.
struct FlushSubmit {
    /// Finalization order key; the finalizer processes submissions in ascending `seq`.
    seq: u64,
    /// The rotated flush, handed by value to the matching `FinalizeFn` hook.
    rotated: RotatedFlush,
    /// Stream outcome: `Ok` goes to `FinalizeFn::finalize`, `Err` to `finalize_failure`.
    result: anyhow::Result<FlushOutcome>,
    /// Where to deliver the final result, if a `FlushTicket` is waiting for it.
    ack: Option<oneshot::Sender<anyhow::Result<String>>>,
}
92
93pub struct FlushTicket {
95 rx: Option<oneshot::Receiver<anyhow::Result<String>>>,
97}
98
99impl FlushTicket {
100 pub fn ready(snapshot_id: anyhow::Result<String>) -> Self {
101 let (tx, rx) = oneshot::channel();
103 let _ = tx.send(snapshot_id);
104 Self { rx: Some(rx) }
105 }
106
107 pub fn pending(rx: oneshot::Receiver<anyhow::Result<String>>) -> Self {
108 Self { rx: Some(rx) }
109 }
110
111 pub async fn await_finalize(self) -> anyhow::Result<String> {
113 match self.rx {
114 Some(rx) => rx
115 .await
116 .unwrap_or_else(|_| Err(anyhow::anyhow!("flush ticket dropped before completion"))),
117 None => Err(anyhow::anyhow!("flush ticket has no completion channel")),
118 }
119 }
120}
121
/// Bounds the number of outstanding flushes and finalizes their results in rotation order.
///
/// Flush streams may finish in any order; a background finalizer task buffers their
/// submissions and applies them strictly by ascending sequence number.
pub struct FlushCoordinator {
    // Bounds outstanding flushes; created with `max(max_pending_flushes, 1)` permits.
    permits: Arc<Semaphore>,
    // Next sequence number returned by `next_rotate_seq` (starts at 0).
    next_seq: AtomicU64,
    // Sender into the finalizer task; `None` once `shutdown` has closed the channel.
    submit_tx: parking_lot::Mutex<Option<mpsc::UnboundedSender<FlushSubmit>>>,
    // Flushes counted by `note_pending` that have not yet been retired.
    pending_count: Arc<std::sync::atomic::AtomicUsize>,
    // Woken with `notify_waiters` whenever `pending_count` is decremented; awaited by `drain`.
    drain_notify: Arc<tokio::sync::Notify>,
    // Limit as passed to `new`, before clamping to at least 1.
    max_pending_flushes: usize,
    // Join handle of the finalizer task; taken by `shutdown` or `take_finalizer_handle`.
    finalizer_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    // Stream tasks spawned by `submit_for_stream`; awaited by `shutdown`.
    stream_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
148
149impl FlushCoordinator {
150 pub fn new(
151 max_pending_flushes: usize,
152 shared: SharedFlushCtx,
153 finalize_fn: Arc<dyn FinalizeFn>,
154 ) -> Self {
155 let permits = Arc::new(Semaphore::new(max_pending_flushes.max(1)));
156 let next_seq = AtomicU64::new(0);
157 let (submit_tx, submit_rx) = mpsc::unbounded_channel::<FlushSubmit>();
158 let pending_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
159 let drain_notify = Arc::new(tokio::sync::Notify::new());
160
161 let pending_count_for_task = pending_count.clone();
162 let drain_notify_for_task = drain_notify.clone();
163 let handle = tokio::spawn(finalizer_loop(
164 submit_rx,
165 shared,
166 finalize_fn,
167 pending_count_for_task,
168 drain_notify_for_task,
169 ));
170
171 Self {
172 permits,
173 next_seq,
174 submit_tx: parking_lot::Mutex::new(Some(submit_tx)),
175 pending_count,
176 drain_notify,
177 max_pending_flushes,
178 finalizer_handle: parking_lot::Mutex::new(Some(handle)),
179 stream_handles: parking_lot::Mutex::new(Vec::new()),
180 }
181 }
182
183 pub async fn shutdown(&self) {
191 let stream_handles: Vec<_> = self.stream_handles.lock().drain(..).collect();
197 for h in stream_handles {
198 let _ = h.await;
199 }
200 drop(self.submit_tx.lock().take());
203 let handle = self.finalizer_handle.lock().take();
208 if let Some(h) = handle {
209 let _ = h.await;
210 }
211 }
212
213 pub fn take_finalizer_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
216 self.finalizer_handle.lock().take()
217 }
218
219 pub fn max_pending_flushes(&self) -> usize {
220 self.max_pending_flushes
221 }
222
223 pub async fn acquire_permit(&self) -> anyhow::Result<tokio::sync::OwnedSemaphorePermit> {
224 self.permits
225 .clone()
226 .acquire_owned()
227 .await
228 .map_err(|_| anyhow::anyhow!("flush coordinator permit semaphore closed"))
229 }
230
231 pub fn try_acquire_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
235 self.permits.clone().try_acquire_owned().ok()
236 }
237
238 pub fn next_rotate_seq(&self) -> u64 {
239 self.next_seq.fetch_add(1, Ordering::AcqRel)
240 }
241
242 pub fn note_pending(&self) {
243 self.pending_count.fetch_add(1, Ordering::AcqRel);
244 }
245
246 pub fn pending_flush_count(&self) -> usize {
247 self.pending_count.load(Ordering::Acquire)
248 }
249
250 pub fn submit(
254 &self,
255 seq: u64,
256 rotated: RotatedFlush,
257 result: anyhow::Result<FlushOutcome>,
258 ack: Option<oneshot::Sender<anyhow::Result<String>>>,
259 ) {
260 let submit_msg = FlushSubmit {
261 seq,
262 rotated,
263 result,
264 ack,
265 };
266 if let Some(tx) = self.submit_tx.lock().as_ref() {
267 let _ = tx.send(submit_msg);
268 }
269 else {
275 self.pending_count
276 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
277 self.drain_notify.notify_waiters();
278 }
279 }
280
281 pub fn submit_for_stream<F, Fut>(
297 self: &Arc<Self>,
298 rotated: RotatedFlush,
299 run_stream: F,
300 ) -> FlushTicket
301 where
302 F: FnOnce(Arc<PlRwLock<crate::runtime::l0::L0Buffer>>, u64, u64, Option<String>) -> Fut
303 + Send
304 + 'static,
305 Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>> + Send + 'static,
306 {
307 let (ack_tx, ack_rx) = oneshot::channel();
308 let coord = self.clone();
309 let seq = rotated.seq;
310 let old_l0 = rotated.old_l0_arc.clone();
311 let wal_lsn = rotated.wal_lsn;
312 let current_version = rotated.current_version;
313 let name = rotated.name.clone();
314 let handle = tokio::spawn(async move {
315 let result =
321 run_stream_catching(run_stream(old_l0, wal_lsn, current_version, name)).await;
322 coord.submit(seq, rotated, result, Some(ack_tx));
323 });
324 let mut handles = self.stream_handles.lock();
328 handles.retain(|h| !h.is_finished());
329 handles.push(handle);
330 FlushTicket::pending(ack_rx)
331 }
332
333 pub async fn drain(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
335 let deadline = tokio::time::Instant::now() + timeout;
336 loop {
337 if self.pending_flush_count() == 0 {
338 return Ok(());
339 }
340 let notified = self.drain_notify.notified();
341 tokio::select! {
342 _ = notified => continue,
343 _ = tokio::time::sleep_until(deadline) => {
344 return if self.pending_flush_count() == 0 {
345 Ok(())
346 } else {
347 Err("pending flushes did not drain before deadline")
348 };
349 }
350 }
351 }
352 }
353}
354
355pub trait FinalizeFn: Send + Sync {
361 fn finalize<'a>(
362 &'a self,
363 rotated: RotatedFlush,
364 outcome: FlushOutcome,
365 shared: SharedFlushCtx,
366 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<String>> + Send + 'a>>;
367
368 fn finalize_failure<'a>(
369 &'a self,
370 rotated: RotatedFlush,
371 err: anyhow::Error,
372 shared: SharedFlushCtx,
373 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>>;
374}
375
376async fn run_stream_catching<Fut>(fut: Fut) -> anyhow::Result<FlushOutcome>
386where
387 Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>>,
388{
389 use futures::FutureExt as _;
390 match std::panic::AssertUnwindSafe(fut).catch_unwind().await {
391 Ok(result) => result,
392 Err(panic) => {
393 let msg = panic
394 .downcast_ref::<&str>()
395 .map(|s| (*s).to_string())
396 .or_else(|| panic.downcast_ref::<String>().cloned())
397 .unwrap_or_else(|| "unknown panic".to_string());
398 tracing::error!(panic = %msg, "flush stream task panicked");
399 Err(anyhow::anyhow!("flush stream task panicked: {msg}"))
400 }
401 }
402}
403
404async fn finalizer_loop(
405 mut submit_rx: mpsc::UnboundedReceiver<FlushSubmit>,
406 shared: SharedFlushCtx,
407 finalize_fn: Arc<dyn FinalizeFn>,
408 pending_count: Arc<std::sync::atomic::AtomicUsize>,
409 drain_notify: Arc<tokio::sync::Notify>,
410) {
411 let mut pending: BinaryHeap<Reverse<(u64, FlushSubmit)>> = BinaryHeap::new();
413 let mut expected: u64 = 0;
414 while let Some(submit) = submit_rx.recv().await {
415 pending.push(Reverse((submit.seq, submit)));
416 while let Some(Reverse((seq, _))) = pending.peek() {
417 if *seq != expected {
418 break;
419 }
420 let Reverse((_, s)) = pending.pop().unwrap();
421 let FlushSubmit {
422 rotated,
423 result,
424 ack,
425 ..
426 } = s;
427 let ack_result = match result {
428 Ok(outcome) => finalize_fn.finalize(rotated, outcome, shared.clone()).await,
429 Err(e) => {
430 let _err = finalize_fn
431 .finalize_failure(rotated, e, shared.clone())
432 .await;
433 Err(anyhow::anyhow!("flush stream failed: {}", _err))
434 }
435 };
436 if let Some(ack) = ack {
437 let _ = ack.send(ack_result);
438 }
439 pending_count.fetch_sub(1, Ordering::AcqRel);
440 drain_notify.notify_waiters();
441 expected += 1;
442 }
443 }
444}
445
446impl PartialEq for FlushSubmit {
452 fn eq(&self, other: &Self) -> bool {
453 self.seq == other.seq
454 }
455}
456impl Eq for FlushSubmit {}
457impl PartialOrd for FlushSubmit {
458 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
459 Some(self.cmp(other))
460 }
461}
462impl Ord for FlushSubmit {
463 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
464 self.seq.cmp(&other.seq)
465 }
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// `run_stream_catching` must pass ordinary errors through unchanged and turn a panic
    /// inside the stream future into an `Err` that identifies the panic.
    #[tokio::test]
    async fn run_stream_catching_converts_panic_to_err() {
        let Err(normal_err) =
            run_stream_catching(async { Err(anyhow::anyhow!("normal failure")) }).await
        else {
            panic!("normal failure should stay an error");
        };
        assert!(normal_err.to_string().contains("normal failure"));

        let Err(panic_err) = run_stream_catching(async {
            panic!("boom in stream");
            #[allow(unreachable_code)]
            Ok(unreachable!())
        })
        .await
        else {
            panic!("panic must be caught as an error");
        };
        assert!(
            panic_err.to_string().contains("panicked"),
            "error should identify the panic, got: {panic_err}"
        );
    }
}