use crate::storage::manager::{FlushInProgressGuard, StorageManager};
use parking_lot::RwLock as PlRwLock;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::{Semaphore, mpsc, oneshot};
use uni_common::core::snapshot::SnapshotManifest;
/// State captured for one rotated flush: the inputs handed to the stream future by
/// `FlushCoordinator::submit_for_stream`, plus resources held until the flush is finalized.
///
/// NOTE: Rust drops struct fields in declaration order, so when a `RotatedFlush` is dropped
/// whole, `permit` is released before `flush_in_progress_guard`. Keep that in mind before
/// rearranging fields.
pub struct RotatedFlush {
/// Sequence number that fixes this flush's position in the finalizer's strict ordering.
pub seq: u64,
/// L0 buffer passed to the stream future (presumably the buffer swapped out of the live
/// L0 at rotation time — confirm with the rotation code).
pub old_l0_arc: Arc<PlRwLock<crate::runtime::l0::L0Buffer>>,
/// WAL log sequence number passed to the stream future.
pub wal_lsn: u64,
/// Version passed to the stream future as `current_version`.
pub current_version: u64,
/// Optional name passed to the stream future (presumably a snapshot name — verify).
pub name: Option<String>,
/// Manifest this flush builds on, if any. Not read in this file; presumably consumed by
/// the `FinalizeFn` implementation.
pub parent_manifest: Option<SnapshotManifest>,
/// Owned semaphore permit held for the lifetime of this flush. Presumably obtained from
/// `FlushCoordinator::acquire_permit`/`try_acquire_permit`, in which case dropping it frees
/// one slot of the coordinator's flush limit.
pub permit: tokio::sync::OwnedSemaphorePermit,
/// Guard marking a flush as in progress; its release behaviour is defined in
/// `storage::manager` (not visible here).
pub flush_in_progress_guard: FlushInProgressGuard,
}
/// Successful result of a flush stream, handed to `FinalizeFn::finalize` by the finalizer task.
pub struct FlushOutcome {
/// Manifest produced by the flush (presumably the new snapshot manifest to publish —
/// confirm against the `FinalizeFn` implementation).
pub new_manifest: SnapshotManifest,
/// Identifier of the snapshot written by the flush (presumably; not read in this file).
pub snapshot_id: String,
}
/// Shared handles moved into the finalizer task and cloned into every `FinalizeFn` call.
///
/// Most fields are `Arc`s, so `clone` mostly bumps reference counts; the config fields and
/// plain values are cloned/copied by value. Code in this file only clones and forwards this
/// context — what each field means is defined by the `FinalizeFn` implementation.
#[derive(Clone)]
pub struct SharedFlushCtx {
pub storage: Arc<StorageManager>,
pub l0_manager: Arc<crate::runtime::l0_manager::L0Manager>,
pub adjacency_manager: Arc<crate::storage::adjacency_manager::AdjacencyManager>,
/// Optional; `None` when no property manager is configured.
pub property_manager: Option<Arc<crate::runtime::property_manager::PropertyManager>>,
pub schema_manager: Arc<uni_common::core::schema::SchemaManager>,
/// Shared slot for a manifest, presumably the most recently published one — confirm.
pub cached_manifest: Arc<parking_lot::Mutex<Option<SnapshotManifest>>>,
pub last_flush_time: Arc<parking_lot::Mutex<std::time::Instant>>,
/// Fork this context belongs to, if any.
pub fork_id: Option<uni_common::core::fork::ForkId>,
pub fork_flush_count: Arc<std::sync::atomic::AtomicU64>,
pub fork_fragment_warn_fired: Arc<std::sync::atomic::AtomicBool>,
pub fork_fragment_warn_threshold: usize,
/// Async (tokio) mutex, so it may be held across `.await`; presumably serializes flush
/// work — confirm with the `FinalizeFn` implementation.
pub flush_lock: Arc<tokio::sync::Mutex<()>>,
/// Set at most once (`OnceLock`), possibly after this context was created.
pub index_rebuild_manager:
Arc<std::sync::OnceLock<Arc<crate::storage::index_rebuild::IndexRebuildManager>>>,
/// Slot for a background compaction task handle; `None` when no task is tracked.
pub compaction_handle: Arc<parking_lot::RwLock<Option<tokio::task::JoinHandle<()>>>>,
pub compaction_config: uni_common::config::CompactionConfig,
pub index_rebuild_config: uni_common::config::IndexRebuildConfig,
pub auto_rebuild_enabled: bool,
}
/// Message from a finished flush stream to the finalizer task.
///
/// Ordered (via the `Ord` impl on this type) solely by `seq`.
struct FlushSubmit {
/// Finalization order key; `submit_for_stream` sets it to `rotated.seq`.
seq: u64,
/// The rotated flush state, consumed by `finalize` / `finalize_failure`.
rotated: RotatedFlush,
/// Stream outcome: `Ok` is routed to `FinalizeFn::finalize`, `Err` to `finalize_failure`.
result: anyhow::Result<FlushOutcome>,
/// Resolves the submitter's `FlushTicket`, when the submitter is waiting on one.
ack: Option<oneshot::Sender<anyhow::Result<String>>>,
}
/// Awaitable handle for the result (a snapshot id string or an error) of a finalized flush.
pub struct FlushTicket {
/// Completion channel; every constructor visible in this file sets it to `Some`.
rx: Option<oneshot::Receiver<anyhow::Result<String>>>,
}
impl FlushTicket {
    /// Builds a ticket that is already resolved with `snapshot_id`.
    pub fn ready(snapshot_id: anyhow::Result<String>) -> Self {
        let (sender, receiver) = oneshot::channel();
        // `receiver` is still alive here, so this send cannot fail.
        let _ = sender.send(snapshot_id);
        Self::pending(receiver)
    }

    /// Wraps a completion channel that the finalizer will resolve later.
    pub fn pending(rx: oneshot::Receiver<anyhow::Result<String>>) -> Self {
        Self { rx: Some(rx) }
    }

    /// Waits for the flush result.
    ///
    /// # Errors
    /// Returns the finalizer's error, or an error if the sending side was dropped without a
    /// result or the ticket has no completion channel.
    pub async fn await_finalize(self) -> anyhow::Result<String> {
        let Some(receiver) = self.rx else {
            return Err(anyhow::anyhow!("flush ticket has no completion channel"));
        };
        match receiver.await {
            Ok(outcome) => outcome,
            Err(_sender_dropped) => Err(anyhow::anyhow!("flush ticket dropped before completion")),
        }
    }
}
/// Coordinates concurrently running flush streams while finalizing their results strictly in
/// sequence order through a single background finalizer task.
pub struct FlushCoordinator {
/// Flush-concurrency semaphore handed out via `acquire_permit`/`try_acquire_permit`;
/// sized to `max(max_pending_flushes, 1)` in `new`.
permits: Arc<Semaphore>,
/// Next rotation sequence number returned by `next_rotate_seq` (starts at 0).
next_seq: AtomicU64,
/// Sender feeding the finalizer task; taken (left `None`) by `shutdown`.
submit_tx: parking_lot::Mutex<Option<mpsc::UnboundedSender<FlushSubmit>>>,
/// Flushes counted via `note_pending` that have not yet been finalized (or rejected).
pending_count: Arc<std::sync::atomic::AtomicUsize>,
/// Woken (`notify_waiters`) whenever `pending_count` is decremented; `drain` waits on it.
drain_notify: Arc<tokio::sync::Notify>,
/// Limit exactly as passed to `new` (not clamped like the semaphore size).
max_pending_flushes: usize,
/// Finalizer task handle; taken by `shutdown` or `take_finalizer_handle`.
finalizer_handle: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Stream tasks spawned by `submit_for_stream`; finished ones are pruned on each push.
stream_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
impl FlushCoordinator {
    /// Creates a coordinator and spawns its finalizer task.
    ///
    /// `max_pending_flushes` sizes the permit semaphore (clamped to at least one permit) and is
    /// reported unchanged by [`Self::max_pending_flushes`]. `shared` and `finalize_fn` are moved
    /// into the finalizer task, which finalizes submissions in sequence order.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (it calls `tokio::spawn`).
    pub fn new(
        max_pending_flushes: usize,
        shared: SharedFlushCtx,
        finalize_fn: Arc<dyn FinalizeFn>,
    ) -> Self {
        let permits = Arc::new(Semaphore::new(max_pending_flushes.max(1)));
        let (submit_tx, submit_rx) = mpsc::unbounded_channel::<FlushSubmit>();
        let pending_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let drain_notify = Arc::new(tokio::sync::Notify::new());
        let handle = tokio::spawn(finalizer_loop(
            submit_rx,
            shared,
            finalize_fn,
            pending_count.clone(),
            drain_notify.clone(),
        ));
        Self {
            permits,
            next_seq: AtomicU64::new(0),
            submit_tx: parking_lot::Mutex::new(Some(submit_tx)),
            pending_count,
            drain_notify,
            max_pending_flushes,
            finalizer_handle: parking_lot::Mutex::new(Some(handle)),
            stream_handles: parking_lot::Mutex::new(Vec::new()),
        }
    }

    /// Shuts the coordinator down.
    ///
    /// Waits for the stream tasks tracked so far, then closes the submission channel so the
    /// finalizer task exits after processing what it already received, and finally waits for
    /// the finalizer (unless its handle was taken via [`Self::take_finalizer_handle`]).
    /// Join errors from either kind of task are ignored.
    pub async fn shutdown(&self) {
        // Collect first so the lock guard is released before awaiting.
        let stream_handles: Vec<_> = self.stream_handles.lock().drain(..).collect();
        for h in stream_handles {
            let _ = h.await;
        }
        drop(self.submit_tx.lock().take());
        let handle = self.finalizer_handle.lock().take();
        if let Some(h) = handle {
            let _ = h.await;
        }
    }

    /// Takes ownership of the finalizer task handle, leaving `None` behind.
    pub fn take_finalizer_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
        self.finalizer_handle.lock().take()
    }

    /// Returns the limit passed to [`Self::new`] (unclamped).
    pub fn max_pending_flushes(&self) -> usize {
        self.max_pending_flushes
    }

    /// Waits for a flush permit.
    ///
    /// # Errors
    /// Fails only if the permit semaphore has been closed.
    pub async fn acquire_permit(&self) -> anyhow::Result<tokio::sync::OwnedSemaphorePermit> {
        self.permits
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| anyhow::anyhow!("flush coordinator permit semaphore closed"))
    }

    /// Takes a flush permit without waiting; `None` if none is available (or the semaphore is closed).
    pub fn try_acquire_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
        self.permits.clone().try_acquire_owned().ok()
    }

    /// Reserves the next rotation sequence number.
    ///
    /// Every reserved number must eventually be submitted: the finalizer releases flushes
    /// strictly in order, so a missing number holds back every later flush.
    pub fn next_rotate_seq(&self) -> u64 {
        self.next_seq.fetch_add(1, Ordering::AcqRel)
    }

    /// Counts one more flush that will later be passed to [`Self::submit`].
    pub fn note_pending(&self) {
        self.pending_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Number of noted flushes not yet finalized or rejected.
    pub fn pending_flush_count(&self) -> usize {
        self.pending_count.load(Ordering::Acquire)
    }

    /// Hands a finished stream result to the finalizer task.
    ///
    /// Each submission is expected to have been counted with [`Self::note_pending`]; the count
    /// is released exactly once per submission. If the submission cannot reach the finalizer
    /// (the coordinator was shut down, or the finalizer task has exited), it is rejected here:
    /// `ack` (if any) receives an error, the pending count is released, drain waiters are woken,
    /// and `rotated` (including its permit) is dropped.
    pub fn submit(
        &self,
        seq: u64,
        rotated: RotatedFlush,
        result: anyhow::Result<FlushOutcome>,
        ack: Option<oneshot::Sender<anyhow::Result<String>>>,
    ) {
        let submit_msg = FlushSubmit {
            seq,
            rotated,
            result,
            ack,
        };
        // The lock guard only lives for this statement; an unbounded send never blocks.
        // A failed send hands the message back, which happens when the receiver is gone.
        let undelivered = match self.submit_tx.lock().as_ref() {
            Some(tx) => tx.send(submit_msg).err().map(|send_err| send_err.0),
            None => Some(submit_msg),
        };
        let Some(rejected) = undelivered else {
            return;
        };
        if let Some(ack) = rejected.ack {
            // The ticket holder may have stopped waiting; nothing to do in that case.
            let _ = ack.send(Err(anyhow::anyhow!(
                "flush coordinator is shut down; flush {} was not finalized",
                rejected.seq
            )));
        }
        self.pending_count.fetch_sub(1, Ordering::AcqRel);
        self.drain_notify.notify_waiters();
    }

    /// Runs `run_stream` for `rotated` on a background task and submits its result.
    ///
    /// The returned ticket resolves once the flush has been finalized (in sequence order). A
    /// panic or cancellation of the stream is converted into an `Err` result, so the flush
    /// still reaches the finalizer and later sequence numbers are not blocked behind it.
    pub fn submit_for_stream<F, Fut>(
        self: &Arc<Self>,
        rotated: RotatedFlush,
        run_stream: F,
    ) -> FlushTicket
    where
        F: FnOnce(Arc<PlRwLock<crate::runtime::l0::L0Buffer>>, u64, u64, Option<String>) -> Fut
            + Send
            + 'static,
        Fut: std::future::Future<Output = anyhow::Result<FlushOutcome>> + Send + 'static,
    {
        let (ack_tx, ack_rx) = oneshot::channel();
        let coord = self.clone();
        let seq = rotated.seq;
        let old_l0 = rotated.old_l0_arc.clone();
        let wal_lsn = rotated.wal_lsn;
        let current_version = rotated.current_version;
        let name = rotated.name.clone();
        let handle = tokio::spawn(async move {
            // Isolate the stream in its own task: a panic there surfaces as a JoinError instead
            // of unwinding this task before `submit` runs, which would leave `seq` missing and
            // stall the in-order finalizer forever.
            let stream_task = tokio::spawn(async move {
                run_stream(old_l0, wal_lsn, current_version, name).await
            });
            let result = match stream_task.await {
                Ok(result) => result,
                Err(join_err) => Err(anyhow::anyhow!("flush stream task failed: {}", join_err)),
            };
            coord.submit(seq, rotated, result, Some(ack_tx));
        });
        let mut handles = self.stream_handles.lock();
        handles.retain(|h| !h.is_finished());
        handles.push(handle);
        FlushTicket::pending(ack_rx)
    }

    /// Waits until no noted flushes remain pending, or `timeout` elapses.
    ///
    /// # Errors
    /// Returns an error if flushes are still pending at the deadline.
    pub async fn drain(&self, timeout: std::time::Duration) -> Result<(), &'static str> {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Register interest BEFORE checking the counter: `notify_waiters` only wakes
            // `Notified` futures that already exist, so creating it after the check could miss
            // the final decrement and sleep until the deadline.
            let notified = self.drain_notify.notified();
            if self.pending_flush_count() == 0 {
                return Ok(());
            }
            tokio::select! {
                _ = notified => {}
                _ = tokio::time::sleep_until(deadline) => {
                    return if self.pending_flush_count() == 0 {
                        Ok(())
                    } else {
                        Err("pending flushes did not drain before deadline")
                    };
                }
            }
        }
    }
}
/// Strategy the finalizer task uses to commit, or clean up after, each flush.
///
/// The finalizer loop in this file awaits each call before starting the next, so calls run
/// one at a time in ascending `seq` order.
pub trait FinalizeFn: Send + Sync {
/// Finalizes a flush whose stream succeeded. The returned value (an id string or an error)
/// is delivered to the flush's ticket; presumably it is the published snapshot id.
fn finalize<'a>(
&'a self,
rotated: RotatedFlush,
outcome: FlushOutcome,
shared: SharedFlushCtx,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<String>> + Send + 'a>>;
/// Handles a flush whose stream failed. Receives the stream error and returns the error to
/// report; the finalizer wraps it as `flush stream failed: <err>` before delivering it to
/// the ticket.
fn finalize_failure<'a>(
&'a self,
rotated: RotatedFlush,
err: anyhow::Error,
shared: SharedFlushCtx,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Error> + Send + 'a>>;
}
/// Background task that finalizes flush submissions strictly in sequence order.
///
/// Submissions may arrive in any order. They are buffered in a min-heap keyed by `seq` and
/// released only when the next expected sequence number (starting at 0) is on top. Each
/// released submission goes to `finalize_fn.finalize` (stream succeeded) or
/// `finalize_fn.finalize_failure` (stream failed). The result is sent to its ack channel,
/// the pending count is decremented, and drain waiters are woken.
///
/// The loop ends once every sender for `submit_rx` is dropped and the channel is empty. Any
/// submissions still buffered at that point are waiting on a sequence number that never
/// arrived, so they can no longer be finalized in order. They are acknowledged with an error
/// and removed from the pending count, so `drain` does not wait on them forever.
async fn finalizer_loop(
    mut submit_rx: mpsc::UnboundedReceiver<FlushSubmit>,
    shared: SharedFlushCtx,
    finalize_fn: Arc<dyn FinalizeFn>,
    pending_count: Arc<std::sync::atomic::AtomicUsize>,
    drain_notify: Arc<tokio::sync::Notify>,
) {
    // `Reverse` turns the max-heap into a min-heap on `seq`.
    let mut pending: BinaryHeap<Reverse<(u64, FlushSubmit)>> = BinaryHeap::new();
    // Matches `FlushCoordinator::next_seq`, which also starts at 0.
    let mut expected: u64 = 0;
    while let Some(submit) = submit_rx.recv().await {
        pending.push(Reverse((submit.seq, submit)));
        // Release every buffered submission that is now contiguous with what was finalized.
        while let Some(Reverse((seq, _))) = pending.peek() {
            if *seq != expected {
                break;
            }
            let Reverse((_, s)) = pending
                .pop()
                .expect("entry just peeked must still be in the heap");
            let FlushSubmit {
                rotated,
                result,
                ack,
                ..
            } = s;
            let ack_result = match result {
                Ok(outcome) => finalize_fn.finalize(rotated, outcome, shared.clone()).await,
                Err(stream_err) => {
                    let err = finalize_fn
                        .finalize_failure(rotated, stream_err, shared.clone())
                        .await;
                    Err(anyhow::anyhow!("flush stream failed: {}", err))
                }
            };
            if let Some(ack) = ack {
                // The ticket holder may have stopped waiting; that is not an error here.
                let _ = ack.send(ack_result);
            }
            pending_count.fetch_sub(1, Ordering::AcqRel);
            drain_notify.notify_waiters();
            expected += 1;
        }
    }

    // Channel closed: anything left is stuck behind a missing `expected` sequence number.
    if pending.is_empty() {
        return;
    }
    for Reverse((seq, abandoned)) in pending.into_vec() {
        if let Some(ack) = abandoned.ack {
            let _ = ack.send(Err(anyhow::anyhow!(
                "flush {} abandoned at shutdown: flush {} never reached the finalizer",
                seq,
                expected
            )));
        }
        pending_count.fetch_sub(1, Ordering::AcqRel);
    }
    drain_notify.notify_waiters();
}
/// Submissions compare equal exactly when their sequence numbers match (consistent with `Ord`).
impl PartialEq for FlushSubmit {
    fn eq(&self, other: &Self) -> bool {
        Ord::cmp(self, other).is_eq()
    }
}

impl Eq for FlushSubmit {}

/// Delegates to the total order defined by `Ord`.
impl PartialOrd for FlushSubmit {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Orders submissions by sequence number only; payloads never participate.
impl Ord for FlushSubmit {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        u64::cmp(&self.seq, &other.seq)
    }
}