use dashmap::{DashMap, DashSet};
use serde_json::{json, Value};
use std::sync::Arc;
use super::super::{indexing, StorageBackend};
use super::super::types::{DbError, LogEntry};
pub fn delete(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
keys: Vec<String>,
) -> Result<(), DbError> {
let tx_id = uuid::Uuid::new_v4().to_string();
storage.write_entry(&LogEntry::new(
"TX_BEGIN".into(),
collection.into(),
tx_id.clone(),
Value::Null,
))?;
if let Some(col) = state.get(collection) {
for key in keys {
if let Some(val) = {
if let Some(doc_state) = col.get(&key) {
Some(match doc_state.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length)?;
let entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
entry.value
}
})
} else {
None
}
} {
indexing::unindex_doc(indexes, collection, &key, &val);
}
col.remove(&key);
let entry = LogEntry::new(
"DELETE".to_string(),
collection.to_string(),
key.clone(),
json!(null),
);
storage.write_entry(&entry)?;
let event = json!({
"event": "change",
"collection": collection,
"key": key,
"new_v": null
})
.to_string();
let _ = tx.send(event);
}
}
storage.write_entry(&LogEntry::new(
"TX_COMMIT".into(),
collection.into(),
tx_id,
Value::Null,
))?;
Ok(())
}
pub fn delete_collection(
state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
storage: &Arc<dyn StorageBackend>,
tx: &tokio::sync::broadcast::Sender<String>,
collection: &str,
) -> Result<(), DbError> {
let tx_id = uuid::Uuid::new_v4().to_string();
storage.write_entry(&LogEntry::new(
"TX_BEGIN".into(),
collection.into(),
tx_id.clone(),
Value::Null,
))?;
state.remove(collection);
indexes.retain(|k, _| !k.starts_with(&format!("{}:", collection)));
let entry = LogEntry::new(
"DROP".to_string(),
collection.to_string(),
"*".to_string(),
json!(null),
);
storage.write_entry(&entry)?;
storage.write_entry(&LogEntry::new(
"TX_COMMIT".into(),
collection.into(),
tx_id,
Value::Null,
))?;
let event = json!({
"event": "change",
"collection": collection,
"key": "*",
"new_v": null
})
.to_string();
let _ = tx.send(event);
Ok(())
}