use super::writer::Writer;
use crate::Keyspace;
use lsm_tree::{AbstractTree, SeqNo};
use std::{path::PathBuf, sync::MutexGuard};
/// Flush threshold that one keyspace has to reach before a sealed journal
/// may be evicted.
///
/// `JournalManager::maintenance` keeps a journal as long as any of its
/// watermarks belongs to a non-deleted keyspace whose highest persisted
/// sequence number is still below `lsn` (or is not available at all).
#[derive(Clone)]
pub struct EvictionWatermark {
    /// Keyspace whose flush progress is checked against `lsn`.
    pub(crate) keyspace: Keyspace,

    /// Sequence number the keyspace's tree must have persisted (persisted
    /// seqno `>= lsn`) for this watermark to stop blocking eviction.
    pub(crate) lsn: SeqNo,
}
impl std::fmt::Debug for EvictionWatermark {
    /// Renders the watermark compactly as `<keyspace name>:<lsn>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { keyspace, lsn } = self;
        write!(f, "{}:{}", keyspace.name, lsn)
    }
}
/// A sealed journal file tracked by the `JournalManager`.
pub struct Item {
    /// Location of the sealed journal file; `JournalManager::maintenance`
    /// deletes this file once the journal is evictable.
    pub(crate) path: PathBuf,

    /// Size accounted for this journal in the manager's disk space total.
    pub(crate) size_in_bytes: u64,

    /// Per-keyspace flush thresholds; every one of them must be satisfied
    /// (or its keyspace deleted) before the journal file is removed.
    pub(crate) watermarks: Vec<EvictionWatermark>,
}
impl std::fmt::Debug for Item {
    /// Prints the journal path followed by its pretty-printed watermarks.
    /// The recorded size is intentionally not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            path, watermarks, ..
        } = self;

        write!(f, "JournalManagerItem {:?} => {:#?}", path, watermarks)
    }
}
/// Keeps track of sealed journals and removes them from disk once every
/// keyspace they cover has been flushed far enough.
#[expect(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct JournalManager {
    /// Sealed journals in the order they were enqueued; the first element
    /// is the next eviction candidate.
    items: Vec<Item>,

    /// Byte total of tracked journals: increased by `enqueue`, decreased by
    /// `maintenance` whenever a journal file is removed.
    disk_space_in_bytes: u64,
}
impl Drop for JournalManager {
    /// Emits a trace log entry and, when the `__internal_whitebox` feature
    /// is enabled, decrements the crate's drop counter (the counterpart of
    /// the increment performed in `JournalManager::new`).
    fn drop(&mut self) {
        log::trace!("Dropping journal manager");

        #[cfg(feature = "__internal_whitebox")]
        crate::drop::decrement_drop_counter();
    }
}
impl JournalManager {
    /// Creates a manager that does not track any sealed journal yet.
    ///
    /// With the `__internal_whitebox` feature enabled, the crate's drop
    /// counter is incremented first.
    pub(crate) fn new() -> Self {
        #[cfg(feature = "__internal_whitebox")]
        crate::drop::increment_drop_counter();

        let items = Vec::with_capacity(10);

        Self {
            items,
            disk_space_in_bytes: 0,
        }
    }

    /// Forgets all tracked sealed journals without touching their files.
    ///
    /// NOTE(review): the disk space total is left unchanged here, so
    /// `disk_space_used` still includes the forgotten journals afterwards;
    /// confirm that this is intended.
    pub(crate) fn clear(&mut self) {
        self.items.clear();
    }

    /// Appends a sealed journal to the end of the queue and adds its size to
    /// the disk space total (saturating at `u64::MAX`).
    pub(crate) fn enqueue(&mut self, item: Item) {
        let added_bytes = item.size_in_bytes;

        self.items.push(item);
        self.disk_space_in_bytes = self.disk_space_in_bytes.saturating_add(added_bytes);
    }

    /// Returns the number of sealed journals plus one.
    ///
    /// NOTE(review): the extra one presumably stands for the active journal;
    /// verify against callers.
    pub(crate) fn journal_count(&self) -> usize {
        1 + self.sealed_journal_count()
    }

    /// Returns the number of sealed journals currently tracked.
    pub(crate) fn sealed_journal_count(&self) -> usize {
        self.items.len()
    }

    /// Returns the accumulated size, in bytes, of the enqueued journals that
    /// have not been removed by `maintenance`.
    pub(crate) fn disk_space_used(&self) -> u64 {
        self.disk_space_in_bytes
    }

    /// Lists the keyspaces that still block eviction of the oldest sealed
    /// journal.
    ///
    /// A keyspace is listed when its tree reports no persisted sequence
    /// number at all, or one that is below the watermark's `lsn`. The result
    /// is empty when no sealed journal is tracked.
    pub(crate) fn get_keyspaces_to_flush_for_oldest_journal_eviction(&self) -> Vec<Keyspace> {
        let Some(oldest) = self.items.first() else {
            return Vec::new();
        };

        oldest
            .watermarks
            .iter()
            .filter(
                |watermark| match watermark.keyspace.tree.get_highest_persisted_seqno() {
                    Some(persisted) => persisted < watermark.lsn,
                    // Nothing persisted yet, so a flush is definitely needed.
                    None => true,
                },
            )
            .map(|watermark| watermark.keyspace.clone())
            .collect()
    }

    /// Deletes sealed journals, oldest first, for as long as the oldest one
    /// is fully covered by flushed data.
    ///
    /// The loop stops (returning `Ok`) at the first journal that has a
    /// watermark whose non-deleted keyspace has not persisted up to the
    /// watermark's `lsn`. Watermarks of deleted keyspaces are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if a journal file cannot be removed; that journal
    /// stays tracked and the disk space total is not reduced for it.
    pub(crate) fn maintenance(&mut self) -> crate::Result<()> {
        log::debug!("Running journal maintenance");

        while let Some(oldest) = self.items.first() {
            for watermark in &oldest.watermarks {
                let keyspace = &watermark.keyspace;

                // A deleted keyspace can never catch up, so it must not
                // hold the journal back.
                if keyspace
                    .is_deleted
                    .load(std::sync::atomic::Ordering::Acquire)
                {
                    continue;
                }

                match keyspace.tree.get_highest_persisted_seqno() {
                    Some(persisted) if persisted < watermark.lsn => {
                        log::trace!(
                            "Keyspace {:?} not flushed enough to evict journal",
                            keyspace.name,
                        );
                        return Ok(());
                    }
                    Some(_) => {}
                    // Nothing persisted yet: the journal is still needed.
                    None => return Ok(()),
                }
            }

            log::trace!(
                "Removing fully flushed journal at {}",
                oldest.path.display()
            );

            let removal = std::fs::remove_file(&oldest.path);
            if let Err(e) = &removal {
                log::error!(
                    "Failed to clean up stale journal file at {}: {e:?}",
                    oldest.path.display(),
                );
            }
            removal?;

            // Only adjust the bookkeeping after the file is really gone.
            let freed_bytes = oldest.size_in_bytes;
            self.disk_space_in_bytes = self.disk_space_in_bytes.saturating_sub(freed_bytes);
            self.items.remove(0);
        }

        Ok(())
    }

    /// Rotates the journal writer and starts tracking the sealed journal it
    /// returns, together with the given eviction watermarks.
    ///
    /// The recorded size is the writer's length as read *before* rotating.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer's length cannot be read or the
    /// rotation fails; nothing is enqueued in that case.
    pub(crate) fn rotate_journal(
        &mut self,
        journal_writer: &mut MutexGuard<Writer>,
        watermarks: Vec<EvictionWatermark>,
    ) -> crate::Result<()> {
        let size_in_bytes = journal_writer.len()?;
        let (path, _) = journal_writer.rotate()?;

        let sealed = Item {
            path,
            size_in_bytes,
            watermarks,
        };
        self.enqueue(sealed);

        Ok(())
    }
}