use crate::kernel::lsm::compactor::{CompactTask, MergeShardingVec};
use crate::kernel::lsm::mem_table::{key_value_bytes_len, KeyValue};
use crate::kernel::lsm::storage::Gen;
use crate::kernel::lsm::version::Version;
use crate::kernel::KernelResult;
use crate::KernelError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::Sender;
pub mod compactor;
pub mod iterator;
mod log;
mod mem_table;
pub mod mvcc;
pub mod storage;
mod table;
pub mod trigger;
pub mod version;
// NOTE(review): not referenced in the visible part of this file; presumably the
// number of levels in the LSM tree — confirm against the compactor/version code.
const MAX_LEVEL: usize = 4;
/// Splits `vec_data` into consecutive shards of roughly `file_size` bytes each.
///
/// Entries keep their input order. The number of shard slots is the total byte
/// length of the data (as reported by `key_value_bytes_len`) divided by
/// `file_size`, rounded up. Every slot is tagged with the value returned by
/// `Gen::create()`, and slots that end up without any entry are dropped from the
/// result.
///
/// A shard is closed by the entry that pushes its running byte count to
/// `file_size` or beyond; that entry becomes the first entry of the next shard.
/// The final shard takes everything that is left, whatever its size.
///
/// An empty `vec_data` yields an empty result.
///
/// # Panics
///
/// Panics if `file_size` is zero.
fn data_sharding(vec_data: Vec<KeyValue>, file_size: usize) -> MergeShardingVec {
    assert!(
        file_size > 0,
        "data_sharding: file_size must be greater than zero"
    );

    let total_len = vec_data.iter().map(key_value_bytes_len).sum::<usize>();
    // Ceiling division written as quotient + remainder flag: the usual
    // `(a + b - 1) / b` form overflows for a very large `file_size`, which in a
    // release build wraps to zero shards and silently drops every entry.
    let mut part_size = total_len / file_size + usize::from(total_len % file_size != 0);
    // Never compute zero shards for non-empty input, otherwise entries whose
    // reported length sums to zero would be lost.
    if part_size == 0 && !vec_data.is_empty() {
        part_size = 1;
    }

    let mut vec_sharding = vec![(0, Vec::new()); part_size];
    let last = part_size.saturating_sub(1);
    let mut data_iter = vec_data.into_iter();
    // Entry that closed the previous shard and therefore opens the current one.
    let mut carried = None;

    for (i, (shard_gen, shard)) in vec_sharding.iter_mut().enumerate() {
        // Generations are created one per slot, in slot order, right before the
        // slot is filled.
        *shard_gen = Gen::create();

        // The carried entry's size is intentionally not added to this shard's
        // running length, which keeps the established shard layout.
        if let Some(key_value) = carried.take() {
            shard.push(key_value);
        }

        let mut data_len = 0;
        for key_value in data_iter.by_ref() {
            data_len += key_value_bytes_len(&key_value);
            // The last shard never overflows: it absorbs all remaining entries.
            if data_len >= file_size && i < last {
                carried = Some(key_value);
                break;
            }
            shard.push(key_value);
        }
    }

    // Slots that never received an entry are not worth writing out.
    vec_sharding.retain(|(_, shard)| !shard.is_empty());
    vec_sharding
}
/// Queries `version` for `key` and forwards any seek hint to the compactor.
///
/// `version.query` returns a pair: the lookup result and an optional second
/// value. When that second value is present it is wrapped in
/// `CompactTask::Seek` and offered to `compactor_tx` without blocking.
/// NOTE(review): presumably it describes the scope in which the lookup missed —
/// confirm against `Version::query`.
///
/// # Errors
///
/// Propagates any error from `version.query`, and returns
/// `KernelError::ChannelClose` when the compactor channel is closed. A send
/// that fails only because the channel is full is ignored.
fn query_and_compaction(
    key: &[u8],
    version: &Version,
    compactor_tx: &Sender<CompactTask>,
) -> KernelResult<Option<KeyValue>> {
    let (found, seek_scope) = version.query(key)?;

    if let Some(scope) = seek_scope {
        match compactor_tx.try_send(CompactTask::Seek(scope)) {
            Err(TrySendError::Closed(_)) => return Err(KernelError::ChannelClose),
            // The task is only offered, never awaited: a full queue is not an error.
            Ok(()) | Err(TrySendError::Full(_)) => {}
        }
    }

    Ok(found)
}