mod manifest_writer;
mod tracker;
mod uploader;
pub(crate) use manifest_writer::FlushResult;
use crate::checkpoint::CheckpointCreateResult;
use crate::config::CheckpointOptions;
use crate::db::DbInner;
use crate::db_status::ClosedResultWriter;
use crate::dispatcher::MessageHandlerExecutor;
use crate::error::SlateDBError;
use crate::manifest::store::FenceableManifest;
use crate::memtable_flusher::manifest_writer::ManifestWriter;
use crate::memtable_flusher::tracker::FlushTracker;
use crate::memtable_flusher::uploader::Uploader;
use crate::utils::SafeSender;
use log::warn;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
/// Task name under which the flush tracker's message handler is registered
/// with the `MessageHandlerExecutor` in `MemtableFlusher::start`, and by which
/// `MemtableFlusher::shutdown` later asks the executor to stop it.
const TRACKER_TASK_NAME: &str = "l0_flush_tracker";
/// Selects what a flush or checkpoint request sent to the tracker should cover.
///
/// Carried as the `target` field of `TrackerMessage::FlushRequest` and
/// `TrackerMessage::CheckpointRequest`.
#[derive(Clone, Copy, Debug)]
pub(crate) enum FlushTarget {
// NOTE(review): presumably limits the request to data that is already durable
// when the request is made — confirm against the tracker implementation.
CurrentDurable,
// NOTE(review): presumably covers everything outstanding, durable or not —
// confirm against the tracker implementation.
All,
}
/// Handle used to send requests (flush, checkpoint, memtable-frozen
/// notifications) to the flush tracker over a message channel.
pub(crate) struct MemtableFlusher {
/// Sending half of the tracker channel. Used by the request methods and
/// cloned for the uploader and the manifest writer in `start`.
messages_tx: SafeSender<tracker::TrackerMessage>,
/// Receiving half of the tracker channel. A clone is handed to the executor
/// together with the tracker handler in `start`.
messages_rx: async_channel::Receiver<tracker::TrackerMessage>,
}
impl MemtableFlusher {
pub(crate) fn new(closed_result: &dyn ClosedResultWriter) -> Self {
let (messages_tx, messages_rx) =
SafeSender::unbounded_channel(closed_result.result_reader());
Self {
messages_tx,
messages_rx,
}
}
pub(crate) fn start(
&self,
inner: Arc<DbInner>,
manifest: FenceableManifest,
tokio_handle: &Handle,
executor: &MessageHandlerExecutor,
closed_result: &dyn ClosedResultWriter,
) -> Result<(), SlateDBError> {
let uploader = Uploader::start(
Arc::clone(&inner),
closed_result,
self.messages_tx.clone(),
executor,
tokio_handle,
)?;
let manifest_writer = ManifestWriter::start(
Arc::clone(&inner),
manifest,
inner.settings.manifest_poll_interval,
closed_result,
executor,
tokio_handle,
self.messages_tx.clone(),
)?;
let tracker = FlushTracker::new(inner, uploader, manifest_writer);
executor.add_handler(
TRACKER_TASK_NAME.to_string(),
Box::new(tracker),
self.messages_rx.clone(),
tokio_handle,
)?;
Ok(())
}
pub(crate) async fn flush(&self, target: FlushTarget) -> Result<FlushResult, SlateDBError> {
let (tx, rx) = oneshot::channel();
self.messages_tx
.send(tracker::TrackerMessage::FlushRequest { target, sender: tx })?;
rx.await.map_err(SlateDBError::ReadChannelError)?
}
pub(crate) fn notify_memtable_frozen(&self) -> Result<(), SlateDBError> {
self.messages_tx
.send(tracker::TrackerMessage::MemtableFrozen)
}
pub(crate) async fn create_checkpoint(
&self,
target: FlushTarget,
options: CheckpointOptions,
) -> Result<CheckpointCreateResult, SlateDBError> {
let (tx, rx) = oneshot::channel();
self.messages_tx
.send(tracker::TrackerMessage::CheckpointRequest {
target,
options,
sender: tx,
})?;
rx.await.map_err(SlateDBError::ReadChannelError)?
}
pub(crate) async fn shutdown(executor: &MessageHandlerExecutor) {
Uploader::shutdown(executor).await;
ManifestWriter::shutdown(executor).await;
if let Err(e) = executor.shutdown_task(TRACKER_TASK_NAME).await {
warn!("failed to shutdown l0 flush tracker [error={:?}]", e);
}
}
}