use crate::error::JasonError;
use crate::query::Query;
use crate::replica::{Replica, Replicator};
use crate::sources::{FileSource, InMemory, Source};
use crate::util::{indexing, quiet_assert};
use humphrey_json::prelude::*;
use humphrey_json::Value;
use std::borrow::Borrow;
use std::collections::{BTreeSet, HashMap};
use std::marker::PhantomData;
use std::path::Path;
use std::vec::IntoIter;
/// A JSON document database storing values of type `T` in a source `S`
/// (a file on disk by default).
pub struct Database<T, S = FileSource>
where
    T: IntoJson + FromJson + 'static,
    S: Source,
{
    /// Maps each key to the location of its latest entry within the source.
    pub(crate) primary_indexes: HashMap<String, u64>,
    /// Secondary indexes: maps an indexed field path to a map from indexed
    /// JSON value to the set of source locations of matching entries.
    pub(crate) secondary_indexes: HashMap<String, HashMap<Value, BTreeSet<u64>>>,
    /// The underlying storage for entries.
    pub(crate) source: S,
    /// Replicas notified of every write and delete.
    pub(crate) replicas: Vec<Replicator<T>>,
    // Ties the unconstrained type parameter `T` to the struct without
    // storing a value of it.
    marker: PhantomData<T>,
}
impl<T> Database<T, FileSource>
where
    T: IntoJson + FromJson,
{
    /// Creates a database backed by the file at `path` via [`FileSource::new`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, JasonError> {
        Self::from_source(FileSource::new(path)?)
    }

    /// Creates a database backed by the file at `path` via [`FileSource::create`].
    pub fn create(path: impl AsRef<Path>) -> Result<Self, JasonError> {
        Self::from_source(FileSource::create(path)?)
    }

    /// Creates a database backed by the file at `path` via [`FileSource::open`].
    pub fn open(path: impl AsRef<Path>) -> Result<Self, JasonError> {
        Self::from_source(FileSource::open(path)?)
    }

    /// Converts this file-backed database into an in-memory one, consuming
    /// `self` and carrying over all indexes and replicas.
    pub fn into_memory(self) -> Result<Database<T, InMemory>, JasonError> {
        let source = self.source.into_memory()?;

        Ok(Database {
            primary_indexes: self.primary_indexes,
            secondary_indexes: self.secondary_indexes,
            source,
            replicas: self.replicas,
            marker: PhantomData,
        })
    }
}
impl<T> Database<T, InMemory>
where
    T: IntoJson + FromJson,
{
    /// Creates an empty in-memory database.
    pub fn new_in_memory() -> Self {
        Default::default()
    }

    /// Converts this in-memory database into a file-backed one at `path`,
    /// consuming `self` and carrying over all indexes and replicas.
    pub fn into_file(self, path: impl AsRef<Path>) -> Result<Database<T>, JasonError> {
        let source = self.source.into_file(path)?;

        Ok(Database {
            primary_indexes: self.primary_indexes,
            secondary_indexes: self.secondary_indexes,
            source,
            replicas: self.replicas,
            marker: PhantomData,
        })
    }
}
impl<T> Default for Database<T, InMemory>
where
T: IntoJson + FromJson,
{
fn default() -> Self {
Self {
primary_indexes: HashMap::new(),
secondary_indexes: HashMap::new(),
source: InMemory::new(),
replicas: Vec::new(),
marker: PhantomData,
}
}
}
impl<T, S> Database<T, S>
where
    T: IntoJson + FromJson,
    S: Source,
{
    /// Creates a database from the given source, loading its primary indexes.
    ///
    /// Secondary indexes are not built here; use [`Database::with_index`].
    pub fn from_source(mut source: S) -> Result<Self, JasonError> {
        let primary_indexes = source.load_indexes()?;

        Ok(Self {
            primary_indexes,
            secondary_indexes: HashMap::new(),
            source,
            replicas: Vec::new(),
            marker: PhantomData,
        })
    }

    /// Compacts the source immediately, then returns the database
    /// (builder-style).
    pub fn with_compaction(mut self) -> Result<Self, JasonError> {
        self.compact()?;
        Ok(self)
    }

    /// Builds a secondary index on `field` and returns the database
    /// (builder-style).
    pub fn with_index(mut self, field: impl AsRef<str>) -> Result<Self, JasonError> {
        let field = field.as_ref().to_string();
        let indexes = self.source.index_on(&field, &self.primary_indexes)?;
        self.secondary_indexes.insert(field, indexes);

        Ok(self)
    }

    /// Registers a synchronous replica that receives every write and delete.
    pub fn with_replica<R>(mut self, replica: R) -> Self
    where
        R: Replica<T>,
    {
        self.replicas.push(Replicator::new(replica));
        self
    }

    /// Registers an asynchronous replica that receives every write and delete.
    pub fn with_async_replica<R>(mut self, replica: R) -> Self
    where
        R: Replica<T>,
    {
        self.replicas.push(Replicator::new_async(replica));
        self
    }

    /// Gets the value stored at `key`.
    ///
    /// # Errors
    /// Returns [`JasonError::InvalidKey`] if the key does not exist.
    pub fn get(&mut self, key: impl AsRef<str>) -> Result<T, JasonError> {
        let index = *self
            .primary_indexes
            .get(key.as_ref())
            .ok_or(JasonError::InvalidKey)?;

        Ok(self.get_at_index(index)?.1)
    }

    /// Reads and deserializes the entry at the given source location.
    ///
    /// A stored literal `"null"` marks a deleted entry and yields
    /// [`JasonError::InvalidKey`].
    pub(crate) fn get_at_index(&mut self, index: u64) -> Result<(String, T), JasonError> {
        let (key, bytes) = self.source.read_entry(index)?;

        // SAFETY: entries written by `set`/`delete` come from `String`s
        // (`humphrey_json::to_string` output or the literal "null"), so the
        // stored bytes are valid UTF-8. NOTE(review): `set_raw` accepts
        // arbitrary bytes — confirm its callers only write valid UTF-8.
        let json = unsafe { String::from_utf8_unchecked(bytes) };

        if json == "null" {
            Err(JasonError::InvalidKey)
        } else {
            Ok((
                key,
                humphrey_json::from_str(json).map_err(|_| JasonError::JsonError)?,
            ))
        }
    }

    /// Sets `key` to `value`, updating secondary indexes and replicas.
    pub fn set(&mut self, key: impl AsRef<str>, value: impl Borrow<T>) -> Result<(), JasonError> {
        let json = humphrey_json::to_string(value.borrow());
        let index = self.source.write_entry(key.as_ref(), json.as_bytes())?;
        let old_index = self.primary_indexes.insert(key.as_ref().to_string(), index);

        // JSON of the value this key previously held (if any), needed below
        // to evict stale secondary-index entries.
        let old_value = old_index
            .map(|i| self.get_at_index(i).map(|(_, v)| v.to_json()))
            .transpose()?;

        for (index_path, indexes) in &mut self.secondary_indexes {
            let indexed_value = indexing::get_value(index_path, &value.borrow().to_json());
            let set = indexes.entry(indexed_value.clone()).or_default();

            if let Some(old_index) = old_index {
                set.remove(&old_index);
            }

            set.insert(index);

            // If the indexed field's value changed, remove the old location
            // from the bucket it used to live in.
            if let (Some(old_index), Some(old_value)) = (old_index, &old_value) {
                let old_indexed_value = indexing::get_value(index_path, old_value);

                if old_indexed_value != indexed_value {
                    indexes.entry(old_indexed_value).or_default().remove(&old_index);
                }
            }
        }

        for replica in &mut self.replicas {
            replica.set(key.as_ref(), &json)?;
        }

        Ok(())
    }

    /// Writes raw bytes at `key`, bypassing serialization, secondary indexes
    /// and replicas.
    ///
    /// # Errors
    /// Returns [`JasonError::Index`] if any secondary index exists, since it
    /// would silently fall out of sync.
    pub(crate) fn set_raw(&mut self, key: &str, value: &[u8]) -> Result<(), JasonError> {
        quiet_assert(self.secondary_indexes.is_empty(), JasonError::Index)?;

        let index = self.source.write_entry(key, value)?;
        self.primary_indexes.insert(key.to_string(), index);

        Ok(())
    }

    /// Deletes `key`, updating secondary indexes and replicas.
    ///
    /// # Errors
    /// Returns [`JasonError::InvalidKey`] if the key does not exist.
    pub fn delete(&mut self, key: impl AsRef<str>) -> Result<(), JasonError> {
        let index = self
            .primary_indexes
            .remove(key.as_ref())
            .ok_or(JasonError::InvalidKey)?;

        // The old value must be read back so its secondary-index entries can
        // be removed.
        let value = self.get_at_index(index)?.1.to_json();

        for (index_path, indexes) in &mut self.secondary_indexes {
            let indexed_value = indexing::get_value(index_path, &value);
            indexes
                .get_mut(&indexed_value)
                .ok_or(JasonError::InvalidKey)?
                .remove(&index);
        }

        // A literal `null` entry marks the key as deleted in the source.
        self.source.write_entry(key.as_ref(), "null")?;

        for replica in &mut self.replicas {
            replica.set(key.as_ref(), "null")?;
        }

        Ok(())
    }

    /// Executes `query` against this database, returning a lazy iterator over
    /// the matching entries.
    pub fn query(&mut self, query: Query) -> Result<Iter<T, S>, JasonError> {
        query.execute(self)
    }

    /// Iterates over all entries, ordered by their location in the source.
    pub fn iter(&mut self) -> Iter<T, S> {
        let mut indexes = self.primary_indexes.values().copied().collect::<Vec<_>>();
        indexes.sort_unstable();

        Iter {
            database: self,
            keys: indexes.into_iter(),
        }
    }

    /// Iterates over all entries in arbitrary (hash map) order, skipping the
    /// sort that [`Database::iter`] performs.
    pub fn iter_unordered(&mut self) -> Iter<T, S> {
        let indexes = self.primary_indexes.values().copied().collect::<Vec<_>>();

        Iter {
            database: self,
            keys: indexes.into_iter(),
        }
    }

    /// Compacts the source, then reloads primary and secondary indexes so
    /// they point at the entries' new locations.
    pub fn compact(&mut self) -> Result<(), JasonError> {
        self.source.compact(&self.primary_indexes)?;
        self.primary_indexes = self.source.load_indexes()?;

        for (field, indexes) in self.secondary_indexes.iter_mut() {
            *indexes = self.source.index_on(field, &self.primary_indexes)?;
        }

        Ok(())
    }

    /// Migrates every entry through `f`, producing a database of the new
    /// value type backed by the same source.
    pub fn migrate<U, F>(mut self, f: F) -> Result<Database<U, S>, JasonError>
    where
        U: IntoJson + FromJson,
        F: Fn(T) -> U,
    {
        self.source.migrate(&self.primary_indexes, f)?;
        Database::from_source(self.source)
    }
}
/// A lazy iterator over database entries.
///
/// Entries are read from the underlying source one at a time as the iterator
/// is advanced, so items are `Result`s and the iterator holds a mutable
/// borrow of the database.
pub struct Iter<'a, T, S>
where
    T: IntoJson + FromJson + 'static,
    S: Source,
{
    /// The database whose entries are being read.
    pub(crate) database: &'a mut Database<T, S>,
    /// Source locations of the entries still to be yielded.
    pub(crate) keys: IntoIter<u64>,
}
impl<T, S> Iterator for Iter<'_, T, S>
where
    T: IntoJson + FromJson,
    S: Source,
{
    type Item = Result<(String, T), JasonError>;

    /// Yields the next entry, reading it from the source on demand.
    fn next(&mut self) -> Option<Self::Item> {
        self.keys
            .next()
            .map(|index| self.database.get_at_index(index))
    }
}
impl<T, S> DoubleEndedIterator for Iter<'_, T, S>
where
    T: IntoJson + FromJson,
    S: Source,
{
    /// Yields the next entry from the back, reading it from the source on
    /// demand.
    fn next_back(&mut self) -> Option<Self::Item> {
        self.keys
            .next_back()
            .map(|index| self.database.get_at_index(index))
    }
}
impl<'a, T, S> ExactSizeIterator for Iter<'a, T, S>
where
    T: IntoJson + FromJson,
    S: Source,
{
    /// Returns the exact number of entries remaining, delegating to the
    /// underlying location iterator.
    fn len(&self) -> usize {
        self.keys.len()
    }
}