use redb::{Database, TableDefinition};
use serde_json::Value as JsonValue;
use std::sync::Arc;
use crate::errors::DataStackError;
/// The single redb table that holds every document: string keys (built by
/// `DataStack::key` as `collection:id`) mapped to serialized JSON bytes.
const DATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("datastack_docs");
/// Handle to a redb-backed JSON document store.
///
/// The database is held behind an `Arc`, so cloning this handle produces
/// another reference to the same `Database` rather than a second database.
#[derive(Clone)]
pub struct DataStack {
/// Shared database handle used by every operation on this store.
pub(crate) db: Arc<Database>,
}
impl DataStack {
/// Opens the database at `path` through `Database::builder().create` and
/// makes sure the document table can be opened before handing back the store.
///
/// # Errors
/// Propagates any failure from creating the database, starting the bootstrap
/// write transaction, opening the table, or committing.
pub async fn new(path: &str) -> Result<Self, DataStackError> {
    let database = Database::builder().create(path)?;

    // Open the table once inside a committed write transaction so that later
    // read transactions can open it. The table handle is dropped right away
    // because the transaction cannot be committed while it is still borrowed.
    let bootstrap = database.begin_write()?;
    drop(bootstrap.open_table(DATA_TABLE)?);
    bootstrap.commit()?;

    Ok(Self {
        db: Arc::new(database),
    })
}
/// Builds the storage key for a document: the collection name and the
/// document id separated by a single `:`.
///
/// No escaping is applied, so a `:` inside `col` or `id` is kept verbatim.
fn key(col: &str, id: &str) -> String {
    [col, id].join(":")
}
/// Serializes `value` to JSON bytes and inserts it under the key for
/// (`collection`, `id`) in its own committed write transaction.
///
/// # Errors
/// Returns an error if serialization fails or if any redb step
/// (begin, open table, insert, commit) fails.
pub async fn add(&self, collection: &str, id: &str, value: &JsonValue) -> Result<(), DataStackError> {
    let payload = serde_json::to_vec(value)?;
    let doc_key = Self::key(collection, id);

    let txn = self.db.begin_write()?;
    {
        // The table handle borrows the transaction; keep it in its own scope
        // so it is released before the commit.
        let mut docs = txn.open_table(DATA_TABLE)?;
        docs.insert(doc_key.as_str(), payload.as_slice())?;
    }
    txn.commit()?;
    Ok(())
}
/// Looks up the document stored for (`collection`, `id`).
///
/// Returns `Ok(Some(json))` when an entry exists and parses as JSON, and
/// `Ok(None)` when the key is absent.
///
/// # Errors
/// Returns an error if the read transaction, table open, or lookup fails, or
/// if the stored bytes are not valid JSON.
pub async fn get(&self, collection: &str, id: &str) -> Result<Option<JsonValue>, DataStackError> {
    let doc_key = Self::key(collection, id);
    let txn = self.db.begin_read()?;
    let docs = txn.open_table(DATA_TABLE)?;

    let found = docs.get(doc_key.as_str())?;
    let parsed = found
        .map(|guard| serde_json::from_slice::<JsonValue>(guard.value()))
        .transpose()?;
    Ok(parsed)
}
/// Removes the entry stored for (`collection`, `id`) in its own committed
/// write transaction. The result of the removal itself is not inspected, so
/// a missing key is not reported as an error by this method.
///
/// # Errors
/// Returns an error if any redb step (begin, open table, remove, commit) fails.
pub async fn delete(&self, collection: &str, id: &str) -> Result<(), DataStackError> {
    let doc_key = Self::key(collection, id);

    let txn = self.db.begin_write()?;
    {
        // Scope the table handle so it is dropped before the commit.
        let mut docs = txn.open_table(DATA_TABLE)?;
        docs.remove(doc_key.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// Inserts several documents into `collection` within one write transaction.
///
/// `items` is expected to be a JSON object mapping document id to document
/// value. Any other JSON value is ignored and the call returns `Ok(())`
/// without opening a transaction.
///
/// # Errors
/// Returns an error if a value fails to serialize or if any redb step fails;
/// in that case the function returns before the commit is reached.
pub async fn batch_add(&self, collection: &str, items: &JsonValue) -> Result<(), DataStackError> {
    let JsonValue::Object(entries) = items else {
        return Ok(());
    };

    let txn = self.db.begin_write()?;
    {
        let mut docs = txn.open_table(DATA_TABLE)?;
        for (doc_id, doc) in entries.iter() {
            let payload = serde_json::to_vec(doc)?;
            let doc_key = Self::key(collection, doc_id);
            docs.insert(doc_key.as_str(), payload.as_slice())?;
        }
    }
    txn.commit()?;
    Ok(())
}
/// Fetches several documents from `collection` using one read transaction.
///
/// `ids` is expected to be a JSON array of string ids. Non-string elements
/// are skipped, ids with no stored entry are omitted from the result, and a
/// non-array `ids` yields an empty JSON object. The result is a JSON object
/// mapping each found id to its document.
///
/// # Errors
/// Returns an error if any redb step fails or if stored bytes are not valid JSON.
pub async fn batch_get(
    &self,
    collection: &str,
    ids: &JsonValue,
) -> Result<JsonValue, DataStackError> {
    let JsonValue::Array(requested) = ids else {
        return Ok(JsonValue::Object(serde_json::Map::new()));
    };

    let txn = self.db.begin_read()?;
    let docs = txn.open_table(DATA_TABLE)?;

    let mut found = serde_json::Map::new();
    for doc_id in requested.iter().filter_map(JsonValue::as_str) {
        let doc_key = Self::key(collection, doc_id);
        let Some(guard) = docs.get(doc_key.as_str())? else {
            continue;
        };
        let doc: JsonValue = serde_json::from_slice(guard.value())?;
        found.insert(doc_id.to_owned(), doc);
    }
    Ok(JsonValue::Object(found))
}
/// Removes several documents from `collection` within one write transaction.
///
/// `ids` is expected to be a JSON array of string ids. Non-string elements
/// are skipped, and a non-array `ids` makes the call return `Ok(())` without
/// opening a transaction.
///
/// # Errors
/// Returns an error if any redb step (begin, open table, remove, commit) fails.
pub async fn batch_delete(
    &self,
    collection: &str,
    ids: &JsonValue,
) -> Result<(), DataStackError> {
    let JsonValue::Array(targets) = ids else {
        return Ok(());
    };

    let txn = self.db.begin_write()?;
    {
        let mut docs = txn.open_table(DATA_TABLE)?;
        for doc_id in targets.iter().filter_map(JsonValue::as_str) {
            let doc_key = Self::key(collection, doc_id);
            docs.remove(doc_key.as_str())?;
        }
    }
    txn.commit()?;
    Ok(())
}
pub async fn update(&self, collection: &str, id: &str, fields: &JsonValue) -> Result<(), DataStackError> {
let mut data = self.get(collection, id).await?.unwrap_or(serde_json::json!({}));
if let JsonValue::Object(ref mut obj) = data {
if let JsonValue::Object(new_fields) = fields {
for (k, v) in new_fields {
if let Some(op) = v.get("__op") {
match op.as_str().unwrap_or("") {
"inc" => {
let amount = v["amount"].as_f64().unwrap_or(0.0);
let current = crate::utils::get_deep(obj, k).and_then(|v| v.as_f64()).unwrap_or(0.0);
crate::utils::set_deep(obj, k, serde_json::json!(current + amount));
}
"remove" => { crate::utils::remove_deep(obj, k); }
"array_union" => {
let mut existing = crate::utils::get_deep(obj, k).and_then(|v| v.as_array().cloned()).unwrap_or_default();
if let Some(new_vals) = v["values"].as_array() {
for nv in new_vals {
if !existing.contains(nv) { existing.push(nv.clone()); }
}
}
crate::utils::set_deep(obj, k, JsonValue::Array(existing));
}
_ => { crate::utils::set_deep(obj, k, v.clone()); }
}
} else {
crate::utils::set_deep(obj, k, v.clone());
}
}
}
}
self.add(collection, id, &data).await?;
Ok(())
}
/// Returns up to `limit` documents of `collection` as a JSON object mapping
/// document id to document.
///
/// * `order == "d"` walks the keys from the highest id downwards; any other
///   value walks them upwards.
/// * `cursor`, when non-empty, is the id at which the walk starts and is
///   **inclusive** in both directions. An empty cursor starts from the
///   respective end of the collection.
///
/// The key range is bounded by `"<collection>:"` (inclusive) and
/// `"<collection>;"` (exclusive). `;` is the byte immediately after `:`, so
/// this range contains exactly the keys that start with the collection
/// prefix, whatever characters the ids use.
///
/// NOTE(review): the result is a `serde_json::Map`; unless serde_json's
/// `preserve_order` feature is enabled, that map is sorted by key, so `order`
/// would only affect which documents are selected, not their order in the
/// returned object. Confirm against the crate's feature flags.
///
/// # Errors
/// Returns an error if any redb step fails or if a stored value is not valid JSON.
pub async fn scan(&self, collection: &str, limit: u32, cursor: &str, order: &str) -> Result<JsonValue, DataStackError> {
    // Strips the collection prefix from a raw key and parses the stored bytes.
    fn decode_entry(prefix: &str, raw_key: &str, bytes: &[u8]) -> Result<(String, JsonValue), DataStackError> {
        let doc_id = raw_key.strip_prefix(prefix).unwrap_or(raw_key).to_string();
        Ok((doc_id, serde_json::from_slice(bytes)?))
    }

    let prefix = format!("{}:", collection);
    // Smallest string greater than every key that starts with `prefix`.
    let past_prefix = format!("{};", collection);
    // Equals `prefix` when the cursor is empty.
    let cursor_key = format!("{}{}", prefix, cursor);
    // Widening conversion: `u32` always fits in `usize` on 32/64-bit targets.
    let page_size = limit as usize;

    let read_tx = self.db.begin_read()?;
    let table = read_tx.open_table(DATA_TABLE)?;
    let mut out = serde_json::Map::new();

    if order == "d" {
        let range = if cursor.is_empty() {
            table.range(prefix.as_str()..past_prefix.as_str())?
        } else {
            table.range(prefix.as_str()..=cursor_key.as_str())?
        };
        for item in range.rev().take(page_size) {
            let (k, v) = item?;
            let (doc_id, doc) = decode_entry(&prefix, k.value(), v.value())?;
            out.insert(doc_id, doc);
        }
    } else {
        let range = table.range(cursor_key.as_str()..past_prefix.as_str())?;
        for item in range.take(page_size) {
            let (k, v) = item?;
            let (doc_id, doc) = decode_entry(&prefix, k.value(), v.value())?;
            out.insert(doc_id, doc);
        }
    }
    Ok(JsonValue::Object(out))
}
}