use dashmap::{DashMap, DashSet};
use serde_json::{json, Value};
use std::sync::Arc;
use super::common::now_iso;
use super::super::{indexing, StorageBackend};
use super::super::types::{DbError, LogEntry};
/// Inserts or updates a batch of documents in `collection`.
///
/// The batch is bracketed in the storage log by a `TX_BEGIN` entry and a
/// `TX_COMMIT` entry sharing a freshly generated transaction id. For each
/// `(key, value)` pair in `items`:
///
/// * If a document already exists under `key`, an incoming `_v` that is not
///   strictly greater than the stored `_v` is rejected with
///   [`DbError::Conflict`]. Otherwise `_v` is set to the stored version plus
///   one, `createdAt` is carried over from the stored document (falling back
///   to the current timestamp), and `modifiedAt` is set to the current
///   timestamp.
/// * If no document exists, `_v` defaults to `1` when absent, and both
///   `createdAt` and `modifiedAt` are set to the current timestamp.
/// * Metadata is only stamped when `value` is a JSON object.
/// * With the `schema` feature enabled, the stamped document is validated
///   before anything is written.
/// * The `INSERT` log entry is written first; only after that succeeds are
///   the in-memory state and the indexes updated and a `change` notification
///   broadcast on `tx`.
///
/// # Errors
///
/// Returns [`DbError::Conflict`] on a version conflict, and propagates errors
/// from `storage.write_entry` and (with the `schema` feature) from schema
/// validation.
///
/// NOTE(review): an error part-way through the batch returns immediately.
/// `TX_BEGIN` has then been logged without a matching `TX_COMMIT`, and items
/// processed earlier in the batch remain applied in memory. Confirm that log
/// replay / callers handle this as intended.
pub fn insert(
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    storage: &Arc<dyn StorageBackend>,
    tx: &tokio::sync::broadcast::Sender<String>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, Arc<(Value, jsonschema::Validator)>>,
    collection: &str,
    items: Vec<(String, Value)>,
) -> Result<(), DbError> {
    use crate::engine::types::DocumentState;

    // The guard returned by `entry` is held until this function returns.
    let col = state
        .entry(collection.to_string())
        .or_insert_with(DashMap::new);

    let tx_id = uuid::Uuid::new_v4().to_string();
    storage.write_entry(&LogEntry::new(
        "TX_BEGIN".into(),
        collection.into(),
        tx_id.clone(),
        Value::Null,
    ))?;

    for (key, mut value) in items {
        let now = now_iso();

        // Load the currently stored document, if any. Cold documents are read
        // back from storage via their pointer.
        let existing_val = match col.get(&key) {
            None => None,
            Some(doc_state) => match doc_state.value() {
                DocumentState::Hot(v) => Some(v.clone()),
                DocumentState::Cold(ptr) => match storage.read_at(ptr.offset, ptr.length) {
                    Ok(bytes) => {
                        match serde_json::from_slice::<crate::engine::types::LogEntry>(&bytes) {
                            Ok(entry) => Some(entry.value),
                            Err(_) => {
                                // Best-effort: fall through and treat the key as new,
                                // but make the failure visible instead of silent.
                                tracing::warn!(
                                    "cold document {}/{} could not be decoded; treating as new",
                                    collection,
                                    key
                                );
                                None
                            }
                        }
                    }
                    Err(_) => {
                        tracing::warn!(
                            "cold document {}/{} could not be read from storage; treating as new",
                            collection,
                            key
                        );
                        None
                    }
                },
            },
        };

        if let Some(existing) = &existing_val {
            // Update path: enforce optimistic versioning, then stamp metadata.
            let existing_v = existing.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
            if let Some(iv) = value.get("_v").and_then(|v| v.as_u64()) {
                if iv <= existing_v {
                    tracing::debug!("⚡ Conflict error: {}/{} incoming _v={} <= stored _v={}", collection, key, iv, existing_v);
                    return Err(DbError::Conflict);
                }
            }
            let orig_created = existing
                .get("createdAt")
                .and_then(|v| v.as_str())
                .unwrap_or(&now)
                .to_string();
            if let Some(obj) = value.as_object_mut() {
                // The stored version is always bumped by exactly one,
                // regardless of the incoming `_v`.
                obj.insert("_v".to_string(), json!(existing_v + 1));
                obj.insert("createdAt".to_string(), json!(orig_created));
                obj.insert("modifiedAt".to_string(), json!(now));
            }
        } else if let Some(obj) = value.as_object_mut() {
            // Create path: keep a caller-supplied `_v`, otherwise start at 1.
            if !obj.contains_key("_v") {
                obj.insert("_v".to_string(), json!(1u64));
            }
            obj.insert("createdAt".to_string(), json!(now.clone()));
            obj.insert("modifiedAt".to_string(), json!(now));
        }

        #[cfg(feature = "schema")]
        crate::engine::schema::validate_document(schemas, collection, &value)?;

        // Write-ahead: persist the entry before touching in-memory state, so a
        // failed write does not leave memory holding a document that was never
        // logged.
        let entry = LogEntry::new(
            "INSERT".to_string(),
            collection.to_string(),
            key.clone(),
            value.clone(),
        );
        storage.write_entry(&entry)?;

        // Apply to memory: drop index entries of the previous version, store
        // the new document, then index it.
        if let Some(existing) = &existing_val {
            indexing::unindex_doc(indexes, collection, &key, existing);
        }
        col.insert(key.clone(), DocumentState::Hot(value.clone()));
        indexing::index_doc(indexes, collection, &key, &value);

        let new_v = value.get("_v").and_then(|v| v.as_u64()).unwrap_or(0);
        // The send result is deliberately ignored: notification is best-effort.
        let _ = tx.send(
            json!({
                "event": "change",
                "collection": collection,
                "key": key,
                "new_v": new_v
            })
            .to_string(),
        );
    }

    storage.write_entry(&LogEntry::new(
        "TX_COMMIT".into(),
        collection.into(),
        tx_id,
        Value::Null,
    ))?;
    Ok(())
}