use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::hash::Hash;
use std::io::{self, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use bincode::deserialize_from;
use hashbrown::HashMap;
use log::LevelFilter;
use rayon::prelude::*;
use serde::de::DeserializeOwned;
use serde::Serialize;
use simple_logger::SimpleLogger;
use time::macros::format_description;
use crate::types::BinaryKv;
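/// Configuration for the client: the path of the database file and whether
/// (and at what level) logging is enabled.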
#[derive(Debug, Clone)]
pub struct QuickConfiguration
{
pub path: Option<PathBuf>,
pub logs: bool,
pub log_level: Option<LevelFilter>,
}
impl QuickConfiguration
{
pub fn new(path: Option<PathBuf>, logs: bool, log_level: Option<LevelFilter>) -> Self
{
Self { path, logs, log_level }
}
}
impl Default for QuickConfiguration
{
fn default() -> Self
{
Self {
path: Some(PathBuf::from("db.qkv")),
logs: false,
log_level: Some(LevelFilter::Info),
}
}
}
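/// A key-value client backed by a single on-disk file of bincode-encoded
/// [`BinaryKv`] records, with an in-memory cache in front of repeated reads.
///
/// The file handle and the cache are wrapped in `Arc<Mutex<..>>` so a client
/// can be cloned and shared across threads.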
#[cfg(feature = "full")]
#[derive(Debug, Clone)]
pub struct QuickClient<T>
where
T: Serialize + DeserializeOwned + Clone + Debug + Eq + PartialEq + Hash + Send + Sync,
{
pub file: Arc<Mutex<File>>,
pub cache: Arc<Mutex<HashMap<String, BinaryKv<T>>>>,
pub config: QuickConfiguration,
}
impl<T> QuickClient<T>
where
T: Serialize + DeserializeOwned + Clone + Debug + Eq + PartialEq + Hash + Send + Sync,
{
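/// Creates a new client, initializing the logger (when enabled) and opening
/// or creating the database file at the configured path.
///
/// A minimal usage sketch (not compiled here; it assumes this crate's items
/// are in scope and uses `String` values purely as an illustration):
///
/// ```ignore
/// let config = QuickConfiguration::new(Some(PathBuf::from("db.qkv")), false, None);
/// let mut client: QuickClient<String> = QuickClient::new(Some(config))?;
/// ```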
pub fn new(config: Option<QuickConfiguration>) -> std::io::Result<Self>
{
let config = config.unwrap_or_default();
if config.logs {
let log_level = config
.log_level
.unwrap_or_else(|| QuickConfiguration::default().log_level.unwrap());
SimpleLogger::new()
.with_colors(true)
.with_threads(true)
.with_level(log_level)
.with_timestamp_format(format_description!("[year]-[month]-[day] [hour]:[minute]:[second]"))
.init()
.unwrap();
}
// Fail with a proper error instead of panicking when no path is configured.
let path = match config.path.clone() {
Some(path) => path,
None => {
return Err(io::Error::new(io::ErrorKind::Other, "No database path provided in the configuration"));
}
};
let file = match OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
{
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error opening file: {:?}", e)));
}
};
log::info!("QuickClient initialized!");
Ok(Self {
file: Arc::new(Mutex::new(file)),
cache: Arc::new(Mutex::new(HashMap::new())),
config,
})
}
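/// Looks up `key`, first in the in-memory cache and then by scanning the
/// database file record by record. A value found on disk is cached before it
/// is returned; `Ok(None)` means the key does not exist.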
pub fn get(&mut self, key: &str) -> std::io::Result<Option<T>>
where
T: Clone,
{
log::info!("[GET] Searching for key: {}", key);
{
let cache = self.cache.lock().unwrap();
if let Some(entry) = cache.get(key) {
log::debug!("[GET] Found cached key: {}", key);
return Ok(Some(entry.value.clone()));
}
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut reader = io::BufReader::new(&mut *file);
reader.seek(SeekFrom::Start(0))?;
// Entries are written back-to-back as individual bincode records, so the
// file is scanned sequentially until a matching key or EOF is reached.
loop {
match deserialize_from::<_, BinaryKv<T>>(&mut reader) {
Ok(BinaryKv { key: entry_key, value }) if entry_key == key => {
// Cache the entry so the next lookup is served from memory.
self.cache
.lock()
.unwrap()
.insert(key.to_string(), BinaryKv::new(key.to_string(), value.clone()));
log::debug!("[GET] Caching uncached key: {}", key);
log::info!("[GET] Key found: {}", key);
return Ok(Some(value));
}
Ok(_) => {}
Err(e) => {
if let bincode::ErrorKind::Io(io_err) = e.as_ref() {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
break;
}
}
return Err(io::Error::new(io::ErrorKind::Other, format!("Error deserializing data: {:?}", e)));
}
}
}
log::debug!("[GET] Key not found: {}", key);
Ok(None)
}
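/// Appends `key`/`value` as a new record at the end of the database file and
/// caches it. If the key is already cached, this delegates to [`Self::update`].
///
/// A short usage sketch (illustrative only, assuming a `QuickClient<String>`
/// named `client` already exists):
///
/// ```ignore
/// client.set("greeting", String::from("hello"))?;
/// assert_eq!(client.get("greeting")?, Some(String::from("hello")));
/// ```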
pub fn set(&mut self, key: &str, value: T) -> std::io::Result<()>
{
log::info!("[SET] Setting key: {}", key);
{
if self.cache.lock().unwrap().get(key).is_some() {
log::debug!("[SET] Key already exists, updating {} instead", key);
return self.update(key, value);
}
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut writer = io::BufWriter::new(&mut *file);
// Append new records at the end of the file so existing entries are never
// overwritten by a stale cursor position.
writer.seek(SeekFrom::End(0))?;
let data = BinaryKv::new(key.to_string(), value.clone());
let serialized = match bincode::serialize(&data) {
Ok(data) => data,
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Error serializing data: {:?}", e),
));
}
};
writer.write_all(&serialized)?;
// Flush the buffered writer before syncing so the bytes actually reach the file.
writer.flush()?;
writer.get_ref().sync_all()?;
self.cache
.lock()
.unwrap()
.insert(key.to_string(), BinaryKv::new(key.to_string(), value.clone()));
log::info!("[SET] Key set: {}", key);
Ok(())
}
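/// Removes `key` from the cache and rewrites the database file without its
/// record. Keys that are not present in the cache are treated as absent and
/// left untouched.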
pub fn delete(&mut self, key: &str) -> std::io::Result<()>
{
log::info!("[DELETE] Deleting key: {}", key);
{
if !self.cache.lock().unwrap().contains_key(key) {
return Ok(());
}
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut reader = io::BufReader::new(&mut *file);
reader.seek(SeekFrom::Start(0))?;
let mut updated_buffer = Vec::new();
loop {
match deserialize_from::<_, BinaryKv<T>>(&mut reader) {
Ok(entry) if entry.key != key => {
// Keep every entry except the one being deleted by re-serializing it
// into the rewrite buffer.
match bincode::serialize(&entry) {
Ok(data) => updated_buffer.extend_from_slice(&data),
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error serializing data: {:?}", e)));
}
}
}
Ok(_) => {
// Skip the entry being deleted.
}
Err(e) => {
if let bincode::ErrorKind::Io(io_err) = e.as_ref() {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
break;
}
}
return Err(io::Error::new(io::ErrorKind::Other, format!("Error deserializing data: {:?}", e)));
}
}
}
drop(reader);
// Truncate before rewriting so no stale bytes remain past the new end.
file.set_len(0)?;
let mut writer = io::BufWriter::new(&mut *file);
writer.seek(SeekFrom::Start(0))?;
writer.write_all(&updated_buffer)?;
writer.flush()?;
writer.get_ref().sync_all()?;
self.cache.lock().unwrap().remove(key);
log::debug!("[DELETE] Cache deleted: {}", key);
log::info!("[DELETE] Key deleted: {}", key);
Ok(())
}
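/// Replaces the value stored under `key` and rewrites the database file with
/// the updated record. If the key is not cached, this delegates to
/// [`Self::set`] instead.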
pub fn update(&mut self, key: &str, value: T) -> std::io::Result<()>
{
log::info!("[UPDATE] Updating key: {}", key);
{
if self.cache.lock().unwrap().get(key).is_none() {
log::debug!("[UPDATE] Key not found, attempting to set {} instead", key);
return self.set(key, value);
};
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut reader = io::BufReader::new(&mut *file);
reader.seek(SeekFrom::Start(0))?;
let mut updated_entries = Vec::new();
let mut updated = false;
loop {
match deserialize_from::<_, BinaryKv<T>>(&mut reader) {
Ok(entry) => {
if key == entry.key {
let mut updated_entry = entry.clone();
updated_entry.value = value.clone();
updated_entries.push(updated_entry);
updated = true;
} else {
updated_entries.push(entry);
}
}
Err(e) => {
if let bincode::ErrorKind::Io(io_err) = e.as_ref() {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
break;
}
}
// A non-EOF error means the data cannot be read; bail out instead of
// looping forever on the same bad record.
return Err(io::Error::new(io::ErrorKind::Other, format!("Error deserializing data: {:?}", e)));
}
}
}
if !updated {
log::warn!(
"[UPDATE] Key not found: {}. This should not trigger, if it did some cache may be invalid.",
key
);
return Err(io::Error::new(io::ErrorKind::Other, format!("Key not found: {}", key)));
}
drop(reader);
// Truncate before rewriting: updated values may serialize to a different
// length than the originals.
file.set_len(0)?;
let mut writer = io::BufWriter::new(&mut *file);
writer.seek(SeekFrom::Start(0))?;
for entry in updated_entries.iter() {
let serialized = match bincode::serialize(entry) {
Ok(data) => data,
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Error serializing data: {:?}", e),
));
}
};
writer.write_all(&serialized)?;
}
writer.flush()?;
writer.get_ref().sync_all()?;
self.cache
.lock()
.unwrap()
.insert(key.to_string(), BinaryKv::new(key.to_string(), value.clone()));
log::debug!("[UPDATE] Cache updated: {}", key);
log::info!("[UPDATE] Key updated: {}", key);
Ok(())
}
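/// Truncates the database file and empties the in-memory cache.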
pub fn clear(&mut self) -> std::io::Result<()>
{
log::info!("[CLEAR] Clearing database");
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
file.set_len(0)?;
file.seek(SeekFrom::Start(0))?;
file.sync_all()?;
self.cache.lock().unwrap().clear();
log::debug!("[CLEAR] Cache cleared");
log::info!("[CLEAR] Database cleared");
Ok(())
}
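/// Returns every entry currently held in the in-memory cache. Note that this
/// reads from the cache only, not from the file.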
pub fn get_all(&mut self) -> std::io::Result<Vec<BinaryKv<T>>>
{
log::info!("[GET_ALL] Fetching all data in db cache...");
let cache = self.cache.lock().unwrap();
let all_results: Vec<BinaryKv<T>> = cache
.par_iter()
.map(|(_, entry)| entry.clone())
.collect();
log::info!("[GET_ALL] Fetched all data in db");
Ok(all_results)
}
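/// Returns the cached entries for the given `keys`, skipping any key that is
/// not cached.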
pub fn get_many(&mut self, keys: Vec<String>) -> std::io::Result<Vec<BinaryKv<T>>>
{
log::info!("[GET_MANY] Fetching many keys from db cache...");
let cache_guard = self.cache.lock().unwrap();
let results: Vec<BinaryKv<T>> = keys
.par_iter()
.filter_map(|key| cache_guard.get(key).cloned())
.collect();
log::info!("[GET_MANY] Fetched {} keys from db", results.len());
Ok(results)
}
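/// Sets a batch of entries: keys that already exist are routed through
/// [`Self::update_many`], while new keys are appended to the file as
/// individual records.
///
/// A brief sketch (illustrative only, assuming a `QuickClient<i32>` named
/// `client` already exists):
///
/// ```ignore
/// client.set_many(vec![
///     BinaryKv::new("a".to_string(), 1),
///     BinaryKv::new("b".to_string(), 2),
/// ])?;
/// ```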
pub fn set_many(&mut self, values: Vec<BinaryKv<T>>) -> std::io::Result<()>
{
log::info!("[SET_MANY] Setting many keys in db...");
let mut to_update = Vec::new();
let mut to_insert = Vec::new();
{
let cache_guard = self.cache.lock().unwrap();
for entry in values.iter() {
if cache_guard.contains_key(&entry.key) {
to_update.push(entry.clone());
} else {
to_insert.push(entry.clone());
}
}
}
if !to_update.is_empty() {
log::debug!(
"[SET_MANY] Found {} keys that already exist, updating them instead of calling set",
to_update.len()
);
self.update_many(to_update)?;
}
if to_insert.is_empty() {
log::info!("[SET_MANY] Set {} keys in db", values.len());
return Ok(());
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut writer = io::BufWriter::new(&mut *file);
// Append each new entry as its own bincode record so it can be read back
// individually with deserialize_from.
writer.seek(SeekFrom::End(0))?;
for entry in to_insert.iter() {
let serialized = match bincode::serialize(entry) {
Ok(data) => data,
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Error serializing data: {:?}", e),
));
}
};
writer.write_all(&serialized)?;
}
writer.flush()?;
writer.get_ref().sync_all()?;
log::debug!("[SET_MANY] Wrote {} keys to file", to_insert.len());
{
let mut cache_guard = self.cache.lock().unwrap();
for entry in to_insert.iter() {
cache_guard.insert(entry.key.clone(), BinaryKv::new(entry.key.clone(), entry.value.clone()));
}
}
log::info!("[SET_MANY] Set {} keys in db", values.len());
Ok(())
}
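/// Deletes a batch of keys: keys known to the cache are removed from both the
/// cache and the file; unknown keys are ignored.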
pub fn delete_many(&mut self, keys: Vec<String>) -> std::io::Result<()>
{
log::info!("[DELETE_MANY] Deleting many keys from db...");
{
if self.cache.lock().unwrap().is_empty() {
log::debug!("[DELETE_MANY] Cache is empty, nothing to delete");
return Ok(());
}
}
let mut valid_keys = Vec::new();
{
let cache_guard = self.cache.lock().unwrap();
for key in keys {
if cache_guard.contains_key(&key) {
valid_keys.push(key);
}
}
}
if valid_keys.is_empty() {
log::debug!("[DELETE_MANY] No valid keys found, nothing to delete");
return Ok(());
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut reader = io::BufReader::new(&mut *file);
reader.seek(SeekFrom::Start(0))?;
let mut updated_buffer = Vec::new();
loop {
match deserialize_from::<_, BinaryKv<T>>(&mut reader) {
Ok(entry) if !valid_keys.contains(&entry.key) => {
// Keep entries whose keys are not being deleted by re-serializing
// them into the rewrite buffer.
match bincode::serialize(&entry) {
Ok(data) => updated_buffer.extend_from_slice(&data),
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error serializing data: {:?}", e)));
}
}
}
Ok(_) => {
// Skip entries that are being deleted.
}
Err(e) => {
if let bincode::ErrorKind::Io(io_err) = e.as_ref() {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
break;
}
}
return Err(io::Error::new(io::ErrorKind::Other, format!("Error deserializing data: {:?}", e)));
}
}
}
drop(reader);
// Truncate before rewriting so no stale bytes remain past the new end.
file.set_len(0)?;
let mut writer = io::BufWriter::new(&mut *file);
writer.seek(SeekFrom::Start(0))?;
writer.write_all(&updated_buffer)?;
writer.flush()?;
writer.get_ref().sync_all()?;
{
let mut cache_guard = self.cache.lock().unwrap();
for key in valid_keys.iter() {
cache_guard.remove(key);
}
}
log::info!("[DELETE_MANY] Deleted {} keys from db", valid_keys.len());
Ok(())
}
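/// Updates a batch of entries: keys missing from the cache are first routed
/// through [`Self::set_many`], then the file is rewritten with the updated
/// values.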
pub fn update_many(&mut self, values: Vec<BinaryKv<T>>) -> std::io::Result<()>
{
log::info!("[UPDATE_MANY] Updating many keys in db...");
let mut to_set = Vec::new();
{
let cache_guard = self.cache.lock().unwrap();
for entry in values.iter() {
if !cache_guard.contains_key(&entry.key) {
to_set.push(entry.clone());
}
}
}
if !to_set.is_empty() {
log::debug!(
"[UPDATE_MANY] Found {} keys that don't exist, setting them instead of calling update",
to_set.len()
);
// Set the missing keys first, then fall through and update the rest.
self.set_many(to_set)?;
}
let mut file = match self.file.lock() {
Ok(file) => file,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Error locking file: {:?}", e)));
}
};
let mut reader = io::BufReader::new(&mut *file);
reader.seek(SeekFrom::Start(0))?;
let mut updated_entries = Vec::new();
loop {
match deserialize_from::<_, BinaryKv<T>>(&mut reader) {
Ok(entry) => {
if let Some(value) = values.iter().find(|v| v.key == entry.key) {
let mut updated_entry = entry.clone();
updated_entry.value = value.value.clone();
updated_entries.push(updated_entry);
} else {
updated_entries.push(entry);
}
}
Err(e) => {
if let bincode::ErrorKind::Io(io_err) = e.as_ref() {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
break;
}
}
return Err(io::Error::new(io::ErrorKind::Other, format!("Error deserializing data: {:?}", e)));
}
}
}
drop(reader);
// Truncate before rewriting: updated values may serialize to a different
// length than the originals.
file.set_len(0)?;
let mut writer = io::BufWriter::new(&mut *file);
writer.seek(SeekFrom::Start(0))?;
// Write each entry as its own bincode record so it can be read back
// individually with deserialize_from.
for entry in updated_entries.iter() {
let serialized = match bincode::serialize(entry) {
Ok(data) => data,
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Error serializing data: {:?}", e),
));
}
};
writer.write_all(&serialized)?;
}
writer.flush()?;
writer.get_ref().sync_all()?;
log::debug!("[UPDATE_MANY] Wrote {} keys to file", updated_entries.len());
{
let mut cache_guard = self.cache.lock().unwrap();
for entry in updated_entries.iter() {
cache_guard.insert(entry.key.clone(), BinaryKv::new(entry.key.clone(), entry.value.clone()));
}
}
log::info!("[UPDATE_MANY] Updated {} keys in db", values.len());
Ok(())
}
}