1use crate::storage::manager::{FlushInProgressGuard, StorageManager};
24use parking_lot::RwLock as PlRwLock;
25use std::cmp::Reverse;
26use std::collections::BinaryHeap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tokio::sync::{Semaphore, mpsc, oneshot};
30use uni_common::core::snapshot::SnapshotManifest;
31
/// A rotated-out L0 buffer travelling through the flush pipeline: streamed to
/// storage and then finalized by the `FlushCoordinator`'s finalizer task.
///
/// NOTE: fields are dropped in declaration order, so `permit` and
/// `flush_in_progress_guard` are released only after everything above them.
pub struct RotatedFlush {
    /// Rotate sequence number. The finalizer processes flushes strictly in
    /// ascending order of this value, starting at 0 with no gaps.
    pub seq: u64,
    /// The frozen L0 buffer being flushed; a clone of this `Arc` is handed to
    /// the stream closure in `FlushCoordinator::submit_for_stream`.
    pub old_l0_arc: Arc<PlRwLock<crate::runtime::l0::L0Buffer>>,
    /// WAL LSN captured for this rotation (presumably the LSN covered by the
    /// rotated buffer — TODO confirm with the rotating caller).
    pub wal_lsn: u64,
    /// Version number passed through to the stream closure (semantics defined
    /// by the caller — TODO confirm).
    pub current_version: u64,
    /// Optional name passed through to the stream closure (presumably a
    /// snapshot name — confirm).
    pub name: Option<String>,
    /// Manifest this flush builds on, if any (assumption from name — confirm).
    pub parent_manifest: Option<SnapshotManifest>,
    /// Backpressure permit (presumably from `FlushCoordinator::acquire_permit`);
    /// the slot is returned to its semaphore when this value is dropped.
    pub permit: tokio::sync::OwnedSemaphorePermit,
    /// Guard from the storage manager marking a flush as in progress; its exact
    /// drop behavior is defined in `storage::manager` (not visible here).
    pub flush_in_progress_guard: FlushInProgressGuard,
}
51
52pub struct FlushOutcome {
54 pub new_manifest: SnapshotManifest,
55 pub snapshot_id: String,
56}
57
/// Shared handles needed to finalize flushes.
///
/// Most fields are `Arc`-wrapped, so clones share the underlying managers and
/// state rather than copying them. The finalizer task passes a fresh clone to
/// every `FinalizeFn::finalize` / `finalize_failure` call.
#[derive(Clone)]
pub struct SharedFlushCtx {
    pub storage: Arc<StorageManager>,
    pub l0_manager: Arc<crate::runtime::l0_manager::L0Manager>,
    pub adjacency_manager: Arc<crate::storage::adjacency_manager::AdjacencyManager>,
    /// Optional; absent when no property manager is configured.
    pub property_manager: Option<Arc<crate::runtime::property_manager::PropertyManager>>,
    pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
    /// Shared slot holding the most recently cached manifest, if any.
    pub cached_manifest: Arc<parking_lot::Mutex<Option<SnapshotManifest>>>,
    /// Timestamp of the last flush (presumably updated on finalize — TODO confirm).
    pub last_flush_time: Arc<parking_lot::Mutex<std::time::Instant>>,
    /// Present when operating on a fork (assumption from name — confirm).
    pub fork_id: Option<uni_common::core::fork::ForkId>,
    /// NOTE(review): these three fields appear to implement a one-shot warning
    /// once a fork's flush count passes `fork_fragment_warn_threshold` — confirm.
    pub fork_flush_count: Arc<std::sync::atomic::AtomicU64>,
    pub fork_fragment_warn_fired: Arc<std::sync::atomic::AtomicBool>,
    pub fork_fragment_warn_threshold: usize,
    /// Async mutex, so it can be held across `.await` (presumably serializes
    /// flush steps that must not interleave — confirm against callers).
    pub flush_lock: Arc<tokio::sync::Mutex<()>>,
    /// Installed at most once, possibly after this context is built; `get()`
    /// returns `None` until then.
    pub index_rebuild_manager:
        Arc<std::sync::OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
    /// Slot for a background compaction task handle, if one has been started.
    pub compaction_handle: Arc<parking_lot::RwLock<Option<tokio::task::JoinHandle<()>>>>,
    pub compaction_config: uni_common::config::CompactionConfig,
    pub index_rebuild_config: uni_common::config::IndexRebuildConfig,
    pub auto_rebuild_enabled: bool,
}
83
/// Message carried from `FlushCoordinator::submit` to the finalizer task.
///
/// Field order is also drop order: if a submission is discarded, `rotated`
/// (and its permit) is released before `ack` is dropped and the ticket wakes.
struct FlushSubmit {
    /// Rotate sequence number; the finalizer processes submissions in ascending
    /// order of this value.
    seq: u64,
    /// The rotated flush this result belongs to; handed to the `FinalizeFn`.
    rotated: RotatedFlush,
    /// Outcome of the flush stream. `Ok` goes to `finalize`, `Err` to
    /// `finalize_failure`.
    result: anyhow::Result<FlushOutcome>,
    /// Completion channel backing a `FlushTicket`, if the caller wants one; it
    /// receives the finalize result.
    ack: Option<oneshot::Sender<anyhow::Result<String>>>,
}
92
93pub struct FlushTicket {
95 rx: Option<oneshot::Receiver<anyhow::Result<String>>>,
97}
98
99impl FlushTicket {
100 pub fn ready(snapshot_id: anyhow::Result<String>) -> Self {
101 let (tx, rx) = oneshot::channel();
103 let _ = tx.send(snapshot_id);
104 Self { rx: Some(rx) }
105 }
106
107 pub fn pending(rx: oneshot::Receiver<anyhow::Result<String>>) -> Self {
108 Self { rx: Some(rx) }
109 }
110
111 pub async fn await_finalize(self) -> anyhow::Result<String> {
113 match self.rx {
114 Some(rx) => rx
115 .await
116 .unwrap_or_else(|_| Err(anyhow::anyhow!("flush ticket dropped before completion"))),
117 None => Err(anyhow::anyhow!("flush ticket has no completion channel")),
118 }
119 }
120}
121
/// Bounds in-flight flushes and finalizes them in rotate-sequence order.
///
/// Each flush gets a sequence number from `next_rotate_seq`. Flush results may
/// be submitted in any order, but the single finalizer task finalizes them
/// strictly in ascending sequence order.
pub struct FlushCoordinator {
    /// Bounds concurrent flushes; created with `max_pending_flushes.max(1)` permits.
    permits: Arc<Semaphore>,
    /// Next rotate sequence number to hand out; starts at 0.
    next_seq: AtomicU64,
    /// Sender to the finalizer task; `None` after `shutdown` has dropped it.
    submit_tx: parking_lot::Mutex<Option<mpsc::UnboundedSender<FlushSubmit>>>,
    /// Flushes registered via `note_pending` that are not yet finalized or discarded.
    pending_count: Arc<std::sync::atomic::AtomicUsize>,
    /// `notify_waiters` is called after each `pending_count` decrement; `drain` waits on it.
    drain_notify: Arc<tokio::sync::Notify>,
    /// Limit exactly as passed to `new` (not clamped; the semaphore uses at least 1).
    max_pending_flushes: usize,
    /// Finalizer task handle; taken by `shutdown` or `take_finalizer_handle`.
    finalizer_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Tasks spawned by `submit_for_stream`. Finished handles are pruned on each
    /// push, and `shutdown` awaits the rest.
    stream_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
148
149impl FlushCoordinator {
150 pub fn new(
151 max_pending_flushes: usize,
152 shared: SharedFlushCtx,
153 finalize_fn: Arc<dyn FinalizeFn>,
154 ) -> Self {
155 let permits = Arc::new(Semaphore::new(max_pending_flushes.max(1)));
156 let next_seq = AtomicU64::new(0);
157 let (submit_tx, submit_rx) = mpsc::unbounded_channel::<FlushSubmit>();
158 let pending_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
159 let drain_notify = Arc::new(tokio::sync::Notify::new());
160
161 let pending_count_for_task = pending_count.clone();
162 let drain_notify_for_task = drain_notify.clone();
163 let handle = tokio::spawn(finalizer_loop(
164 submit_rx,
165 shared,
166 finalize_fn,
167 pending_count_for_task,
168 drain_notify_for_task,
169 ));
170
171 Self {
172 permits,
173 next_seq,
174 submit_tx: parking_lot::Mutex::new(Some(submit_tx)),
175 pending_count,
176 drain_notify,
177 max_pending_flushes,
178 finalizer_handle: parking_lot::Mutex::new(Some(handle)),
179 stream_handles: parking_lot::Mutex::new(Vec::new()),
180 }
181 }
182
183 pub async fn shutdown(&self) {
191 let stream_handles: Vec<_> = self.stream_handles.lock().drain(..).collect();
197 for h in stream_handles {
198 let _ = h.await;
199 }
200 drop(self.submit_tx.lock().take());
203 let handle = self.finalizer_handle.lock().take();
208 if let Some(h) = handle {
209 let _ = h.await;
210 }
211 }
212
213 pub fn take_finalizer_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
216 self.finalizer_handle.lock().take()
217 }
218
219 pub fn max_pending_flushes(&self) -> usize {
220 self.max_pending_flushes
221 }
222
223 pub async fn acquire_permit(&self) -> anyhow::Result<tokio::sync::OwnedSemaphorePermit> {
224 self.permits
225 .clone()
226 .acquire_owned()
227 .await
228 .map_err(|_| anyhow::anyhow!("flush coordinator permit semaphore closed"))
229 }
230
231 pub fn try_acquire_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
235 self.permits.clone().try_acquire_owned().ok()
236 }
237
238 pub fn next_rotate_seq(&self) -> u64 {
239 self.next_seq.fetch_add(1, Ordering::AcqRel)
240 }
241
242 pub fn note_pending(&self) {
243 self.pending_count.fetch_add(1, Ordering::AcqRel);
244 }
245
246 pub fn pending_flush_count(&self) -> usize {
247 self.pending_count.load(Ordering::Acquire)
248 }
249
250 pub fn submit(
254 &self,
255 seq: u64,
256 rotated: RotatedFlush,
257 result: anyhow::Result<FlushOutcome>,
258 ack: Option<oneshot::Sender<anyhow::Result<String>>>,
259 ) {
260 let submit_msg = FlushSubmit {
261 seq,
262 rotated,
263 result,
264 ack,
265 };
266 if let Some(tx) = self.submit_tx.lock().as_ref() {
267 let _ = tx.send(submit_msg);
268 }
269 else {
275 self.pending_count
276 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
277 self.drain_notify.notify_waiters();
278 }
279 }
280
281 pub fn submit_for_stream<F, Fut>(
297 self: &Arc<Self>,
298 rotated: RotatedFlush,
299 run_stream: F,
300 ) -> FlushTicket
301 where
302 F: FnOnce(Arc<PlRwLock<crate::runtime::l0::L0Buffer>>, u64, u64, Option<String>) -> Fut
303 + Send
304 + 'static,
305 Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>> + Send + 'static,
306 {
307 let (ack_tx, ack_rx) = oneshot::channel();
308 let coord = self.clone();
309 let seq = rotated.seq;
310 let old_l0 = rotated.old_l0_arc.clone();
311 let wal_lsn = rotated.wal_lsn;
312 let current_version = rotated.current_version;
313 let name = rotated.name.clone();
314 let handle = tokio::spawn(async move {
315 let result = run_stream(old_l0, wal_lsn, current_version, name).await;
316 coord.submit(seq, rotated, result, Some(ack_tx));
317 });
318 let mut handles = self.stream_handles.lock();
322 handles.retain(|h| !h.is_finished());
323 handles.push(handle);
324 FlushTicket::pending(ack_rx)
325 }
326
327 pub async fn drain(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
329 let deadline = tokio::time::Instant::now() + timeout;
330 loop {
331 if self.pending_flush_count() == 0 {
332 return Ok(());
333 }
334 let notified = self.drain_notify.notified();
335 tokio::select! {
336 _ = notified => continue,
337 _ = tokio::time::sleep_until(deadline) => {
338 return if self.pending_flush_count() == 0 {
339 Ok(())
340 } else {
341 Err("pending flushes did not drain before deadline")
342 };
343 }
344 }
345 }
346 }
347}
348
/// Finalization hook run by the coordinator's finalizer task.
///
/// Exactly one of the two methods is called per submitted flush, strictly in
/// ascending rotate-sequence order. The returned futures borrow `self` for
/// `'a` and must be `Send`.
pub trait FinalizeFn: Send + Sync {
    /// Finalizes a flush whose stream succeeded.
    ///
    /// The returned `Result<String>` is forwarded unchanged to the flush's
    /// ticket, if it has one (presumably the new snapshot id — confirm with
    /// implementors).
    fn finalize<'a>(
        &'a self,
        rotated: RotatedFlush,
        outcome: FlushOutcome,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<String>> + Send + 'a>>;

    /// Handles a flush whose stream failed with `err`.
    ///
    /// The finalizer wraps the returned error as `flush stream failed: {err}`
    /// before sending it to the ticket.
    fn finalize_failure<'a>(
        &'a self,
        rotated: RotatedFlush,
        err: anyhow::Error,
        shared: SharedFlushCtx,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>>;
}
369
370async fn finalizer_loop(
371 mut submit_rx: mpsc::UnboundedReceiver<FlushSubmit>,
372 shared: SharedFlushCtx,
373 finalize_fn: Arc<dyn FinalizeFn>,
374 pending_count: Arc<std::sync::atomic::AtomicUsize>,
375 drain_notify: Arc<tokio::sync::Notify>,
376) {
377 let mut pending: BinaryHeap<Reverse<(u64, FlushSubmit)>> = BinaryHeap::new();
379 let mut expected: u64 = 0;
380 while let Some(submit) = submit_rx.recv().await {
381 pending.push(Reverse((submit.seq, submit)));
382 while let Some(Reverse((seq, _))) = pending.peek() {
383 if *seq != expected {
384 break;
385 }
386 let Reverse((_, s)) = pending.pop().unwrap();
387 let FlushSubmit {
388 rotated,
389 result,
390 ack,
391 ..
392 } = s;
393 let ack_result = match result {
394 Ok(outcome) => finalize_fn.finalize(rotated, outcome, shared.clone()).await,
395 Err(e) => {
396 let _err = finalize_fn
397 .finalize_failure(rotated, e, shared.clone())
398 .await;
399 Err(anyhow::anyhow!("flush stream failed: {}", _err))
400 }
401 };
402 if let Some(ack) = ack {
403 let _ = ack.send(ack_result);
404 }
405 pending_count.fetch_sub(1, Ordering::AcqRel);
406 drain_notify.notify_waiters();
407 expected += 1;
408 }
409 }
410}
411
412impl PartialEq for FlushSubmit {
418 fn eq(&self, other: &Self) -> bool {
419 self.seq == other.seq
420 }
421}
422impl Eq for FlushSubmit {}
423impl PartialOrd for FlushSubmit {
424 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
425 Some(self.cmp(other))
426 }
427}
428impl Ord for FlushSubmit {
429 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
430 self.seq.cmp(&other.seq)
431 }
432}