use crate::engine::Engine;
use crate::error::{FlowError, Result};
use crate::jsondb::cursor::{Cursor, CursorDirection, IndexCursor};
use crate::jsondb::encoding::*;
use crate::jsondb::helpers::*;
use crate::jsondb::keyrange::KeyRange;
use crate::jsondb::query::QueryBuilder;
use crate::jsondb::schema::*;
use crate::jsondb::{KeyArg, ObjectStore, Transaction, TransactionMode};
use crate::record::{Config, InternalRecord, Record, ScanRange};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt;
use std::ops::Bound;
pub struct JsonDB {
pub(crate) engine: Engine,
pub(crate) schema: Schema,
write_lock: std::sync::Mutex<()>,
}
impl fmt::Debug for JsonDB {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JsonDB")
.field("store_names", &self.store_names())
.finish()
}
}
impl JsonDB {
pub fn open(config: Config) -> Result<Self> {
let engine = Engine::open(config)?;
Self::from_engine(engine)
}
pub fn from_engine(engine: Engine) -> Result<Self> {
let schema = load_schemas(|range| {
let iter = engine.scan(range)?;
iter.collect()
})?;
Ok(Self {
engine,
schema,
write_lock: std::sync::Mutex::new(()),
})
}
pub fn shutdown(self) -> Result<()> {
self.engine.shutdown()
}
pub fn close(&self) -> Result<()> {
self.engine.close()
}
pub fn engine(&self) -> &Engine {
&self.engine
}
pub fn create_object_store(&self, name: &str, key_path: &str) -> Result<()> {
validate_store_def(name, key_path)?;
let mut def = self.schema.get(name);
match &mut def {
Some(d) => {
if d.key_path != key_path {
return Err(FlowError::JsonDb(format!(
"store '{}' already exists with a different key_path",
name
)));
}
Ok(())
}
None => {
let entry = StoreDef {
name: name.to_string(),
key_path: key_path.to_string(),
auto_increment: false,
indexes: vec![],
next_auto_id: 0,
};
self.engine
.write_internal(vec![InternalRecord::from_record(
&schema_record(&entry)?,
0,
)])?;
self.schema.insert(entry);
Ok(())
}
}
}
pub fn delete_object_store(&self, name: &str) -> Result<()> {
let def = self.schema.get(name);
if def.is_none() {
return Err(FlowError::JsonDb(format!("store '{}' not found", name)));
}
let def = def.unwrap();
let doc_pfx = doc_prefix(name);
let mut records = Vec::new();
for index in &def.indexes {
let pfx = idx_prefix(name, &index.name);
let end = crate::record::increment_prefix_bytes(&pfx);
records.push(InternalRecord::delete_range(pfx, end, 0));
}
let doc_end = crate::record::increment_prefix_bytes(&doc_pfx);
records.push(InternalRecord::delete_range(doc_pfx, doc_end, 0));
records.push(schema_delete_record(name));
records.push(InternalRecord::delete(counter_key(name), 0, 0));
self.engine.write_internal(records)?;
self.schema.remove(name);
Ok(())
}
pub fn create_index(
&self,
store: &str,
name: &str,
key_paths: &[&str],
unique: bool,
multi_entry: bool,
) -> Result<()> {
let mut def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
validate_index_def(&def, name, key_paths)?;
let index = IndexDef {
name: name.to_string(),
key_paths: key_paths.iter().map(|s| s.to_string()).collect(),
unique,
multi_entry,
};
def.indexes.push(index.clone());
let mut records = Vec::new();
records.push(InternalRecord::from_record(&schema_record(&def)?, 0));
let doc_pfx = doc_prefix(store);
let docs = self.engine.scan(prefix_range(&doc_pfx))?;
for rec in docs {
let doc = decode_doc(&rec?.value)?;
let index_vals = extract_index_values(&doc, &index);
for vals in index_vals {
let key_bytes =
encode_primary_key(&extract_field(&doc, &def.key_path).unwrap_or(Value::Null))?;
let encoded = encode_composite_value(&vals);
records.push(InternalRecord::from_record(
&Record::new(
idx_key(store, name, &encoded, &key_bytes),
0,
key_bytes.clone(),
),
0,
));
}
}
self.engine.write_internal(records)?;
self.schema.insert(def);
Ok(())
}
pub fn create_index_on(
&self,
store: &str,
name: &str,
key_path: &str,
unique: bool,
) -> Result<()> {
self.create_index(store, name, &[key_path], unique, false)
}
pub fn delete_index(&self, store: &str, name: &str) -> Result<()> {
let mut def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let pos = def.indexes.iter().position(|i| i.name == name);
if pos.is_none() {
return Err(FlowError::JsonDb(format!(
"index '{}' not found on store '{}'",
name, store
)));
}
def.indexes.remove(pos.unwrap());
let pfx = idx_prefix(store, name);
let end = crate::record::increment_prefix_bytes(&pfx);
let mut records = vec![InternalRecord::delete_range(pfx, end, 0)];
records.push(InternalRecord::from_record(&schema_record(&def)?, 0));
self.engine.write_internal(records)?;
self.schema.insert(def);
Ok(())
}
pub fn store_names(&self) -> Vec<String> {
self.schema.list().into_iter().map(|s| s.name).collect()
}
pub fn get_store(&self, name: &str) -> Option<StoreDef> {
self.schema.get(name)
}
pub fn apply_store(&self, def: &StoreDef) -> Result<()> {
let existing = self.schema.get(&def.name);
match existing {
None => {
self.create_object_store(&def.name, &def.key_path)?;
if def.auto_increment {
let mut current = self.schema.get(&def.name).unwrap();
current.auto_increment = true;
self.engine.write_internal(vec![
InternalRecord::from_record(&schema_record(¤t)?, 0),
])?;
self.schema.insert(current);
}
for idx in &def.indexes {
let paths: Vec<&str> = idx.key_paths.iter().map(|s| s.as_str()).collect();
self.create_index(&def.name, &idx.name, &paths, idx.unique, idx.multi_entry)?;
}
Ok(())
}
Some(existing) => {
if existing.key_path != def.key_path {
return Err(FlowError::JsonDb(format!(
"store '{}' already exists with a different key_path ('{}' vs '{}')",
def.name, existing.key_path, def.key_path
)));
}
for idx in &def.indexes {
if let Some(ex_idx) = existing.indexes.iter().find(|i| i.name == idx.name)
&& (ex_idx.key_paths != idx.key_paths || ex_idx.unique != idx.unique)
{
return Err(FlowError::JsonDb(format!(
"index '{}' on store '{}' already exists with different \
definition (key_paths={:?}, unique={}) vs requested \
(key_paths={:?}, unique={})",
idx.name, def.name, ex_idx.key_paths, ex_idx.unique,
idx.key_paths, idx.unique,
)));
}
}
for idx in &def.indexes {
if !existing.indexes.iter().any(|i| i.name == idx.name) {
let paths: Vec<&str> = idx.key_paths.iter().map(|s| s.as_str()).collect();
self.create_index(&def.name, &idx.name, &paths, idx.unique, idx.multi_entry)?;
}
}
for ex_idx in &existing.indexes {
if !def.indexes.iter().any(|i| i.name == ex_idx.name) {
self.delete_index(&def.name, &ex_idx.name)?;
}
}
Ok(())
}
}
}
pub fn apply_schemas(&self, stores: &[StoreDef]) -> Result<()> {
for def in stores {
self.apply_store(def)?;
}
Ok(())
}
pub fn apply_schema<T: ObjectStore>(&self) -> Result<()> {
self.apply_store(&T::store_def())
}
pub fn put(&self, store: &str, doc: Value) -> Result<Value> {
let _lock = self.write_lock.lock().unwrap();
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_val = extract_field(&doc, &def.key_path).ok_or_else(|| {
FlowError::JsonDb(format!(
"document missing key_path '{}' for store '{}'",
def.key_path, store
))
})?;
let key_bytes = encode_primary_key(&key_val)?;
let doc_bytes = encode_doc(&doc)?;
let batch = build_put_batch(&def, store, &key_bytes, &doc_bytes, &doc, &self.engine)?;
self.engine.write_internal(batch)?;
Ok(key_val)
}
pub fn get(&self, store: &str, key: &Value) -> Result<Option<Value>> {
let _def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_bytes = encode_primary_key(key)?;
let rec = self.engine.get_bytes(&doc_key(store, &key_bytes), 0);
match rec {
Some(r) => Ok(Some(decode_doc(&r.value)?)),
None => Ok(None),
}
}
pub fn get_with_meta(&self, store: &str, key: &Value) -> Result<Option<(Value, i64, i64)>> {
let _def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_bytes = encode_primary_key(key)?;
let rec = self.engine.get_bytes(&doc_key(store, &key_bytes), 0);
match rec {
Some(r) => Ok(Some((decode_doc(&r.value)?, r.ts, r.expire_at))),
None => Ok(None),
}
}
pub fn delete(&self, store: &str, key: &Value) -> Result<()> {
let _lock = self.write_lock.lock().unwrap();
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_bytes = encode_primary_key(key)?;
let batch = build_delete_batch(&def, store, &key_bytes, &self.engine)?;
self.engine.write_internal(batch)?;
Ok(())
}
pub fn put_auto(&self, store: &str, mut doc: Value) -> Result<Value> {
let _lock = self.write_lock.lock().unwrap();
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
if !def.auto_increment {
return Err(FlowError::JsonDb(format!(
"store '{}' is not auto-increment",
store
)));
}
let (next_id, counter_rec) = prepare_counter(&self.engine, store)?;
let key_val = Value::Number(next_id.into());
let key_bytes = next_id.to_string().into_bytes();
if let Value::Object(ref mut map) = doc {
map.insert(def.key_path.clone(), key_val.clone());
}
let doc_bytes = encode_doc(&doc)?;
let mut batch = build_put_batch(&def, store, &key_bytes, &doc_bytes, &doc, &self.engine)?;
batch.push(counter_rec); self.engine.write_internal(batch)?;
Ok(key_val)
}
pub fn count(&self, store: &str) -> Result<usize> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let pfx = doc_prefix(store);
let iter = self.engine.scan(prefix_range(&pfx))?;
let mut count = 0;
for r in iter {
let _ = r?;
count += 1;
}
Ok(count)
}
pub fn scan(&self, store: &str) -> Result<Vec<Value>> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let pfx = doc_prefix(store);
let iter = self.engine.scan(prefix_range(&pfx))?;
let mut docs = Vec::new();
for r in iter {
let rec = r?;
docs.push(decode_doc(&rec.value)?);
}
Ok(docs)
}
pub fn scan_with_meta(&self, store: &str) -> Result<Vec<(Value, i64, i64)>> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let pfx = doc_prefix(store);
let iter = self.engine.scan(prefix_range(&pfx))?;
let mut docs = Vec::new();
for r in iter {
let rec = r?;
docs.push((decode_doc(&rec.value)?, rec.ts, rec.expire_at));
}
Ok(docs)
}
pub fn add(&self, store: &str, doc: Value) -> Result<Value> {
let _lock = self.write_lock.lock().unwrap();
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_val = extract_field(&doc, &def.key_path).ok_or_else(|| {
FlowError::JsonDb(format!(
"document missing key_path '{}' for store '{}'",
def.key_path, store
))
})?;
let key_bytes = encode_primary_key(&key_val)?;
if self.engine.get_bytes(&doc_key(store, &key_bytes), 0).is_some() {
return Err(FlowError::JsonDb(format!(
"a record with key '{:?}' already exists in store '{}'",
key_val, store
)));
}
let doc_bytes = encode_doc(&doc)?;
let batch = build_put_batch(&def, store, &key_bytes, &doc_bytes, &doc, &self.engine)?;
self.engine.write_internal(batch)?;
Ok(key_val)
}
pub fn clear(&self, store: &str) -> Result<usize> {
let _lock = self.write_lock.lock().unwrap();
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let count = self.count(store)?;
let mut records = Vec::new();
for index in &def.indexes {
let pfx = idx_prefix(store, &index.name);
let end = crate::record::increment_prefix_bytes(&pfx);
records.push(InternalRecord::delete_range(pfx, end, 0));
}
let doc_pfx = doc_prefix(store);
let doc_end = crate::record::increment_prefix_bytes(&doc_pfx);
records.push(InternalRecord::delete_range(doc_pfx, doc_end, 0));
self.engine.write_internal(records)?;
Ok(count)
}
pub fn get_all(
&self,
store: &str,
query: Option<&KeyRange>,
count: Option<usize>,
) -> Result<Vec<Value>> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let range = match query {
Some(kr) => kr.to_doc_scan_range(store)?,
None => prefix_range(&doc_prefix(store)),
};
let iter = self.engine.scan(range)?;
let mut docs = Vec::new();
for r in iter {
let rec = r?;
docs.push(decode_doc(&rec.value)?);
if let Some(n) = count {
if docs.len() >= n {
break;
}
}
}
Ok(docs)
}
pub fn get_all_keys(
&self,
store: &str,
query: Option<&KeyRange>,
count: Option<usize>,
) -> Result<Vec<Value>> {
let def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let range = match query {
Some(kr) => kr.to_doc_scan_range(store)?,
None => prefix_range(&doc_prefix(store)),
};
let iter = self.engine.scan(range)?;
let mut keys = Vec::new();
for r in iter {
let rec = r?;
let doc = decode_doc(&rec.value)?;
if let Some(key_val) = extract_field(&doc, &def.key_path) {
keys.push(key_val);
}
if let Some(n) = count {
if keys.len() >= n {
break;
}
}
}
Ok(keys)
}
pub fn get_key(&self, store: &str, key: &Value) -> Result<Option<Value>> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let key_bytes = encode_primary_key(key)?;
if self.engine.get_bytes(&doc_key(store, &key_bytes), 0).is_some() {
Ok(Some(key.clone()))
} else {
Ok(None)
}
}
pub fn count_with_query(&self, store: &str, query: Option<&KeyRange>) -> Result<usize> {
let _ = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let range = match query {
Some(kr) => kr.to_doc_scan_range(store)?,
None => prefix_range(&doc_prefix(store)),
};
let iter = self.engine.scan(range)?;
let mut count = 0;
for r in iter {
let _ = r?;
count += 1;
}
Ok(count)
}
pub fn get_by_index(&self, store: &str, index: &str, value: &Value) -> Result<Vec<Value>> {
let _def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let idx_def = _def
.indexes
.iter()
.find(|i| i.name == index)
.ok_or_else(|| {
FlowError::JsonDb(format!("index '{}' not found on '{}'", index, store))
})?;
let is_composite = idx_def.key_paths.len() > 1;
let encoded = if is_composite {
match value {
Value::Array(arr) => encode_composite_value(arr),
_ => encode_index_value(value),
}
} else {
encode_index_value(value)
};
let pfx = idx_value_prefix(store, index, &encoded);
let iter = self.engine.scan(prefix_range(&pfx))?;
let mut docs = Vec::new();
for r in iter {
let rec = r?;
if let Some(doc) = self.engine.get_bytes(&doc_key(store, &rec.value), 0) {
docs.push(decode_doc(&doc.value)?);
}
}
Ok(docs)
}
pub fn range_by_index(
&self,
store: &str,
index: &str,
start: &Value,
end: &Value,
) -> Result<Vec<Value>> {
let _def = self
.schema
.get(store)
.ok_or_else(|| FlowError::JsonDb(format!("store '{}' not found", store)))?;
let idx_def = _def
.indexes
.iter()
.find(|i| i.name == index)
.ok_or_else(|| {
FlowError::JsonDb(format!("index '{}' not found on '{}'", index, store))
})?;
let is_composite = idx_def.key_paths.len() > 1;
let enc_start = if is_composite {
match start {
Value::Array(arr) => encode_composite_value(arr),
_ => encode_index_value(start),
}
} else {
encode_index_value(start)
};
let enc_end = if is_composite {
match end {
Value::Array(arr) => encode_composite_value(arr),
_ => encode_index_value(end),
}
} else {
encode_index_value(end)
};
let pfx = idx_prefix(store, index);
let range = ScanRange {
key_start: Bound::Included([pfx.as_slice(), &enc_start].concat()),
key_end: Bound::Excluded([pfx.as_slice(), &enc_end].concat()),
ts_start: Bound::Unbounded,
ts_end: Bound::Unbounded,
};
let iter = self.engine.scan(range)?;
let mut docs = Vec::new();
for r in iter {
let rec = r?;
if let Some(doc) = self.engine.get_bytes(&doc_key(store, &rec.value), 0) {
docs.push(decode_doc(&doc.value)?);
}
}
Ok(docs)
}
pub fn transaction<'db>(
&'db self,
stores: &[&str],
mode: TransactionMode,
) -> Result<Transaction<'db>> {
for name in stores {
if self.schema.get(name).is_none() {
return Err(FlowError::JsonDb(format!("store '{}' not found", name)));
}
}
Ok(Transaction {
db: self,
mode,
writes: HashMap::new(),
counter_updates: Vec::new(),
next_ids: HashMap::new(),
committed: false,
})
}
pub fn query<'a>(&'a self, store: &'a str) -> QueryBuilder<'a> {
QueryBuilder::new(self, store)
}
pub fn put_doc<T: serde::Serialize>(&self, store: &str, doc: &T) -> Result<Value> {
let json = serde_json::to_value(doc).map_err(FlowError::from)?;
self.put(store, json)
}
pub fn get_doc<T: serde::de::DeserializeOwned>(
&self,
store: &str,
key: impl KeyArg,
) -> Result<Option<T>> {
let val = self.get(store, &key.into_value())?;
match val {
Some(v) => {
let t: T = serde_json::from_value(v).map_err(FlowError::from)?;
Ok(Some(t))
}
None => Ok(None),
}
}
pub fn delete_doc(&self, store: &str, key: impl KeyArg) -> Result<()> {
self.delete(store, &key.into_value())
}
pub fn open_cursor(
&self,
store: &str,
range: Option<&KeyRange>,
direction: CursorDirection,
) -> Result<Cursor> {
Cursor::open(self, store, range, direction)
}
pub fn open_cursor_on_index(
&self,
store: &str,
index: &str,
range: Option<&KeyRange>,
direction: CursorDirection,
) -> Result<IndexCursor> {
IndexCursor::open(self, store, index, range, direction)
}
}