use dashmap::{DashMap, DashSet};
use serde_json::{json, Value};
use std::sync::Arc;
use tracing::debug;
use super::common::now_iso;
use super::super::{indexing, StorageBackend};
use super::super::types::{DbError, LogEntry};
pub fn update(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
#[cfg(feature = "schema")] schemas: &DashMap<String, Arc<(Value, jsonschema::Validator)>>,
collection: &str,
key: &str,
updates: Value, ) -> Result<bool, DbError> {
let tx_id = uuid::Uuid::new_v4().to_string();
storage.write_entry(&LogEntry::new(
"TX_BEGIN".into(),
collection.into(),
tx_id.clone(),
Value::Null,
))?;
if let Some(col) = state.get(collection) {
if let Some(doc) = {
if let Some(doc_state) = col.get(key) {
Some(match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
entry.value
}
})
} else {
None
}
} {
let mut doc = doc;
indexing::unindex_doc(indexes, collection, key, &doc);
if let Some(update_obj) = updates.as_object() {
let existing_v = doc.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
if let Some(guard_v) = update_obj.get("_v").and_then(|v| v.as_u64()) {
if guard_v != existing_v {
debug!("⚡ Conflict error: {}/{} update guard _v={} != stored _v={}", collection, key, guard_v, existing_v);
return Err(DbError::Conflict);
}
}
if let Some(doc_obj) = doc.as_object_mut() {
for (k, v) in update_obj {
if k == "_v" || k == "createdAt" { continue; }
doc_obj.insert(k.clone(), v.clone());
}
doc_obj.insert("_v".to_string(), serde_json::json!(existing_v + 1));
doc_obj.insert("modifiedAt".to_string(), serde_json::json!(now_iso()));
}
}
let new_value = doc.clone();
#[cfg(feature = "schema")]
crate::engine::schema::validate_document(schemas, collection, &new_value)?;
indexing::index_doc(indexes, collection, key, &new_value);
col.insert(key.to_string(), crate::engine::types::DocumentState::Hot(new_value.clone()));
let entry = LogEntry::new(
"INSERT".to_string(),
collection.to_string(),
key.to_string(),
new_value.clone(),
);
storage.write_entry(&entry)?;
storage.write_entry(&LogEntry::new(
"TX_COMMIT".into(),
collection.into(),
tx_id,
Value::Null,
))?;
let new_v = new_value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
let _ = tx.send(
json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": new_v
})
.to_string(),
);
return Ok(true); }
}
storage.write_entry(&LogEntry::new(
"TX_COMMIT".into(),
collection.into(),
tx_id,
Value::Null,
))?;
Ok(false) }