use dashmap::{DashMap, DashSet};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::debug;
use super::types::{DbError, LogEntry};
use super::{indexing, StorageBackend};
fn now_iso() -> String {
use web_time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
let (s, m, h) = (secs % 60, (secs / 60) % 60, (secs / 3600) % 24);
let mut d = secs / 86400; let mut y = 1970u64;
loop { let dy = if (y%4==0 && y%100!=0)||y%400==0{366}else{365}; if d<dy{break;} d-=dy; y+=1; }
let lp = (y%4==0&&y%100!=0)||y%400==0;
let md:[u64;12]=[31,if lp{29}else{28},31,30,31,30,31,31,30,31,30,31];
let mut mo=1u64; for &x in &md{if d<x{break;} d-=x; mo+=1;}
format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",y,mo,d+1,h,m,s)
}
pub fn get(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
storage: &Arc<dyn StorageBackend>,
collection: &str,
key: &str,
) -> Option<Value> {
let col = state.get(collection)?;
let doc_state = col.get(key)?;
match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => Some(v.clone()),
crate::engine::types::DocumentState::Cold(ptr) => {
if let Ok(bytes) = storage.read_at(ptr.offset, ptr.length) {
if let Ok(entry) = serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
return Some(entry.value);
}
}
None
}
}
}
pub fn get_all(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
storage: &Arc<dyn StorageBackend>,
collection: &str,
) -> HashMap<String, Value> {
let mut results = HashMap::new();
if let Some(col) = state.get(collection) {
for entry in col.iter() {
let key = entry.key();
match entry.value() {
crate::engine::types::DocumentState::Hot(v) => {
results.insert(key.clone(), v.clone());
}
crate::engine::types::DocumentState::Cold(ptr) => {
if let Ok(bytes) = storage.read_at(ptr.offset, ptr.length) {
if let Ok(log_entry) = serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
results.insert(key.clone(), log_entry.value);
}
}
}
}
}
}
results
}
pub fn get_batch(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
storage: &Arc<dyn StorageBackend>,
collection: &str,
keys: Vec<String>,
) -> HashMap<String, Value> {
let mut results = HashMap::new();
if let Some(col) = state.get(collection) {
for key in keys {
if let Some(entry) = col.get(&key) {
match entry.value() {
crate::engine::types::DocumentState::Hot(v) => {
results.insert(key, v.clone());
}
crate::engine::types::DocumentState::Cold(ptr) => {
if let Ok(bytes) = storage.read_at(ptr.offset, ptr.length) {
if let Ok(log_entry) = serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
results.insert(key, log_entry.value);
}
}
}
}
}
}
}
results
}
pub fn insert_batch(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
items: Vec<(String, Value)>,
) -> Result<(), DbError> {
let col = state
.entry(collection.to_string())
.or_insert_with(DashMap::new);
for (key, mut value) in items {
let now = now_iso();
let mut existing_val = None;
if let Some(doc_state) = col.get(&key) {
match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => {
existing_val = Some(v.clone());
}
crate::engine::types::DocumentState::Cold(ptr) => {
if let Ok(bytes) = storage.read_at(ptr.offset, ptr.length) {
if let Ok(entry) = serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
existing_val = Some(entry.value);
}
}
}
}
}
if let Some(existing) = existing_val {
let existing_v = existing.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
let incoming_v = value.get("_v").and_then(|v| v.as_u64());
if let Some(iv) = incoming_v {
if iv <= existing_v {
debug!("⚡ Conflict skip: {}/{} incoming _v={} <= stored _v={}", collection, key, iv, existing_v);
continue;
}
}
let orig_created = existing.get("createdAt").and_then(|v| v.as_str()).unwrap_or(&now).to_string();
let new_v = existing_v + 1;
if let Some(obj) = value.as_object_mut() {
obj.insert("_v".to_string(), serde_json::json!(new_v));
obj.insert("createdAt".to_string(), serde_json::json!(orig_created));
obj.insert("modifiedAt".to_string(), serde_json::json!(now));
}
indexing::unindex_doc(indexes, collection, &key, &existing);
} else {
if let Some(obj) = value.as_object_mut() {
if obj.get("_v").is_none() {
obj.insert("_v".to_string(), serde_json::json!(1u64));
}
obj.insert("createdAt".to_string(), serde_json::json!(now.clone()));
obj.insert("modifiedAt".to_string(), serde_json::json!(now));
}
}
col.insert(key.clone(), crate::engine::types::DocumentState::Hot(value.clone()));
indexing::index_doc(indexes, collection, &key, &value);
let entry = LogEntry {
cmd: "INSERT".to_string(),
collection: collection.to_string(),
key: key.clone(),
value: value.clone(),
};
storage.write_entry(&entry)?;
let new_v = value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
let _ = tx.send(
json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": new_v
})
.to_string(),
);
}
Ok(())
}
pub fn update(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
key: &str,
updates: Value, ) -> Result<bool, DbError> {
if let Some(col) = state.get(collection) {
if let Some(doc_state) = col.get(key) {
let mut doc = match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
entry.value
}
};
indexing::unindex_doc(indexes, collection, key, &doc);
if let Some(update_obj) = updates.as_object() {
if let Some(doc_obj) = doc.as_object_mut() {
for (k, v) in update_obj {
if k == "_v" || k == "createdAt" { continue; }
doc_obj.insert(k.clone(), v.clone());
}
let old_v = doc_obj.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
doc_obj.insert("_v".to_string(), serde_json::json!(old_v + 1));
doc_obj.insert("modifiedAt".to_string(), serde_json::json!(now_iso()));
}
}
let new_value = doc.clone();
indexing::index_doc(indexes, collection, key, &new_value);
col.insert(key.to_string(), crate::engine::types::DocumentState::Hot(new_value.clone()));
let entry = LogEntry {
cmd: "INSERT".to_string(),
collection: collection.to_string(),
key: key.to_string(),
value: new_value.clone(),
};
storage.write_entry(&entry)?;
let new_v = new_value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
let _ = tx.send(
json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": new_v
})
.to_string(),
);
return Ok(true); }
}
Ok(false) }
pub fn delete(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
key: &str,
) -> Result<(), DbError> {
if let Some(col) = state.get(collection) {
if let Some(doc_state) = col.get(key) {
let val = match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
entry.value
}
};
indexing::unindex_doc(indexes, collection, key, &val);
}
col.remove(key);
}
let entry = LogEntry {
cmd: "DELETE".to_string(),
collection: collection.to_string(),
key: key.to_string(),
value: json!(null),
};
storage.write_entry(&entry)?;
let _ = tx.send(
json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": null
})
.to_string(),
);
Ok(())
}
pub fn delete_batch(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
keys: Vec<String>,
) -> Result<(), DbError> {
if let Some(col) = state.get(collection) {
for key in keys {
if let Some(doc_state) = col.get(&key) {
let val = match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
entry.value
}
};
indexing::unindex_doc(indexes, collection, &key, &val);
}
col.remove(&key);
let entry = LogEntry {
cmd: "DELETE".to_string(),
collection: collection.to_string(),
key: key.clone(),
value: json!(null),
};
storage.write_entry(&entry)?;
let event = json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": null
})
.to_string();
let _ = tx.send(event);
}
}
Ok(())
}
pub fn delete_collection(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
) -> Result<(), DbError> {
state.remove(collection);
indexes.retain(|k, _| !k.starts_with(&format!("{}:", collection)));
let entry = LogEntry {
cmd: "DROP".to_string(),
collection: collection.to_string(),
key: "*".to_string(),
value: json!(null),
};
storage.write_entry(&entry)?;
let event = json!({
"event": "change",
"collection": collection,
"key": "*",
"new_v": null
})
.to_string();
let _ = tx.send(event);
Ok(())
}