use crate::error::{FlowError, Result};
use crate::jsondb::TransactionMode;
use crate::jsondb::db::JsonDB;
use crate::jsondb::encoding::*;
use crate::jsondb::helpers::*;
use crate::jsondb::schema::*;
use crate::record::{InternalRecord, Record, ScanRange};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt;
use std::ops::Bound;
/// A buffered transaction over a [`JsonDB`].
///
/// Write methods only stage changes in memory; the engine is written to
/// exclusively by `commit`.
pub struct Transaction<'db> {
/// Database whose schema and engine this transaction reads and commits into.
pub(crate) db: &'db JsonDB,
/// Access mode; write methods fail when this is `TransactionMode::ReadOnly`.
pub(crate) mode: TransactionMode,
/// Staged changes keyed by `(store name, encoded primary key)`.
/// `Some(bytes)` is an encoded document to store, `None` marks a delete.
pub(crate) writes: HashMap<(String, Vec<u8>), Option<Vec<u8>>>,
/// Counter records returned by `prepare_counter` in `put_auto`; written first on commit.
pub(crate) counter_updates: Vec<InternalRecord>,
/// Last auto-increment id handed out per store within this transaction.
pub(crate) next_ids: HashMap<String, u64>,
/// Set to `true` by `commit` after the batch has been written.
pub(crate) committed: bool,
}
impl fmt::Debug for Transaction<'_> {
    /// Prints the mode, the number of staged writes and the committed flag.
    ///
    /// The database handle and the staged payloads themselves are left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let staged = self.writes.len();
        let mut out = f.debug_struct("Transaction");
        out.field("mode", &self.mode);
        out.field("writes_count", &staged);
        out.field("committed", &self.committed);
        out.finish()
    }
}
impl<'db> Transaction<'db> {
/// Stages an insert-or-replace of `doc` in `store` and returns its primary key.
///
/// The key is read from the document at the store's `key_path`. Nothing is
/// written to the engine here; the change sits in `self.writes` until commit.
///
/// # Errors
///
/// Fails if the transaction is read-only, the store is unknown, the document
/// has no value at `key_path`, or the key/document cannot be encoded.
pub fn put(&mut self, store: &str, doc: Value) -> Result<Value> {
    self.require_read_write()?;
    let def = self.require_store(store)?;
    let Some(key_val) = extract_field(&doc, &def.key_path) else {
        return Err(FlowError::JsonDb(format!(
            "document missing key_path '{}' for store '{}'",
            def.key_path, store
        )));
    };
    let write_key = (store.to_string(), encode_primary_key(&key_val)?);
    let encoded_doc = encode_doc(&doc)?;
    self.writes.insert(write_key, Some(encoded_doc));
    Ok(key_val)
}
/// Stages an insert of `doc` in `store`, refusing to overwrite an existing key.
///
/// A key counts as existing when this transaction has staged a document for
/// it, or, if nothing is staged for the key, when the engine already holds a
/// document under it. A staged delete makes the key available again.
///
/// # Errors
///
/// Fails if the transaction is read-only, the store is unknown, the document
/// has no value at `key_path`, the key already exists, or encoding fails.
pub fn add(&mut self, store: &str, doc: Value) -> Result<Value> {
    self.require_read_write()?;
    let def = self.require_store(store)?;
    let Some(key_val) = extract_field(&doc, &def.key_path) else {
        return Err(FlowError::JsonDb(format!(
            "document missing key_path '{}' for store '{}'",
            def.key_path, store
        )));
    };
    let write_key = (store.to_string(), encode_primary_key(&key_val)?);
    // Staged state wins over persisted state; the engine is only consulted
    // when this transaction has not touched the key at all.
    let taken = match self.writes.get(&write_key) {
        Some(staged) => staged.is_some(),
        None => self
            .db
            .engine
            .get_bytes(&doc_key(store, &write_key.1), 0)
            .is_some(),
    };
    if taken {
        return Err(FlowError::JsonDb(format!(
            "key already exists in store '{}'",
            store
        )));
    }
    let encoded_doc = encode_doc(&doc)?;
    self.writes.insert(write_key, Some(encoded_doc));
    Ok(key_val)
}
/// Stages the deletion of every document in `store`.
///
/// Every key currently found in the engine under the store's document prefix
/// gets a staged delete, and every change already staged for the store is
/// turned into a delete as well.
///
/// # Errors
///
/// Fails if the transaction is read-only, the store is unknown, or the
/// engine scan reports an error.
pub fn clear(&mut self, store: &str) -> Result<()> {
    self.require_read_write()?;
    self.require_store(store)?;
    let pfx = doc_prefix(store);
    let persisted = self.db.engine.scan(prefix_range(&pfx))?;
    for entry in persisted {
        let rec = entry?;
        // The primary key is whatever follows the store's document prefix.
        let pk = rec.key[pfx.len()..].to_vec();
        self.writes.insert((store.to_string(), pk), None);
    }
    // Documents staged earlier in this transaction must disappear too.
    for ((name, _), staged) in self.writes.iter_mut() {
        if name == store {
            *staged = None;
        }
    }
    Ok(())
}
/// Looks up the document stored under `key` in `store`.
///
/// Changes staged in this transaction take precedence: a staged document is
/// returned as-is and a staged delete yields `None`. Only keys untouched by
/// the transaction are read from the engine.
///
/// # Errors
///
/// Fails if the store is unknown, the key cannot be encoded, or the stored
/// bytes cannot be decoded.
pub fn get(&self, store: &str, key: &Value) -> Result<Option<Value>> {
    self.require_store(store)?;
    let write_key = (store.to_string(), encode_primary_key(key)?);
    let found = match self.writes.get(&write_key) {
        Some(Some(bytes)) => Some(decode_doc(bytes)?),
        Some(None) => None,
        None => match self.db.engine.get_bytes(&doc_key(store, &write_key.1), 0) {
            Some(rec) => Some(decode_doc(&rec.value)?),
            None => None,
        },
    };
    Ok(found)
}
/// Stages the deletion of the document stored under `key` in `store`.
///
/// No existence check is made; a delete marker is staged unconditionally.
///
/// # Errors
///
/// Fails if the transaction is read-only, the store is unknown, or the key
/// cannot be encoded.
pub fn delete(&mut self, store: &str, key: &Value) -> Result<()> {
    self.require_read_write()?;
    self.require_store(store)?;
    let tombstone_key = (store.to_string(), encode_primary_key(key)?);
    self.writes.insert(tombstone_key, None);
    Ok(())
}
/// Counts the documents visible in `store` from inside this transaction.
///
/// Persisted documents are counted unless a delete is staged for them;
/// staged documents are counted in addition when the engine does not already
/// hold their key.
///
/// # Errors
///
/// Fails if the store is unknown or the engine scan reports an error.
pub fn count(&self, store: &str) -> Result<usize> {
    self.require_store(store)?;
    let pfx = doc_prefix(store);
    let mut total = 0usize;
    // One lookup key reused for every record instead of allocating a fresh
    // `String` and `Vec` per scanned document.
    let mut probe: (String, Vec<u8>) = (store.to_string(), Vec::new());
    for entry in self.db.engine.scan(prefix_range(&pfx))? {
        let rec = entry?;
        probe.1.clear();
        probe.1.extend_from_slice(&rec.key[pfx.len()..]);
        // A staged delete hides the persisted document.
        if matches!(self.writes.get(&probe), Some(None)) {
            continue;
        }
        total += 1;
    }
    for ((name, pk), staged) in &self.writes {
        // Staged documents whose key is already persisted were counted above.
        if name == store
            && staged.is_some()
            && self.db.engine.get_bytes(&doc_key(store, pk), 0).is_none()
        {
            total += 1;
        }
    }
    Ok(total)
}
/// Returns every document visible in `store` from inside this transaction.
///
/// Persisted documents come first, in engine scan order, with staged
/// replacements substituted and staged deletes omitted. Staged documents
/// whose key is not yet persisted follow, in the (unspecified) iteration
/// order of the staging map.
///
/// # Errors
///
/// Fails if the store is unknown, the engine scan reports an error, or a
/// document cannot be decoded.
pub fn scan(&self, store: &str) -> Result<Vec<Value>> {
    self.require_store(store)?;
    let pfx = doc_prefix(store);
    let mut docs: Vec<Value> = Vec::new();
    // One lookup key reused for every record instead of allocating a fresh
    // `String` and `Vec` per scanned document.
    let mut probe: (String, Vec<u8>) = (store.to_string(), Vec::new());
    for entry in self.db.engine.scan(prefix_range(&pfx))? {
        let rec = entry?;
        probe.1.clear();
        probe.1.extend_from_slice(&rec.key[pfx.len()..]);
        match self.writes.get(&probe) {
            Some(Some(bytes)) => docs.push(decode_doc(bytes)?),
            // Staged delete: the persisted document is hidden.
            Some(None) => {}
            None => docs.push(decode_doc(&rec.value)?),
        }
    }
    for ((name, pk), staged) in &self.writes {
        if name != store {
            continue;
        }
        // Keys that are already persisted were emitted by the loop above.
        if let Some(bytes) = staged
            && self.db.engine.get_bytes(&doc_key(store, pk), 0).is_none()
        {
            docs.push(decode_doc(bytes)?);
        }
    }
    Ok(docs)
}
/// Returns the documents in `store` whose `index` entry equals `value`,
/// as seen from inside this transaction.
///
/// Persisted index entries are scanned first. For a key with a staged
/// change, the staged document is used and kept only if the field at the
/// index's first key path still equals `value`; staged deletes are skipped.
/// Afterwards every staged document of the store whose first-key-path field
/// equals `value` is appended unless a document with the same primary-key
/// value is already in the result.
///
/// # Errors
///
/// Fails if the store or index is unknown, the engine scan reports an
/// error, or a document cannot be decoded.
pub fn get_by_index(&self, store: &str, index: &str, value: &Value) -> Result<Vec<Value>> {
    let def = self.require_store(store)?;
    // Resolve the index once; the definition is needed both for validation
    // and for the path used to re-check staged documents.
    let index_def = def
        .indexes
        .iter()
        .find(|i| i.name == index)
        .ok_or_else(|| {
            FlowError::JsonDb(format!("index '{}' not found on '{}'", index, store))
        })?;
    let first_path = index_def
        .key_paths
        .first()
        .map(|s| s.as_str())
        .unwrap_or("");
    let encoded = encode_index_value(value);
    let pfx = idx_value_prefix(store, index, &encoded);
    let mut docs: Vec<Value> = Vec::new();
    for entry in self.db.engine.scan(prefix_range(&pfx))? {
        let rec = entry?;
        // Index records carry the encoded primary key as their value.
        let pk = &rec.value;
        match self.writes.get(&(store.to_string(), pk.clone())) {
            Some(Some(bytes)) => {
                let staged_doc: Value = decode_doc(bytes)?;
                // The staged version may no longer carry the indexed value.
                if extract_field(&staged_doc, first_path).as_ref() == Some(value) {
                    docs.push(staged_doc);
                }
            }
            // Staged delete: the persisted index entry is stale.
            Some(None) => {}
            None => {
                if let Some(persisted) = self.db.engine.get_bytes(&doc_key(store, pk), 0) {
                    docs.push(decode_doc(&persisted.value)?);
                }
            }
        }
    }
    for ((name, _), staged) in &self.writes {
        if name != store {
            continue;
        }
        let Some(bytes) = staged else {
            continue;
        };
        let doc: Value = decode_doc(bytes)?;
        if extract_field(&doc, first_path).as_ref() != Some(value) {
            continue;
        }
        // De-duplicate against documents already collected from the index scan.
        let pk_val = extract_field(&doc, &def.key_path);
        let already = docs
            .iter()
            .any(|d| extract_field(d, &def.key_path) == pk_val);
        if !already {
            docs.push(doc);
        }
    }
    Ok(docs)
}
/// Returns the documents in `store` whose `index` value lies in the
/// half-open range `[start, end)`, as seen from inside this transaction.
///
/// Range membership is decided on the encoded index value (byte-wise
/// comparison of `encode_index_value` output). Persisted index entries in
/// range are scanned first; for keys with a staged change the staged
/// document is re-checked against the range and staged deletes are skipped.
/// Every staged document that the index scan did not visit is then checked
/// against the range as well, so that new documents and updates that move a
/// persisted document *into* the range are both returned.
///
/// # Errors
///
/// Fails if the store or index is unknown, the engine scan reports an
/// error, or a document cannot be decoded.
pub fn range_by_index(
    &self,
    store: &str,
    index: &str,
    start: &Value,
    end: &Value,
) -> Result<Vec<Value>> {
    let store_def = self.require_store(store)?;
    let first_path = store_def
        .indexes
        .iter()
        .find(|i| i.name == index)
        .ok_or_else(|| {
            FlowError::JsonDb(format!("index '{}' not found on '{}'", index, store))
        })?
        .key_paths
        .first()
        .cloned()
        .unwrap_or_default();
    let pfx = idx_prefix(store, index);
    let enc_start = encode_index_value(start);
    let enc_end = encode_index_value(end);
    // True when the document's value at the index's first key path encodes
    // into `[enc_start, enc_end)`.
    let in_range = |doc: &Value| {
        extract_field(doc, &first_path).is_some_and(|index_val| {
            let enc = encode_index_value(&index_val);
            enc.as_slice() >= enc_start.as_slice() && enc.as_slice() < enc_end.as_slice()
        })
    };
    let range = ScanRange {
        key_start: Bound::Included([pfx.as_slice(), &enc_start].concat()),
        key_end: Bound::Excluded([pfx.as_slice(), &enc_end].concat()),
        ts_start: Bound::Unbounded,
        ts_end: Bound::Unbounded,
    };
    let mut docs: Vec<Value> = Vec::new();
    // Staged keys already handled through a persisted index entry, so the
    // second pass does not emit them again.
    let mut staged_seen: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
    for entry in self.db.engine.scan(range)? {
        let rec = entry?;
        // Index records carry the encoded primary key as their value.
        let pk = &rec.value;
        match self.writes.get(&(store.to_string(), pk.clone())) {
            Some(staged) => {
                staged_seen.insert(pk.clone());
                if let Some(bytes) = staged {
                    let staged_doc: Value = decode_doc(bytes)?;
                    // The staged version may have moved out of the range.
                    if in_range(&staged_doc) {
                        docs.push(staged_doc);
                    }
                }
            }
            None => {
                if let Some(persisted) = self.db.engine.get_bytes(&doc_key(store, pk), 0) {
                    docs.push(decode_doc(&persisted.value)?);
                }
            }
        }
    }
    for ((name, pk), staged) in &self.writes {
        if name != store || staged_seen.contains(pk) {
            continue;
        }
        // Not skipping persisted keys here is deliberate: a persisted document
        // whose old index value was outside the range has no entry in the scan
        // above, yet its staged version may now fall inside the range.
        if let Some(bytes) = staged {
            let staged_doc: Value = decode_doc(bytes)?;
            if in_range(&staged_doc) {
                docs.push(staged_doc);
            }
        }
    }
    Ok(docs)
}
/// Stages `doc` in an auto-increment `store` under a freshly assigned id and
/// returns that id as a JSON number.
///
/// The first call per store asks `prepare_counter` for an id and queues the
/// counter record it returns; later calls in the same transaction hand out
/// the previous id plus one. When `doc` is a JSON object the id is written
/// into it under the store's `key_path`; other JSON values are stored
/// unchanged.
///
/// # Errors
///
/// Fails if the transaction is read-only, the store is unknown or not
/// auto-increment, `prepare_counter` fails, or the document cannot be encoded.
pub fn put_auto(&mut self, store: &str, mut doc: Value) -> Result<Value> {
    self.require_read_write()?;
    let def = self.require_store(store)?;
    if !def.auto_increment {
        return Err(FlowError::JsonDb(format!(
            "store '{}' is not auto-increment",
            store
        )));
    }
    let next_id = if let Some(last) = self.next_ids.get_mut(store) {
        // NOTE(review): the counter record queued on the first call is not
        // refreshed here. If it persists only that first id, the stored
        // counter lags behind after several `put_auto` calls in one
        // transaction; confirm against `prepare_counter`.
        *last += 1;
        *last
    } else {
        let (id, counter_rec) = prepare_counter(&self.db.engine, store)?;
        self.counter_updates.push(counter_rec);
        self.next_ids.insert(store.to_string(), id);
        id
    };
    let key_val = Value::Number(next_id.into());
    if let Value::Object(fields) = &mut doc {
        fields.insert(def.key_path.clone(), key_val.clone());
    }
    // NOTE(review): the key bytes are the decimal text of the id rather than
    // the output of `encode_primary_key`, which every other method uses;
    // verify both encodings agree for numeric keys.
    let write_key = (store.to_string(), next_id.to_string().into_bytes());
    let encoded_doc = encode_doc(&doc)?;
    self.writes.insert(write_key, Some(encoded_doc));
    Ok(key_val)
}
/// Validates all staged changes and writes them to the engine in one batch.
///
/// The batch starts with the queued counter records. For every staged key
/// it then contains: deletes for the index entries of the currently
/// persisted document (if any), and either the new document plus its index
/// entries or a delete of the document. Nothing is written if validation
/// fails, because the engine is only called once, after the loop.
///
/// Unique indexes are checked against persisted index entries and against
/// the other documents staged in this transaction. Persisted entries that
/// belong to a key staged in this transaction are ignored, since this commit
/// removes or rewrites them; their post-commit values are covered by the
/// staged-document check.
///
/// # Errors
///
/// Fails if a staged store no longer exists in the schema, a staged document
/// cannot be decoded, a unique index would be violated, or the engine scan
/// or write reports an error.
pub fn commit(mut self) -> Result<()> {
    if self.committed {
        return Ok(());
    }
    let mut records = Vec::new();
    records.append(&mut self.counter_updates);
    for ((store_name, key_bytes), doc_opt) in &self.writes {
        let def =
            self.db.schema.get(store_name).ok_or_else(|| {
                FlowError::JsonDb(format!("store '{}' not found", store_name))
            })?;
        // Best effort: a persisted document that fails to decode is treated
        // as absent, so its old index entries are not cleaned up.
        let old_doc = self
            .db
            .engine
            .get_bytes(&doc_key(store_name, key_bytes), 0)
            .and_then(|r| decode_doc(&r.value).ok());
        if let Some(ref old_doc_val) = old_doc {
            // Drop the index entries that pointed at the previous version.
            for idx in &def.indexes {
                let old_values = extract_index_values(old_doc_val, idx);
                for vals in old_values {
                    let encoded = encode_composite_value(&vals);
                    records.push(InternalRecord::delete(
                        idx_key(store_name, &idx.name, &encoded, key_bytes),
                        0,
                        0,
                    ));
                }
            }
        }
        match doc_opt {
            Some(doc_bytes) => {
                records.push(InternalRecord::from_record(
                    &Record::new(doc_key(store_name, key_bytes), 0, doc_bytes.clone()),
                    0,
                ));
                let new_doc = decode_doc(doc_bytes)?;
                for idx in &def.indexes {
                    let new_values = extract_index_values(&new_doc, idx);
                    if idx.unique {
                        for vals in &new_values {
                            let encoded = encode_composite_value(vals);
                            let val_pfx = idx_value_prefix(store_name, &idx.name, &encoded);
                            let iter = self.db.engine.scan(prefix_range(&val_pfx))?;
                            for r in iter {
                                let rec = r?;
                                // The document's own previous entry is not a conflict.
                                if rec.value.as_slice() == key_bytes.as_slice() {
                                    continue;
                                }
                                // An entry owned by another key that is also staged in
                                // this transaction is removed or rewritten by this very
                                // commit, so it must not count as a conflict here.
                                if self
                                    .writes
                                    .contains_key(&(store_name.clone(), rec.value.clone()))
                                {
                                    continue;
                                }
                                return Err(FlowError::JsonDb(format!(
                                    "unique constraint violation: index '{}' value '{:?}' already exists",
                                    idx.name, vals
                                )));
                            }
                            // Conflicts between documents staged in this transaction.
                            for ((other_store, other_key), other_doc) in &self.writes {
                                if other_store != store_name {
                                    continue;
                                }
                                if other_key == key_bytes {
                                    continue;
                                }
                                if let Some(other_bytes) = other_doc {
                                    let other_doc_val = decode_doc(other_bytes)?;
                                    let other_vals = extract_index_values(&other_doc_val, idx);
                                    for ov in other_vals {
                                        if encode_composite_value(&ov) == encoded {
                                            return Err(FlowError::JsonDb(format!(
                                                "unique constraint violation in transaction: index '{}' value '{:?}'",
                                                idx.name, vals
                                            )));
                                        }
                                    }
                                }
                            }
                        }
                    }
                    // Index records map (index value, primary key) -> primary key.
                    for vals in &new_values {
                        let encoded = encode_composite_value(vals);
                        records.push(InternalRecord::from_record(
                            &Record::new(
                                idx_key(store_name, &idx.name, &encoded, key_bytes),
                                0,
                                key_bytes.clone(),
                            ),
                            0,
                        ));
                    }
                }
            }
            None => {
                records.push(InternalRecord::delete(doc_key(store_name, key_bytes), 0, 0));
            }
        }
    }
    if !records.is_empty() {
        self.db.engine.write_internal(records)?;
    }
    self.committed = true;
    Ok(())
}
/// Discards the transaction without writing anything.
///
/// The method consumes `self` and has an empty body, so all staged writes
/// and queued counter records are simply dropped.
pub fn abort(self) {
}
/// Guard used by every write method: rejects read-only transactions.
///
/// # Errors
///
/// Returns a `FlowError::JsonDb` when the mode is `TransactionMode::ReadOnly`.
fn require_read_write(&self) -> Result<()> {
    if self.mode != TransactionMode::ReadOnly {
        return Ok(());
    }
    Err(FlowError::JsonDb(
        "cannot write in a read-only transaction".into(),
    ))
}
/// Fetches the definition of store `name` from the database schema.
///
/// # Errors
///
/// Returns a `FlowError::JsonDb` when the schema has no store with that name.
fn require_store(&self, name: &str) -> Result<StoreDef> {
    match self.db.schema.get(name) {
        Some(def) => Ok(def),
        None => Err(FlowError::JsonDb(format!("store '{}' not found", name))),
    }
}
}
impl<'db> Drop for Transaction<'db> {
fn drop(&mut self) {
if !self.committed && self.mode == TransactionMode::ReadWrite {
}
}
}