use dashmap::{DashMap, DashSet};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::debug;
use super::types::{DbError, LogEntry};
use super::{indexing, StorageBackend};
/// Current wall-clock time as an ISO-8601 / RFC 3339 UTC timestamp,
/// e.g. `"2024-05-01T12:34:56Z"`.
///
/// Uses `web_time` (a `std::time` shim that also works on WebAssembly)
/// to obtain seconds since the Unix epoch; a clock before the epoch
/// degrades to the epoch itself via `unwrap_or_default`.
fn now_iso() -> String {
    use web_time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    epoch_secs_to_iso(secs)
}

/// Convert seconds since the Unix epoch into `"YYYY-MM-DDTHH:MM:SSZ"`.
///
/// Implements the Gregorian calendar by hand (walking forward from 1970,
/// consuming whole years, then whole months) to avoid a date-time
/// dependency. Extracted from `now_iso` so the conversion is a pure,
/// independently testable function.
fn epoch_secs_to_iso(secs: u64) -> String {
    let (sec, min, hour) = (secs % 60, (secs / 60) % 60, (secs / 3600) % 24);
    let mut days = secs / 86400;

    // Consume whole years starting at the epoch year.
    let mut year = 1970u64;
    loop {
        let year_len = if is_leap_year(year) { 366 } else { 365 };
        if days < year_len {
            break;
        }
        days -= year_len;
        year += 1;
    }

    // Consume whole months within the final year.
    let month_lengths: [u64; 12] = [
        31,
        if is_leap_year(year) { 29 } else { 28 },
        31, 30, 31, 30, 31, 31, 30, 31, 30, 31,
    ];
    let mut month = 1u64;
    for &len in &month_lengths {
        if days < len {
            break;
        }
        days -= len;
        month += 1;
    }

    // `days` is now the zero-based day within the month.
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year,
        month,
        days + 1,
        hour,
        min,
        sec
    )
}

/// Gregorian leap-year rule: divisible by 4, except centuries not
/// divisible by 400.
fn is_leap_year(year: u64) -> bool {
    (year % 4 == 0 && year % 100 != 0) || year % 400 == 0
}
/// Fetch a single document from `collection` by `key`.
///
/// Returns `None` when the collection or key is unknown, or when a `Cold`
/// document cannot be re-read/parsed from the storage log. `Hot` documents
/// are served straight from memory as a clone.
pub fn get(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    storage: &Arc<dyn StorageBackend>,
    collection: &str,
    key: &str,
) -> Option<Value> {
    let documents = state.get(collection)?;
    let slot = documents.get(key)?;
    match slot.value() {
        // In-memory value: return a copy directly.
        crate::engine::types::DocumentState::Hot(value) => Some(value.clone()),
        // Evicted value: hydrate it from the on-disk log entry; any
        // read or deserialize failure reads as "not found".
        crate::engine::types::DocumentState::Cold(ptr) => storage
            .read_at(ptr.offset, ptr.length)
            .ok()
            .and_then(|bytes| {
                serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes).ok()
            })
            .map(|entry| entry.value),
    }
}
/// Fetch every document in `collection` as a `key -> value` map.
///
/// An unknown collection reads as an empty map. `Hot` documents are cloned
/// from memory; `Cold` documents are hydrated from the storage log, and any
/// entry that fails to read or parse is silently skipped.
pub fn get_all(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    storage: &Arc<dyn StorageBackend>,
    collection: &str,
) -> HashMap<String, Value> {
    let documents = match state.get(collection) {
        Some(map) => map,
        None => return HashMap::new(),
    };
    documents
        .iter()
        .filter_map(|entry| {
            let value = match entry.value() {
                crate::engine::types::DocumentState::Hot(v) => v.clone(),
                crate::engine::types::DocumentState::Cold(ptr) => {
                    // Unreadable/corrupt cold entries are dropped from the result.
                    let bytes = storage.read_at(ptr.offset, ptr.length).ok()?;
                    serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes)
                        .ok()?
                        .value
                }
            };
            Some((entry.key().clone(), value))
        })
        .collect()
}
/// Fetch a specific set of `keys` from `collection`.
///
/// Only keys that exist (and, for `Cold` documents, can be successfully
/// re-read from the storage log) appear in the returned map; missing or
/// unreadable keys are omitted rather than reported as errors.
pub fn get_batch(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    storage: &Arc<dyn StorageBackend>,
    collection: &str,
    keys: Vec<String>,
) -> HashMap<String, Value> {
    let mut found = HashMap::with_capacity(keys.len());
    let documents = match state.get(collection) {
        Some(map) => map,
        None => return found,
    };
    for key in keys {
        let resolved = documents.get(&key).and_then(|slot| match slot.value() {
            crate::engine::types::DocumentState::Hot(v) => Some(v.clone()),
            crate::engine::types::DocumentState::Cold(ptr) => storage
                .read_at(ptr.offset, ptr.length)
                .ok()
                .and_then(|bytes| {
                    serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes).ok()
                })
                .map(|entry| entry.value),
        });
        if let Some(value) = resolved {
            found.insert(key, value);
        }
    }
    found
}
/// Logged batch insert/upsert into `collection`.
///
/// For each `(key, value)` pair:
/// * any existing document is materialized (cloned if `Hot`, re-read from
///   the storage log if `Cold`) for concurrency and index bookkeeping;
/// * if the incoming value carries `_v` and it is not strictly greater than
///   the stored `_v`, the whole call fails with `DbError::Conflict`;
/// * `_v`, `createdAt` and `modifiedAt` are stamped onto the value
///   (`createdAt` is preserved from the existing document on update,
///   `_v` defaults to 1 on first insert);
/// * the document is stored `Hot`, re-indexed, appended to the log as an
///   "INSERT" entry, and a "change" event is broadcast to subscribers.
///
/// The batch is framed by TX_BEGIN / TX_COMMIT log entries keyed by a fresh
/// UUID.
///
/// NOTE(review): an early `Err` return (version conflict, schema failure, or
/// a log-write error) leaves previously processed items applied to the
/// in-memory state and logged without a matching TX_COMMIT. Presumably the
/// recovery path discards the uncommitted tail, but the in-memory state is
/// not rolled back here — confirm against the recovery/replay code.
pub fn insert_batch(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, Arc<(Value, jsonschema::Validator)>>,
    collection: &str,
    items: Vec<(String, Value)>,
) -> Result<(), DbError> {
    // Create the collection map on first use.
    let col = state
        .entry(collection.to_string())
        .or_insert_with(DashMap::new);
    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;
    for (key, mut value) in items {
        let now = now_iso();
        // Materialize any existing document. Cold read/parse failures are
        // treated as "no existing document".
        let mut existing_val = None;
        if let Some(doc_state) = col.get(&key) {
            match doc_state.value() {
                crate::engine::types::DocumentState::Hot(v) => {
                    existing_val = Some(v.clone());
                }
                crate::engine::types::DocumentState::Cold(ptr) => {
                    if let Ok(bytes) = storage.read_at(ptr.offset, ptr.length) {
                        if let Ok(entry) = serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
                            existing_val = Some(entry.value);
                        }
                    }
                }
            }
        }
        if let Some(existing) = existing_val {
            // Optimistic concurrency: a client-supplied `_v` must be
            // strictly newer than the stored version or the batch aborts.
            let existing_v = existing.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
            let incoming_v = value.get("_v").and_then(|v| v.as_u64());
            if let Some(iv) = incoming_v {
                if iv <= existing_v {
                    debug!("⚡ Conflict error: {}/{} incoming _v={} <= stored _v={}", collection, key, iv, existing_v);
                    return Err(DbError::Conflict);
                }
            }
            // Preserve the original creation timestamp; bump the version and
            // refresh the modification timestamp.
            let orig_created = existing.get("createdAt").and_then(|v| v.as_str()).unwrap_or(&now).to_string();
            let new_v = existing_v + 1;
            if let Some(obj) = value.as_object_mut() {
                obj.insert("_v".to_string(), serde_json::json!(new_v));
                obj.insert("createdAt".to_string(), serde_json::json!(orig_created));
                obj.insert("modifiedAt".to_string(), serde_json::json!(now));
            }
            #[cfg(feature = "schema")]
            crate::engine::schema::validate_document(schemas, collection, &value)?;
            // Drop the old document's index entries before re-indexing below.
            indexing::unindex_doc(indexes, collection, &key, &existing);
        } else {
            // Fresh document: default to version 1 unless the client
            // supplied a `_v`; created/modified timestamps are identical.
            if let Some(obj) = value.as_object_mut() {
                if obj.get("_v").is_none() {
                    obj.insert("_v".to_string(), serde_json::json!(1u64));
                }
                obj.insert("createdAt".to_string(), serde_json::json!(now.clone()));
                obj.insert("modifiedAt".to_string(), serde_json::json!(now));
            }
            #[cfg(feature = "schema")]
            crate::engine::schema::validate_document(schemas, collection, &value)?;
        }
        // Publish to memory, secondary indexes, and the append-only log.
        col.insert(key.clone(), crate::engine::types::DocumentState::Hot(value.clone()));
        indexing::index_doc(indexes, collection, &key, &value);
        let entry = LogEntry::new(
            "INSERT".to_string(),
            collection.to_string(),
            key.clone(),
            value.clone(),
        );
        storage.write_entry(&entry)?;
        // Notify live subscribers; a send error (no receivers) is ignored.
        let new_v = value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
        let _ = tx.send(
            json!({
                "event": "change",
                "collection": collection,
                "key": key,
                "new_v": new_v
            })
            .to_string(),
        );
    }
    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    Ok(())
}
/// Apply a partial update (merge of top-level fields) to `collection/key`.
///
/// Returns `Ok(true)` when the document existed and was updated, `Ok(false)`
/// when the collection or key is unknown (the transaction frame is still
/// closed with TX_COMMIT in that case). If `updates` contains `_v`, it acts
/// as an optimistic-concurrency guard and must equal the stored version, or
/// `Err(DbError::Conflict)` is returned. Caller-supplied `_v`/`createdAt`
/// are never merged; the engine bumps `_v` and refreshes `modifiedAt`.
///
/// Fix over the previous revision: the old code called `unindex_doc` BEFORE
/// the `_v` guard check and before schema validation, so a conflict or
/// validation failure left the document permanently missing from the
/// in-memory indexes. The new value is now built and validated on a local
/// copy first; the shared indexes/state are only touched once every check
/// has passed.
///
/// NOTE(review): error returns after TX_BEGIN (conflict, cold-read failure,
/// schema failure) write no TX_COMMIT, matching the original behavior —
/// confirm the recovery path treats such transactions as aborted.
pub fn update(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, Arc<(Value, jsonschema::Validator)>>,
    collection: &str,
    key: &str,
    updates: Value,
) -> Result<bool, DbError> {
    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;
    if let Some(col) = state.get(collection) {
        // Materialize the current document (Hot: clone; Cold: hydrate from the log).
        let old_doc = if let Some(doc_state) = col.get(key) {
            Some(match doc_state.value() {
                crate::engine::types::DocumentState::Hot(v) => v.clone(),
                crate::engine::types::DocumentState::Cold(ptr) => {
                    let bytes = storage.read_at(ptr.offset, ptr.length)?;
                    let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
                    entry.value
                }
            })
        } else {
            None
        };
        if let Some(old_doc) = old_doc {
            // Build the updated document on a local copy so shared state
            // stays untouched until all checks below have passed.
            let mut doc = old_doc.clone();
            if let Some(update_obj) = updates.as_object() {
                let existing_v = doc.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
                // Optimistic concurrency guard: explicit `_v` must match exactly.
                if let Some(guard_v) = update_obj.get("_v").and_then(|v| v.as_u64()) {
                    if guard_v != existing_v {
                        debug!("⚡ Conflict error: {}/{} update guard _v={} != stored _v={}", collection, key, guard_v, existing_v);
                        return Err(DbError::Conflict);
                    }
                }
                if let Some(doc_obj) = doc.as_object_mut() {
                    for (k, v) in update_obj {
                        // Metadata fields are managed by the engine, not callers.
                        if k == "_v" || k == "createdAt" {
                            continue;
                        }
                        doc_obj.insert(k.clone(), v.clone());
                    }
                    doc_obj.insert("_v".to_string(), serde_json::json!(existing_v + 1));
                    doc_obj.insert("modifiedAt".to_string(), serde_json::json!(now_iso()));
                }
            }
            let new_value = doc;
            #[cfg(feature = "schema")]
            crate::engine::schema::validate_document(schemas, collection, &new_value)?;
            // All checks passed: swap index entries and publish the new state.
            indexing::unindex_doc(indexes, collection, key, &old_doc);
            indexing::index_doc(indexes, collection, key, &new_value);
            col.insert(key.to_string(), crate::engine::types::DocumentState::Hot(new_value.clone()));
            storage.write_entry(&LogEntry::new(
                "INSERT".to_string(),
                collection.to_string(),
                key.to_string(),
                new_value.clone(),
            ))?;
            storage.write_entry(&LogEntry::new(
                "TX_COMMIT".into(),
                collection.into(),
                tx_id,
                Value::Null,
            ))?;
            let new_v = new_value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
            let _ = tx.send(
                json!({
                    "event": "change",
                    "collection": collection,
                    "key": key,
                    "new_v": new_v
                })
                .to_string(),
            );
            return Ok(true);
        }
    }
    // Unknown collection/key: still close the transaction frame.
    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    Ok(false)
}
/// Delete a single document from `collection`, inside a logged transaction.
///
/// The stored value (hydrated from the log if `Cold`) is fetched first so
/// its secondary-index entries can be removed; the key is then dropped from
/// memory. A "DELETE" tombstone and TX_COMMIT are appended to the log even
/// when the collection or key did not exist, and a "change" event with
/// `new_v: null` is broadcast. Cold-read or deserialize failures propagate
/// as `Err` via `?`.
pub fn delete(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    collection: &str,
    key: &str,
) -> Result<(), DbError> {
    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;
    if let Some(col) = state.get(collection) {
        // Resolve the stored value so its index entries can be removed.
        let current = match col.get(key) {
            Some(slot) => Some(match slot.value() {
                crate::engine::types::DocumentState::Hot(v) => v.clone(),
                crate::engine::types::DocumentState::Cold(ptr) => {
                    let bytes = storage.read_at(ptr.offset, ptr.length)?;
                    let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
                    entry.value
                }
            }),
            None => None,
        };
        if let Some(value) = current {
            indexing::unindex_doc(indexes, collection, key, &value);
        }
        col.remove(key);
    }
    // Tombstone + commit are logged unconditionally, mirroring replay semantics.
    storage.write_entry(&LogEntry::new(
        "DELETE".to_string(),
        collection.to_string(),
        key.to_string(),
        json!(null),
    ))?;
    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    // Notify live subscribers; a send error (no receivers) is ignored.
    let _ = tx.send(
        json!({
            "event": "change",
            "collection": collection,
            "key": key,
            "new_v": null
        })
        .to_string(),
    );
    Ok(())
}
/// Delete a batch of keys from `collection` inside a single logged
/// transaction frame.
///
/// For each key the stored value (hydrated from the log if `Cold`) is
/// fetched so its index entries can be removed, the key is dropped from
/// memory, a "DELETE" tombstone is appended to the log, and a "change"
/// event with `new_v: null` is broadcast. If the collection does not exist,
/// no per-key work happens but TX_BEGIN/TX_COMMIT are still written.
/// Cold-read or deserialize failures propagate as `Err` via `?`.
pub fn delete_batch(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    collection: &str,
    keys: Vec<String>,
) -> Result<(), DbError> {
    // One transaction frame covers the whole batch.
    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;
    if let Some(col) = state.get(collection) {
        for key in keys {
            // Resolve the stored value so its index entries can be removed.
            let current = match col.get(&key) {
                Some(slot) => Some(match slot.value() {
                    crate::engine::types::DocumentState::Hot(v) => v.clone(),
                    crate::engine::types::DocumentState::Cold(ptr) => {
                        let bytes = storage.read_at(ptr.offset, ptr.length)?;
                        let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
                        entry.value
                    }
                }),
                None => None,
            };
            if let Some(value) = current {
                indexing::unindex_doc(indexes, collection, &key, &value);
            }
            col.remove(&key);
            // Log the tombstone, then notify subscribers (send errors ignored).
            storage.write_entry(&LogEntry::new(
                "DELETE".to_string(),
                collection.to_string(),
                key.clone(),
                json!(null),
            ))?;
            let _ = tx.send(
                json!({
                    "event": "change",
                    "collection": collection,
                    "key": key,
                    "new_v": null
                })
                .to_string(),
            );
        }
    }
    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    Ok(())
}
/// Drop an entire collection: its in-memory documents and the secondary
/// indexes whose keys are prefixed with `"<collection>:"`.
///
/// The drop is recorded as a "DROP" log entry for key `"*"` inside a
/// TX_BEGIN/TX_COMMIT frame, and a wildcard "change" event (`key: "*"`,
/// `new_v: null`) is broadcast afterwards.
pub fn delete_collection(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    collection: &str,
) -> Result<(), DbError> {
    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;
    state.remove(collection);
    // Build the "<collection>:" prefix once instead of re-allocating the
    // same String inside the closure for every index key `retain` visits
    // (the previous revision called `format!` per element).
    let prefix = format!("{}:", collection);
    indexes.retain(|k, _| !k.starts_with(&prefix));
    storage.write_entry(&LogEntry::new(
        "DROP".to_string(),
        collection.to_string(),
        "*".to_string(),
        json!(null),
    ))?;
    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    // Notify live subscribers; a send error (no receivers) is ignored.
    let _ = tx.send(
        json!({
            "event": "change",
            "collection": collection,
            "key": "*",
            "new_v": null
        })
        .to_string(),
    );
    Ok(())
}