datastack 0.4.0

A document-based acid local database.
Documentation
use redb::{Database, TableDefinition};
use serde_json::Value as JsonValue;
use std::sync::Arc;
use crate::errors::DataStackError;

// Define the table: Key is String (collection:id), Value is Vec<u8> (JSON)
const DATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("datastack_docs");

#[derive(Clone)]
pub struct DataStack {
    pub(crate) db: Arc<Database>,
}

impl DataStack {
    pub async fn new(path: &str) -> Result<Self, DataStackError> {
        let db = Database::builder().create(path)?;

        {
            let write_tx = db.begin_write()?;
            { let _ = write_tx.open_table(DATA_TABLE)?;}
            write_tx.commit()?;
        }

        Ok(Self { db: Arc::new(db) })
    }

    fn key(col: &str, id: &str) -> String {
        format!("{}:{}", col, id)
    }

    pub async fn add(&self, collection: &str, id: &str, value: &JsonValue) -> Result<(), DataStackError> {
        let bytes = serde_json::to_vec(value)?;
        let write_tx = self.db.begin_write()?;
        {
            let mut table = write_tx.open_table(DATA_TABLE)?;
            table.insert(Self::key(collection, id).as_str(), bytes.as_slice())?;
        }
        write_tx.commit()?;
        Ok(())
    }

    pub async fn get(&self, collection: &str, id: &str) -> Result<Option<JsonValue>, DataStackError> {
        let read_tx = self.db.begin_read()?;
        let table = read_tx.open_table(DATA_TABLE)?;
        let res = table.get(Self::key(collection, id).as_str())?;
        
        if let Some(access) = res {
            let json: JsonValue = serde_json::from_slice(access.value())?;
            return Ok(Some(json));
        }
        Ok(None)
    }

    pub async fn delete(&self, collection: &str, id: &str) -> Result<(), DataStackError> {
        let write_tx = self.db.begin_write()?;
        {
            let mut table = write_tx.open_table(DATA_TABLE)?;
            table.remove(Self::key(collection, id).as_str())?;
        }
        write_tx.commit()?;
        Ok(())
    }

    pub async fn batch_add(&self, collection: &str, items: &JsonValue) -> Result<(), DataStackError> {
        if let JsonValue::Object(map) = items {
            let write_tx = self.db.begin_write()?;
            {
                let mut table = write_tx.open_table(DATA_TABLE)?;
                for (id, val) in map {
                    let bytes = serde_json::to_vec(val)?;
                    table.insert(Self::key(collection, id).as_str(), bytes.as_slice())?;
                }
            }
            write_tx.commit()?;
        }
        Ok(())
    }

 pub async fn batch_get(
    &self,
    collection: &str,
    ids: &JsonValue,
) -> Result<JsonValue, DataStackError> {
    let JsonValue::Array(list) = ids else {
        return Ok(JsonValue::Object(Default::default()));
    };

    let read_tx = self.db.begin_read()?;
    let table = read_tx.open_table(DATA_TABLE)?;
    let mut results = serde_json::Map::new();

    for id in list {
        if let Some(id_str) = id.as_str() {
            let key = Self::key(collection, id_str);
            if let Some(access) = table.get(key.as_str())? {
                let val: JsonValue = serde_json::from_slice(access.value())?;
                results.insert(id_str.to_string(), val);
            }
        }
    }

    Ok(JsonValue::Object(results))
}


pub async fn batch_delete(
    &self,
    collection: &str,
    ids: &JsonValue,
) -> Result<(), DataStackError> {
    let JsonValue::Array(list) = ids else {
        return Ok(());
    };

    let write_tx = self.db.begin_write()?;
    {
        let mut table = write_tx.open_table(DATA_TABLE)?;

        for id in list {
            if let Some(id_str) = id.as_str() {
                let key = Self::key(collection, id_str);
                table.remove(key.as_str())?;
            }
        }
    }
    write_tx.commit()?;
    Ok(())
}


    pub async fn update(&self, collection: &str, id: &str, fields: &JsonValue) -> Result<(), DataStackError> {
        let mut data = self.get(collection, id).await?.unwrap_or(serde_json::json!({}));
        
        if let JsonValue::Object(ref mut obj) = data {
            if let JsonValue::Object(new_fields) = fields {
                for (k, v) in new_fields {
                    if let Some(op) = v.get("__op") {
                        match op.as_str().unwrap_or("") {
                            "inc" => {
                                let amount = v["amount"].as_f64().unwrap_or(0.0);
                                let current = crate::utils::get_deep(obj, k).and_then(|v| v.as_f64()).unwrap_or(0.0);
                                crate::utils::set_deep(obj, k, serde_json::json!(current + amount));
                            }
                            "remove" => { crate::utils::remove_deep(obj, k); }
                            "array_union" => {
                                let mut existing = crate::utils::get_deep(obj, k).and_then(|v| v.as_array().cloned()).unwrap_or_default();
                                if let Some(new_vals) = v["values"].as_array() {
                                    for nv in new_vals {
                                        if !existing.contains(nv) { existing.push(nv.clone()); }
                                    }
                                }
                                crate::utils::set_deep(obj, k, JsonValue::Array(existing));
                            }
                            _ => { crate::utils::set_deep(obj, k, v.clone()); }
                        }
                    } else {
                        crate::utils::set_deep(obj, k, v.clone());
                    }
                }
            }
        }
        self.add(collection, id, &data).await?;
        Ok(())
    }

    pub async fn scan(&self, collection: &str, limit: u32, cursor: &str, order: &str) -> Result<JsonValue, DataStackError> {
    let prefix = format!("{}:", collection);
    let read_tx = self.db.begin_read()?;
    let table = read_tx.open_table(DATA_TABLE)?;
    
    let mut out = serde_json::Map::new();
    let mut count = 0;

    if order == "d" {
        // For descending, we want everything from the prefix start up to the cursor
        let range_start = prefix.clone();
        let range_end = if cursor.is_empty() { 
            format!("{}:\u{10FFFF}", collection) 
        } else { 
            format!("{}:{}", collection, cursor) 
        };

        // Use range and then reverse it
        let iter = table.range(range_start.as_str()..=range_end.as_str())?.rev();
        for item in iter {
            if count >= limit { break; }
            let (k, v) = item?;
            let key_str = k.value();
            let doc_id = key_str.strip_prefix(&prefix).unwrap_or(key_str).to_string();
            out.insert(doc_id, serde_json::from_slice(v.value())?);
            count += 1;
        }
    } else {
        // Forward scan (ascending)
        let range_start = if cursor.is_empty() { prefix.clone() } else { format!("{}:{}", collection, cursor) };
        let range_end = format!("{}:\u{10FFFF}", collection);

        let iter = table.range(range_start.as_str()..range_end.as_str())?;
        for item in iter {
            if count >= limit { break; }
            let (k, v) = item?;
            let key_str = k.value();
            if !key_str.starts_with(&prefix) { break; }
            let doc_id = key_str.strip_prefix(&prefix).unwrap_or(key_str).to_string();
            out.insert(doc_id, serde_json::from_slice(v.value())?);
            count += 1;
        }
    }

    Ok(JsonValue::Object(out))
}
}