use std::{
fs::{File, create_dir_all},
io::Write,
path::{Path, PathBuf},
sync::{Arc, OnceLock},
};
use byteorder::{BigEndian, WriteBytesExt};
use tracing::{info, instrument};
use crate::database::{
config::{DbConfig, bloom_config::BloomConfig, index_config::IndexConfig},
factory::wal::build_wal_manger,
iterator::merged_iterator::MergedIterator,
memtable::Memtable,
sstable::{
errors::SSTableError,
metadata::{
SSTMetadata, SSTableFooter, SSTableKeyRange, bloom_filter::bloom_factory::BloomFactory,
index::index_factory::IndexFactory,
},
version::{Version, version_update::VersionUpdate},
},
wal::WAL,
};
/// Message sent to the file cleaner: the version that was just superseded together with
/// the files the installing update marked for deletion.
type CleanerPayload = (Arc<Version>, Vec<PathBuf>);

/// Owns the current [`Version`] of the SSTable tree and the write-ahead log (the
/// "version WAL") in which every change to that version is recorded.
pub struct VersionManager {
    /// Directory holding the SSTable levels (e.g. `l0/`) and the `version/` WAL directory.
    root_dir: PathBuf,
    /// The most recently installed version; replaced as a whole on every update.
    version: Arc<Version>,
    /// Log of encoded `VersionUpdate` records, replayed when the manager is constructed.
    version_wal: Box<dyn WAL>,
    /// Database-wide configuration (directories, bloom filter and index settings, ...).
    config: Arc<DbConfig>,
    /// Sending half of the channel to whatever cleans up superseded versions and files.
    cleaner_channel_producer: std::sync::mpsc::Sender<CleanerPayload>,
}
impl VersionManager {
    /// Opens (or creates) the version WAL under `config.sstable_root_dir` and rebuilds the
    /// current [`Version`] by replaying every record found in it.
    ///
    /// The WAL lives in `<sstable_root_dir>/version` and uses `config.wal`, except that its
    /// sync mode is taken from `config.version_manager.version_manager_sync_mode`.
    /// All replayed operations are merged into a single [`VersionUpdate`] whose WAL offset is
    /// the largest offset seen in any record. That update is normalized and then applied to an
    /// empty version.
    ///
    /// `cleaner_channel_producer` receives `(superseded_version, files_to_delete)` each time
    /// [`VersionManager::push_version_update`] installs a new version.
    ///
    /// # Errors
    ///
    /// Returns an error if the SSTable root directory cannot be created, if the WAL cannot be
    /// built or read, if a record fails to decode, or if normalizing or applying the replayed
    /// operations fails.
    // NOTE(review): the span name contains typos but is left unchanged so existing log
    // filters keep matching it.
    #[instrument(name = "Verion Manger New", skip(config, cleaner_channel_producer))]
    pub fn new(
        config: Arc<DbConfig>,
        cleaner_channel_producer: std::sync::mpsc::Sender<(Arc<Version>, Vec<PathBuf>)>,
    ) -> Result<Self, SSTableError> {
        info!("Creating new Version Manager");
        let sstable_root_dir = config.sstable_root_dir.clone();
        let mut wal_config = config.wal.clone();
        wal_config.wal_sync_variant = config
            .version_manager
            .version_manager_sync_mode
            .clone()
            .into();
        wal_config.wal_dir = sstable_root_dir.join("version");
        // Report I/O failures (permissions, read-only file system, ...) to the caller instead
        // of panicking inside a constructor that already returns `Result`.
        create_dir_all(&sstable_root_dir)?;
        let version_wal = build_wal_manger(&wal_config)?;
        let mut version = Version::new(Vec::default(), 0);

        let mut replayed_operations = vec![];
        let mut last_commited_offset = 0;
        info!("Reading the Version WAL");
        for record in version_wal.read(0)? {
            let Ok((_, update_bytes)) = record else {
                return Err(SSTableError::General(
                    "Error happend while reading the version update manifest".into(),
                ));
            };
            let mut version_update = VersionUpdate::decode(&mut update_bytes.as_slice())?;
            last_commited_offset = std::cmp::max(version_update.wal_offset, last_commited_offset);
            replayed_operations.append(&mut version_update.operations);
        }

        let mut version_update = VersionUpdate::new(last_commited_offset);
        version_update.operations = replayed_operations;
        info!(
            "Read Version WAL found {} total version operations, and last committed offset {}",
            version_update.operations.len(),
            last_commited_offset
        );
        Version::normalize_version_update_operation(&mut version_update, &sstable_root_dir)?;
        version.apply_update(version_update)?;

        Ok(Self {
            root_dir: sstable_root_dir,
            version: Arc::new(version),
            version_wal,
            config,
            cleaner_channel_producer,
        })
    }

    /// Returns a shared handle to the most recently installed [`Version`].
    pub fn get_latest_version(&self) -> Arc<Version> {
        Arc::clone(&self.version)
    }

    /// Flushes `mt` to a new level-0 SSTable using this manager's root directory and its
    /// bloom filter and index configuration.
    ///
    /// Thin wrapper around [`VersionManager::push_memtable_static`]. See that function for
    /// the on-disk layout, panics and errors.
    // NOTE(review): `allow(unused)` presumably because flushes go through
    // `push_memtable_static`. Confirm, and drop the attribute or the method accordingly.
    #[allow(unused)]
    pub fn push_memtable(&self, mt: Arc<dyn Memtable>) -> Result<SSTMetadata, SSTableError> {
        Self::push_memtable_static(&self.root_dir, &self.config.bloom, &self.config.index, mt)
    }

    /// Serializes `mt` into a new file `<sstable_root_dir>/l0/<memtable id>` and returns the
    /// metadata describing it.
    ///
    /// The file contains, in order:
    /// 1. the data block: every memtable entry, encoded back to back;
    /// 2. the bloom filter block: big-endian `u16` name length, name bytes, serialized filter;
    /// 3. the index block: big-endian `u16` name length, name bytes, serialized index;
    /// 4. the key-range block (first and last key);
    /// 5. the footer, recording the sizes of blocks 1–4.
    ///
    /// Index entries are emitted for the first entry, and then for each entry whose data
    /// offset is at least `index_config.index_block_min_size` bytes past the previously
    /// indexed entry. The end of the data block is recorded with `add_last_offset`.
    /// The file is fsynced before returning, because the returned metadata is meant to be
    /// referenced by the version WAL.
    ///
    /// # Panics
    ///
    /// Panics if `mt.size()` is 0.
    ///
    /// # Errors
    ///
    /// Returns an error on any encoding or I/O failure, if a bloom/index name does not fit a
    /// `u16` length prefix, or if the target file already exists (it is opened with
    /// `create_new`).
    pub fn push_memtable_static(
        sstable_root_dir: &Path,
        bloom_config: &BloomConfig,
        index_config: &IndexConfig,
        mt: Arc<dyn Memtable>,
    ) -> Result<SSTMetadata, SSTableError> {
        assert!(mt.size() > 0);
        let new_table_id = mt.get_id().to_string();
        let l0_dir = sstable_root_dir.join("l0");
        create_dir_all(&l0_dir)?;
        let new_table_path = l0_dir.join(&new_table_id);

        let mut writer = Vec::new();
        let mut bloom = BloomFactory::build_bloom_filter(bloom_config, mt.num_enteries());
        let mut index = IndexFactory::build_index(index_config);

        // Data block. The counter starts "full" so the very first entry is always indexed.
        let mut bytes_encoded = 0;
        let mut byte_encoded_since_last_index = index_config.index_block_min_size;
        let mt_iter = mt.iter(None, None);
        let first_key = mt_iter
            .first_entry()
            .expect("Memtable to Be flushed should contain atleast one entry")
            .get_key()
            .into();
        let last_key = mt_iter
            .last_entry()
            .expect("a memtable that has a first entry must also have a last entry")
            .get_key()
            .into();
        for entry in mt_iter {
            if byte_encoded_since_last_index >= index_config.index_block_min_size {
                index.add_entry(entry.get_key(), bytes_encoded);
                byte_encoded_since_last_index = 0;
            }
            let entry_size = entry.encode(&mut writer)?;
            byte_encoded_since_last_index += entry_size as usize;
            bytes_encoded += entry_size;
            bloom.add(entry.get_key());
        }
        index.add_last_offset(bytes_encoded);

        // Bloom filter block: length-prefixed filter name, then the filter itself.
        let mut bloom_filter_size =
            Self::write_block_name(&mut writer, bloom.get_name().as_bytes())?;
        bloom_filter_size += bloom.serialize(&mut writer)?;

        // Index block: same layout as the bloom filter block.
        let mut index_size = Self::write_block_name(&mut writer, index.get_name().as_bytes())?;
        index_size += index.serialize(&mut writer)?;

        let key_range = SSTableKeyRange {
            first_key,
            last_key,
        };
        let key_range_block_size = key_range.serialize(&mut writer)?;
        let footer = SSTableFooter::new(
            bytes_encoded,
            bloom_filter_size,
            index_size,
            key_range_block_size,
        );
        footer.seriealize(&mut writer)?;

        let mut table_file = File::options()
            .append(true)
            .create_new(true)
            .open(&new_table_path)?;
        table_file.write_all(&writer)?;
        // The metadata returned below is about to be referenced from the version WAL, so the
        // table bytes must reach stable storage before we report success.
        table_file.sync_all()?;

        Ok(SSTMetadata::new(
            *mt.get_id(),
            bloom.into(),
            index.into(),
            key_range.first_key,
            key_range.last_key,
            OnceLock::new(),
            new_table_path,
            footer,
        ))
    }

    /// Writes `name` as a big-endian `u16` length prefix followed by the raw bytes, and
    /// returns the number of bytes written (`2 + name.len()`).
    ///
    /// Fails instead of truncating when `name` is longer than `u16::MAX` bytes. Truncating
    /// would write a length prefix that disagrees with the bytes that follow it.
    fn write_block_name(writer: &mut Vec<u8>, name: &[u8]) -> Result<u64, SSTableError> {
        let name_len = u16::try_from(name.len()).map_err(|_| {
            SSTableError::General("SSTable block name is longer than u16::MAX bytes".into())
        })?;
        writer.write_u16::<BigEndian>(name_len)?;
        writer.write_all(name)?;
        Ok(2 + name.len() as u64)
    }

    /// Persists `update` to the version WAL and installs the resulting [`Version`] as the
    /// latest one.
    ///
    /// The update is applied to a copy of the current version *before* it is logged. An
    /// update that cannot be applied is therefore rejected without touching the WAL.
    /// Otherwise [`VersionManager::new`] would replay a record that was never installed in
    /// memory. Once the new version is installed, the superseded version and
    /// `update.files_to_be_deleted` are sent to the cleaner channel.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding, applying, or appending the update to the WAL fails. In
    /// every error case the in-memory version is left unchanged.
    pub fn push_version_update(&mut self, mut update: VersionUpdate) -> Result<(), SSTableError> {
        // Encode before anything is taken out of `update`, so the persisted record matches the
        // update exactly as the caller passed it.
        let mut record = vec![];
        update.encode(&mut record)?;
        let deleted_files = std::mem::take(&mut update.files_to_be_deleted);

        // Build the next version first. If this fails, nothing has been logged yet.
        let mut next_version = (*self.version).clone();
        next_version.apply_update(update)?;

        // Only an update that applied successfully becomes durable and visible.
        self.version_wal.append_log(&record)?;
        let superseded = std::mem::replace(&mut self.version, Arc::new(next_version));

        // Best effort: an mpsc send only fails when the receiver has been dropped. The new
        // version is already logged and installed, so the obsolete files are left on disk
        // rather than failing the update.
        let _ = self
            .cleaner_channel_producer
            .send((superseded, deleted_files));
        Ok(())
    }

    /// Returns a merged iterator over the latest version, optionally bounded by `start_key`
    /// and `end_key`. Bound semantics are those of `Version::iter`.
    pub fn iter(
        &self,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
    ) -> Result<MergedIterator, SSTableError> {
        self.version.iter(start_key, end_key)
    }
}