use crate::identity::AgentId;
use crate::kv::{KvEntry, KvStore};
use saorsa_gossip_crdt_sync::DeltaCrdt;
use saorsa_gossip_types::PeerId;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// Tag attached to entries carried in a `KvStoreDelta`: a `PeerId` paired with a `u64`.
///
/// NOTE(review): presumably the `u64` is a per-peer counter that makes each tag
/// unique — confirm against the code that mints tags.
pub type UniqueTag = (PeerId, u64);
/// A serializable set of changes to a `KvStore`, stamped with a version number.
///
/// This is the `Delta` type used by the `DeltaCrdt` implementation for `KvStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KvStoreDelta {
/// Entries keyed by their string key, each paired with a `UniqueTag`.
/// `KvStoreDelta::for_put` records its entry here.
pub added: HashMap<String, (KvEntry, UniqueTag)>,
/// Sets of `UniqueTag`s keyed by entry key.
/// NOTE(review): presumably the tags to drop for each key — confirm against `merge_delta`.
pub removed: HashMap<String, HashSet<UniqueTag>>,
/// Entries keyed by their string key, without a tag.
/// `KvStoreDelta::for_update` records its entry here.
pub updated: HashMap<String, KvEntry>,
/// Optional new store name; `None` means the delta carries no name change.
pub name_update: Option<String>,
/// Optional list of agents.
/// NOTE(review): presumably agents to add to the store's write allowlist — confirm against `merge_delta`.
pub allowlist_additions: Option<Vec<AgentId>>,
/// Optional list of agents.
/// NOTE(review): presumably agents to remove from the store's write allowlist — confirm against `merge_delta`.
pub allowlist_removals: Option<Vec<AgentId>>,
/// Version number supplied when the delta was constructed.
/// It is not taken into account by `is_empty`.
pub version: u64,
}
impl KvStoreDelta {
    /// Creates a delta stamped with `version` that carries no changes.
    ///
    /// All three maps start out empty and every optional field is `None`, so
    /// [`is_empty`](Self::is_empty) returns `true` for the result.
    #[must_use]
    pub fn new(version: u64) -> Self {
        Self {
            version,
            added: HashMap::new(),
            removed: HashMap::new(),
            updated: HashMap::new(),
            name_update: None,
            allowlist_additions: None,
            allowlist_removals: None,
        }
    }

    /// Builds a delta whose only change is `entry`, recorded under `key` in
    /// `added` together with `tag`.
    #[must_use]
    pub fn for_put(key: String, entry: KvEntry, tag: UniqueTag, version: u64) -> Self {
        Self {
            added: HashMap::from([(key, (entry, tag))]),
            ..Self::new(version)
        }
    }

    /// Builds a delta whose only change is `entry`, recorded under `key` in
    /// `updated`.
    #[must_use]
    pub fn for_update(key: String, entry: KvEntry, version: u64) -> Self {
        Self {
            updated: HashMap::from([(key, entry)]),
            ..Self::new(version)
        }
    }

    /// Returns `true` when the delta carries no changes at all.
    ///
    /// The three maps must be empty and the three optional fields must be
    /// `None`. An optional field holding `Some` of an empty value still counts
    /// as a change, and `version` is ignored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let no_entry_changes =
            self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty();
        let no_metadata_changes = self.name_update.is_none()
            && self.allowlist_additions.is_none()
            && self.allowlist_removals.is_none();
        no_entry_changes && no_metadata_changes
    }
}
impl DeltaCrdt for KvStore {
    type Delta = KvStoreDelta;

    /// Applies `delta` to this store by delegating to `merge_delta`.
    ///
    /// The trait method receives no sender information, so the call is made
    /// with an all-zero `PeerId` and `None` for the optional agent argument.
    ///
    /// # Errors
    ///
    /// Returns an `anyhow` error wrapping the message of any failure reported
    /// by `merge_delta`.
    fn merge(&mut self, delta: &Self::Delta) -> anyhow::Result<()> {
        // Placeholder identity: 32 zero bytes.
        let placeholder_peer = PeerId::new([0u8; 32]);
        if let Err(e) = self.merge_delta(delta, placeholder_peer, None) {
            return Err(anyhow::anyhow!("KvStore delta merge failed: {e}"));
        }
        Ok(())
    }

    /// Returns the store's full delta when `since_version` is older than the
    /// current version, and `None` otherwise.
    ///
    /// The result is always the complete `full_delta()`, not an incremental
    /// difference relative to `since_version`.
    fn delta(&self, since_version: u64) -> Option<Self::Delta> {
        let has_newer_state = since_version < self.current_version();
        has_newer_state.then(|| self.full_delta())
    }

    /// Returns the store's current version as reported by `current_version`.
    fn version(&self) -> u64 {
        self.current_version()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::AgentId;
    use crate::kv::store::AccessPolicy;

    /// Builds an `AgentId` whose 32 bytes are all `n`.
    fn agent(n: u8) -> AgentId {
        let bytes = [n; 32];
        AgentId(bytes)
    }

    /// Builds a `PeerId` whose 32 bytes are all `n`.
    fn peer(n: u8) -> PeerId {
        let bytes = [n; 32];
        PeerId::new(bytes)
    }

    /// Builds a `KvStoreId` whose 32 bytes are all `n`.
    fn store_id(n: u8) -> crate::kv::KvStoreId {
        let bytes = [n; 32];
        crate::kv::KvStoreId::new(bytes)
    }

    /// A freshly constructed delta is empty and keeps the version it was given.
    #[test]
    fn test_empty_delta() {
        let fresh = KvStoreDelta::new(1);
        assert!(fresh.is_empty());
        assert_eq!(fresh.version, 1);
    }

    /// `for_put` yields a non-empty delta with exactly one `added` entry.
    #[test]
    fn test_delta_with_put() {
        let key = "k".to_string();
        let entry = KvEntry::new(key.clone(), b"v".to_vec(), "text/plain".to_string());
        let put_delta = KvStoreDelta::for_put(key, entry, (peer(1), 1), 1);
        assert!(!put_delta.is_empty());
        assert_eq!(put_delta.added.len(), 1);
    }

    /// `for_update` yields a non-empty delta with exactly one `updated` entry.
    #[test]
    fn test_delta_with_update() {
        let key = "k".to_string();
        let entry = KvEntry::new(key.clone(), b"v".to_vec(), "text/plain".to_string());
        let update_delta = KvStoreDelta::for_update(key, entry, 2);
        assert!(!update_delta.is_empty());
        assert_eq!(update_delta.updated.len(), 1);
    }

    /// Merging a put delta from an allowed writer makes the entry readable.
    #[test]
    fn test_merge_delta_with_new_entry() {
        let owner = agent(1);
        let writer = agent(2);
        let mut store = KvStore::new(
            store_id(1),
            "Store".to_string(),
            owner,
            AccessPolicy::Allowlisted,
        );
        store.allow_writer(writer, &owner).expect("allow");

        let key = "newkey".to_string();
        let entry = KvEntry::new(key.clone(), b"value".to_vec(), "text/plain".to_string());
        let put_delta = KvStoreDelta::for_put(key, entry, (peer(2), 1), 1);
        store
            .merge_delta(&put_delta, peer(2), Some(&writer))
            .expect("merge delta");

        assert!(store.get("newkey").is_some());
        assert_eq!(store.get("newkey").expect("entry").value, b"value");
    }

    /// Merging a delta that carries a name update renames the store.
    #[test]
    fn test_merge_delta_with_name_update() {
        let owner = agent(1);
        let mut store = KvStore::new(
            store_id(1),
            "Original".to_string(),
            owner,
            AccessPolicy::Signed,
        );
        let rename = KvStoreDelta {
            name_update: Some("Updated".to_string()),
            ..KvStoreDelta::new(1)
        };
        store
            .merge_delta(&rename, peer(1), Some(&owner))
            .expect("merge delta");
        assert_eq!(store.name(), "Updated");
    }

    /// A delta produced through the `DeltaCrdt` trait by one store can be
    /// merged into another, after which the receiver's version is above zero.
    #[test]
    fn test_delta_crdt_trait() {
        let owner = agent(1);
        let id = store_id(1);
        // Both replicas are built with the same id, name, owner and policy.
        let encrypted_store = || {
            KvStore::new(
                id,
                "Store".to_string(),
                owner,
                AccessPolicy::Encrypted {
                    group_id: vec![1, 2, 3],
                },
            )
        };
        let mut receiver = encrypted_store();
        let mut sender = encrypted_store();

        sender
            .put(
                "key".to_string(),
                b"val".to_vec(),
                "text/plain".to_string(),
                peer(2),
            )
            .expect("put");

        let delta = <KvStore as DeltaCrdt>::delta(&sender, 0).expect("delta");
        <KvStore as DeltaCrdt>::merge(&mut receiver, &delta).expect("merge");
        assert!(<KvStore as DeltaCrdt>::version(&receiver) > 0);
    }

    /// A delta survives a bincode round trip with its version and allowlist
    /// additions intact.
    #[test]
    fn test_delta_serialization() {
        let original = KvStoreDelta {
            allowlist_additions: Some(vec![agent(1), agent(2)]),
            ..KvStoreDelta::new(5)
        };
        let encoded = bincode::serialize(&original).expect("serialize");
        let restored: KvStoreDelta = bincode::deserialize(&encoded).expect("deserialize");
        assert_eq!(original.version, restored.version);
        assert!(restored.allowlist_additions.is_some());
        assert_eq!(restored.allowlist_additions.as_ref().map(Vec::len), Some(2));
    }
}