use crate::parsers::{FileReader, FileWriter, MMapReader, StdReader, Writer};
use core::marker::PhantomData;
use s2json::Properties;
use serde::{Serialize, de::DeserializeOwned};
use std::{
env, format,
fs::{self},
path::Path,
string::String,
time::{SystemTime, UNIX_EPOCH},
vec,
vec::Vec,
};
use super::{U64, external_sort};
/// Optional settings consumed by `S2BaseStore::new`.
///
/// Every field is optional; `Default` leaves them all as `None`.
#[derive(Debug, Default)]
pub struct FileOptions {
/// Initial value of the store's `sorted` flag (`false` when `None`). When `true`, the store
/// starts in read state instead of write state.
is_sorted: Option<bool>,
/// Forwarded unchanged to `external_sort` when the store is sorted.
max_heap: Option<usize>,
/// Forwarded unchanged to `external_sort` when the store is sorted.
thread_count: Option<usize>,
/// Directory for temporary files. When `None`, `build_tmp_dir(None)` picks one.
tmp_dir: Option<String>,
}
/// The handle a store currently holds for one of its backing files.
#[derive(Debug, Clone)]
pub enum FileState<R: StdReader> {
/// The file is open for reading through a reader of type `R`.
Read(R),
/// The file is open for appending through a `FileWriter`.
Write(FileWriter),
/// No handle is held.
Empty,
}
/// Size in bytes of one record in the `.keys` file: an 8-byte little-endian key, an 8-byte
/// little-endian offset into the `.values` file, and a 4-byte little-endian value length.
pub const KEY_STORE_LENGTH: u64 = 20;
/// An [`S2BaseStore`] whose read side is backed by `FileReader`.
pub type S2FileStore<K, V> = S2BaseStore<FileReader, K, V>;
/// An [`S2BaseStore`] whose read side is backed by `MMapReader`.
pub type S2MMapStore<K, V> = S2BaseStore<MMapReader, K, V>;
/// A multimap-style key/value store persisted in two files next to each other:
/// `<file_name>.keys` (fixed-size index records) and `<file_name>.values` (JSON payloads).
///
/// Keys are converted to `u64`; values are serialized with `serde_json`. Writes append to
/// both files. Reads first sort the keys file (via `external_sort`) and then binary-search it.
#[derive(Debug, Clone)]
pub struct S2BaseStore<
R: StdReader = FileReader,
K: U64 = u64,
V: Serialize + DeserializeOwned = Properties,
> {
/// Temp directory handed to `external_sort`; also the parent of generated file names.
tmp_dir: String,
/// Base path of the store; `.keys` / `.values` are appended to form the real file paths.
file_name: String,
/// Number of entries (records in the keys file).
size: u64,
/// Whether the keys file is currently known to be sorted.
sorted: bool,
/// Forwarded to `external_sort`.
max_heap: Option<usize>,
/// Forwarded to `external_sort`.
thread_count: Option<usize>,
/// Byte offset recorded for the next value appended to the values file.
value_offset: u64,
/// Current handle on the `.keys` file.
key_file: FileState<R>,
/// Current handle on the `.values` file.
value_file: FileState<R>,
/// Ties the key and value types to the store without storing either.
_phantom: PhantomData<(K, V)>,
}
impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Default for S2BaseStore<R, K, V> {
    /// Builds an unconfigured store: empty paths, zero entries, unsorted, no sort tuning,
    /// and no file handles held. Nothing is touched on disk.
    fn default() -> Self {
        Self {
            // Paths are filled in later by `new`.
            file_name: String::new(),
            tmp_dir: String::new(),
            // Bookkeeping starts from an empty, unsorted store.
            size: 0,
            value_offset: 0,
            sorted: false,
            // No tuning values for the sort.
            thread_count: None,
            max_heap: None,
            // No handles yet.
            key_file: FileState::Empty,
            value_file: FileState::Empty,
            _phantom: PhantomData,
        }
    }
}
impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> S2BaseStore<R, K, V> {
pub fn new(file_name: Option<&str>, options: Option<FileOptions>) -> Self {
let mut file = Self::default();
let options = options.unwrap_or_default();
file.tmp_dir = options.tmp_dir.clone().unwrap_or_else(|| build_tmp_dir(None));
file.file_name = file_name
.map(|f| f.into())
.unwrap_or_else(|| build_tmp_file_name(file.tmp_dir.clone()));
file.sorted = options.is_sorted.unwrap_or(false);
file.max_heap = options.max_heap;
file.thread_count = options.thread_count;
if !file.sorted {
file.switch_to_write_state();
} else {
file.switch_to_read_state();
}
if let Ok(stat) = fs::metadata(format!("{}.keys", file.file_name)) {
file.size = stat.len() / KEY_STORE_LENGTH;
}
file
}
/// Returns the number of entries in the store.
pub fn len(&self) -> u64 {
self.size
}
/// Returns `true` when the store holds no entries.
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
pub fn set(&mut self, key: K, value: V) {
let key = key.into();
self.switch_to_write_state();
self.sorted = false;
let (FileState::Write(key_file), FileState::Write(value_file)) =
(&mut self.key_file, &mut self.value_file)
else {
panic!("Not in write state");
};
let vec_key = u64::to_le_bytes(key).to_vec();
let value_str = serde_json::to_vec(&value).unwrap();
let value_offest = u64::to_le_bytes(self.value_offset).to_vec();
let value_length = u32::to_le_bytes(value_str.len() as u32).to_vec();
key_file.append(&vec_key);
key_file.append(&value_offest);
key_file.append(&value_length);
value_file.append(&value_str);
self.value_offset += value_str.len() as u64;
self.size += 1;
}
/// Returns `true` when at least one entry is stored under `key`.
///
/// An empty store answers `false` without touching any file. Otherwise the store is
/// switched to read state (which sorts it if needed) and the keys file is binary-searched.
pub fn has(&mut self, key: K) -> bool {
    if self.is_empty() {
        return false;
    }
    let target: u64 = key.into();
    self.switch_to_read_state();
    let candidate = self.lower_bound(target);
    // `lower_bound` may point one past the end when every stored key is smaller.
    candidate < self.size && self.get_key(candidate) == target
}
/// Looks up the values stored under `key`.
///
/// * `key` - key to search for.
/// * `max` - upper bound on the number of values returned; `None` means no bound.
///
/// Returns the index of the first matching entry together with the matching values in
/// stored order, or `None` when nothing matched (including when `max` is `Some(0)`).
///
/// # Panics
/// Panics if a stored value cannot be deserialized into `V`.
pub fn get(&mut self, key: K, max: Option<usize>) -> Option<(u64, Vec<V>)> {
    if self.is_empty() {
        return None;
    }
    self.switch_to_read_state();
    let target: u64 = key.into();
    let first = self.lower_bound(target);
    if first >= self.size {
        return None;
    }

    let limit = max.unwrap_or(usize::MAX);
    let mut values: Vec<V> = Vec::new();
    let mut cursor = first;
    // Equal keys are adjacent after sorting, so walk forward until the key changes.
    while values.len() < limit && cursor < self.size {
        let (found_key, offset, length) = self.get_key_value(cursor);
        if found_key != target {
            break;
        }
        let raw = self.get_value(offset, length);
        values.push(serde_json::from_slice(&raw).unwrap());
        cursor += 1;
    }

    if values.is_empty() {
        return None;
    }
    Some((first, values))
}
/// Returns the entry at position `index` of the sorted keys file, or `None` when `index`
/// is out of range.
///
/// # Panics
/// Panics if the stored value cannot be deserialized into `V`.
pub fn get_index(&mut self, index: u64) -> Option<(K, V)> {
    if index < self.size {
        self.switch_to_read_state();
        let (raw_key, offset, length) = self.get_key_value(index);
        let bytes = self.get_value(offset, length);
        let entry_key = K::from(raw_key);
        let entry_value: V = serde_json::from_slice(&bytes).unwrap();
        Some((entry_key, entry_value))
    } else {
        None
    }
}
/// Sorts the store through `external_sort`, using this store's base path as both the only
/// input and the output, then marks the store as sorted.
///
/// Does nothing when the store is already marked sorted or holds no entries.
pub fn sort(&mut self) {
    let needs_sort = !self.sorted && !self.is_empty();
    if needs_sort {
        let sources: Vec<&str> = vec![self.file_name.as_str()];
        external_sort(
            &sources,
            &self.file_name,
            self.max_heap,
            self.thread_count,
            Some(&self.tmp_dir),
        );
        self.sorted = true;
    }
}
pub fn cleanup(&mut self) {
fs::remove_file(format!("{}.keys", self.file_name)).unwrap();
fs::remove_file(format!("{}.values", self.file_name)).unwrap();
self.sorted = false;
self.size = 0;
self.value_offset = 0;
}
/// Makes sure both backing files are held through a `FileWriter`.
///
/// A file that is already in write state keeps its handle; any other state is replaced by
/// a writer opened on `<file_name>.keys` / `<file_name>.values`.
///
/// # Panics
/// Panics if a writer cannot be created.
fn switch_to_write_state(&mut self) {
    if !matches!(self.key_file, FileState::Write(_)) {
        let keys_path = format!("{}.keys", self.file_name);
        self.key_file = FileState::Write(FileWriter::new(keys_path).unwrap());
    }
    if !matches!(self.value_file, FileState::Write(_)) {
        let values_path = format!("{}.values", self.file_name);
        self.value_file = FileState::Write(FileWriter::new(values_path).unwrap());
    }
}
/// Makes sure the store is sorted and both backing files are held through a reader `R`.
///
/// Order matters here:
/// 1. any writer is released first, so no write handle is still open on a file that is
///    about to be sorted and read (presumably this also flushes pending bytes - confirm
///    against `FileWriter`'s drop behavior);
/// 2. the store is sorted (a no-op when already sorted or empty);
/// 3. readers are opened last, so they observe the files as `external_sort` left them
///    rather than a handle or mapping taken before the rewrite.
///
/// Files already in read state keep their reader.
///
/// # Panics
/// Panics if a reader cannot be opened.
fn switch_to_read_state(&mut self) {
    if matches!(self.key_file, FileState::Write(_)) {
        self.key_file = FileState::Empty;
    }
    if matches!(self.value_file, FileState::Write(_)) {
        self.value_file = FileState::Empty;
    }

    self.sort();

    if !matches!(self.key_file, FileState::Read(_)) {
        let keys_path = format!("{}.keys", self.file_name);
        self.key_file = FileState::Read(R::new(keys_path).unwrap());
    }
    if !matches!(self.value_file, FileState::Read(_)) {
        let values_path = format!("{}.values", self.file_name);
        self.value_file = FileState::Read(R::new(values_path).unwrap());
    }
}
/// Binary-searches the keys file for the first index whose key is not less than `id`.
///
/// Returns `self.size` when every stored key is smaller than `id`. Requires read state.
fn lower_bound(&mut self, id: u64) -> u64 {
    let (mut low, mut high) = (0_u64, self.size);
    while low < high {
        // Midpoint written this way cannot overflow.
        let probe = low + (high - low) / 2;
        if self.get_key(probe) < id {
            low = probe + 1;
        } else {
            high = probe;
        }
    }
    low
}
/// Reads the key of the index record at position `index` in the keys file.
///
/// # Panics
/// Panics when the keys file is not in read state.
fn get_key(&mut self, index: u64) -> u64 {
    let FileState::Read(reader) = &mut self.key_file else {
        panic!("Not in read state");
    };
    let record_start = index * KEY_STORE_LENGTH;
    reader.uint64_le(Some(record_start))
}
/// Reads the full index record at position `index`: `(key, value offset, value length)`.
///
/// # Panics
/// Panics when the keys file is not in read state.
fn get_key_value(&mut self, index: u64) -> (u64, u64, u32) {
    let FileState::Read(reader) = &mut self.key_file else {
        panic!("Not in read state");
    };
    let record_start = index * KEY_STORE_LENGTH;
    // Record layout: key at +0, value offset at +8, value length at +16.
    let key = reader.uint64_le(Some(record_start));
    let value_offset = reader.uint64_le(Some(record_start + 8));
    let value_length = reader.uint32_le(Some(record_start + 16));
    (key, value_offset, value_length)
}
/// Reads `length` bytes starting at byte offset `index` from the values file.
///
/// # Panics
/// Panics when the values file is not in read state.
fn get_value(&mut self, index: u64, length: u32) -> Vec<u8> {
    let FileState::Read(reader) = &mut self.value_file else {
        panic!("Not in read state");
    };
    let end = index + u64::from(length);
    reader.slice(Some(index), Some(end))
}
/// Returns an iterator over every `(key, value)` entry, starting at index 0.
///
/// The iterator borrows the store mutably for its whole lifetime.
pub fn iter(&mut self) -> Iter<'_, R, K, V> {
Iter { container: self, index: 0 }
}
/// Returns an iterator that yields each key together with all consecutive values stored
/// under it, starting at index 0.
///
/// The iterator borrows the store mutably for its whole lifetime.
pub fn iter_multi(&mut self) -> IterMulti<'_, R, K, V> {
IterMulti { container: self, index: 0 }
}
}
/// Iterator over the individual `(key, value)` entries of an [`S2BaseStore`].
#[derive(Debug)]
pub struct Iter<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
/// The store being iterated.
container: &'a mut S2BaseStore<R, K, V>,
/// Index of the next entry to read.
index: u64,
}
impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for Iter<'_, R, K, V> {
    type Item = (K, V);

    /// Yields the entry at the current index and advances by one; `None` once the index
    /// is past the last entry.
    fn next(&mut self) -> Option<Self::Item> {
        let entry = self.container.get_index(self.index);
        self.index += 1;
        entry
    }
}
/// Iterator over an [`S2BaseStore`] that groups consecutive entries sharing the same key.
#[derive(Debug)]
pub struct IterMulti<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
/// The store being iterated.
container: &'a mut S2BaseStore<R, K, V>,
/// Index of the next entry to read.
index: u64,
}
impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for IterMulti<'_, R, K, V> {
    type Item = (K, Vec<V>);

    /// Yields the key at the current index together with every consecutive value stored
    /// under that same key, then positions the iterator on the first entry of the next key.
    ///
    /// Returns `None` once every entry has been consumed. A group also ends when the store
    /// runs out of entries, so the final group is returned instead of being waited on forever.
    fn next(&mut self) -> Option<Self::Item> {
        let (key, first_value) = self.container.get_index(self.index)?;
        self.index += 1;

        let mut values = vec![first_value];
        // `get_index` returns `None` past the last entry, which ends the group as well.
        while let Some((next_key, next_value)) = self.container.get_index(self.index) {
            if next_key != key {
                break;
            }
            values.push(next_value);
            self.index += 1;
        }
        Some((key, values))
    }
}
/// Resolves the directory used for temporary store files.
///
/// A caller-supplied directory is returned untouched (and is not created). Otherwise the
/// `s2_data_store` directory under the system temp dir is created if needed and returned.
///
/// # Panics
/// Panics if the default directory cannot be created.
fn build_tmp_dir(tmp_dir: Option<String>) -> String {
    match tmp_dir {
        Some(dir) => dir,
        None => {
            let default_dir = env::temp_dir().join("s2_data_store");
            fs::create_dir_all(&default_dir).unwrap();
            default_dir.to_string_lossy().into_owned()
        }
    }
}
/// Generates a fresh base path of the form `<tmp_dir>/tmp_<nanos>_<pid>_<seq>` for a store.
///
/// The timestamp alone is not unique on clocks with coarse resolution, so the process id and
/// a process-wide sequence number are appended; two calls in the same process never return
/// the same name.
///
/// Any leftover file at the base path or at the `.keys` / `.values` paths derived from it is
/// removed, so a new store never picks up stale data.
///
/// # Panics
/// Panics if a leftover file exists but cannot be removed.
fn build_tmp_file_name(tmp_dir: String) -> String {
    static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    // A clock set before the epoch is not worth a panic: the sequence number still
    // guarantees uniqueness within this process.
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    let seq = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let random_name = format!("tmp_{nanos}_{}_{seq}", std::process::id());
    let file_name = format!("{tmp_dir}/{random_name}");

    // The store writes `<base>.keys` and `<base>.values`, so those are the paths that matter.
    for suffix in ["", ".keys", ".values"] {
        let candidate = format!("{file_name}{suffix}");
        if Path::new(&candidate).exists() {
            fs::remove_file(&candidate).unwrap();
        }
    }
    file_name
}