use crate::kernel::io::{FileExtension, IoFactory, IoType, IoWriter};
use crate::kernel::lsm::log::{LogLoader, LogWriter};
use crate::kernel::lsm::storage::{Config, Gen};
use crate::kernel::lsm::table::loader::TableLoader;
use crate::kernel::lsm::version::cleaner::Cleaner;
use crate::kernel::lsm::version::edit::VersionEdit;
use crate::kernel::lsm::version::{
snapshot_gen, Version, DEFAULT_SS_TABLE_PATH, DEFAULT_VERSION_PATH,
};
use crate::kernel::KernelResult;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::RwLock;
use tracing::info;
struct VersionInner {
version: Arc<Version>,
ver_log_writer: (LogWriter<Box<dyn IoWriter>>, i64),
}
pub(crate) struct VersionStatus {
inner: RwLock<VersionInner>,
ss_table_loader: Arc<TableLoader>,
log_factory: Arc<IoFactory>,
edit_approximate_count: AtomicUsize,
}
impl VersionStatus {
pub(crate) fn load_with_path(config: Config, wal: LogLoader) -> KernelResult<Self> {
let sst_path = config.path().join(DEFAULT_SS_TABLE_PATH);
let sst_factory = Arc::new(IoFactory::new(sst_path, FileExtension::SSTable)?);
let ss_table_loader = Arc::new(TableLoader::new(
config.clone(),
Arc::clone(&sst_factory),
wal,
)?);
let log_factory = Arc::new(IoFactory::new(
config.path().join(DEFAULT_VERSION_PATH),
FileExtension::Log,
)?);
let mut version_logs = Vec::new();
let (ver_log_loader, log_gen) = LogLoader::reload(
config.path(),
(DEFAULT_VERSION_PATH, Some(snapshot_gen(&log_factory)?)),
IoType::Direct,
&mut version_logs,
|bytes, records| {
records.append(&mut bincode::deserialize::<Vec<VersionEdit>>(bytes)?);
Ok(())
},
)?;
let edit_approximate_count = AtomicUsize::new(version_logs.len());
let (clean_tx, clean_rx) = unbounded_channel();
let version = Arc::new(Version::load_from_log(
version_logs,
&ss_table_loader,
clean_tx,
)?);
let mut cleaner = Cleaner::new(&ss_table_loader, clean_rx);
let _ignore = tokio::spawn(async move {
cleaner.listen().await;
});
let mut ver_log_writer = ver_log_loader.writer(log_gen)?;
let _ = ver_log_writer.seek_end()?;
Ok(Self {
inner: RwLock::new(VersionInner {
version,
ver_log_writer: ((ver_log_writer), log_gen),
}),
ss_table_loader,
log_factory,
edit_approximate_count,
})
}
pub(crate) async fn current(&self) -> Arc<Version> {
Arc::clone(&self.inner.read().await.version)
}
pub(crate) async fn log_and_apply(
&self,
vec_version_edit: Vec<VersionEdit>,
snapshot_threshold: usize,
) -> KernelResult<()> {
let mut new_version = Version::clone(self.current().await.as_ref());
let mut inner = self.inner.write().await;
info!("[Version Status][log_and_apply]: {new_version}");
if self.edit_approximate_count.load(Ordering::Relaxed) >= snapshot_threshold {
Self::write_snap_shot(&mut inner, &self.log_factory).await?;
} else {
let _ = self.edit_approximate_count.fetch_add(1, Ordering::Relaxed);
}
let _ = inner
.ver_log_writer
.0
.add_record(&bincode::serialize(&vec_version_edit)?)?;
new_version.apply(vec_version_edit)?;
inner.version = Arc::new(new_version);
Ok(())
}
async fn write_snap_shot(
inner: &mut VersionInner,
log_factory: &IoFactory,
) -> KernelResult<()> {
let version = &inner.version;
info!(
"[Version: {}][write_snap_shot]: Start Snapshot!",
version.version_num
);
let new_gen = Gen::create();
let new_writer = log_factory.writer(new_gen, IoType::Direct)?;
let (mut old_writer, old_gen) = mem::replace(
&mut inner.ver_log_writer,
(LogWriter::new(new_writer), new_gen),
);
old_writer.flush()?;
let snap_shot_version_edits = version.to_vec_edit();
let _ = inner
.ver_log_writer
.0
.add_record(&bincode::serialize(&snap_shot_version_edits)?)?;
log_factory.clean(old_gen)?;
Ok(())
}
pub(crate) fn loader(&self) -> &TableLoader {
&self.ss_table_loader
}
}