use crate::identity::AgentId;
use crate::kv::{KvEntry, KvError, KvStoreDelta, Result};
use saorsa_gossip_crdt_sync::{LwwRegister, OrSet};
use saorsa_gossip_types::PeerId;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Write-access policy attached to a [`KvStore`].
///
/// [`KvStore::is_authorized`] consults the policy to decide whether a given
/// agent may write to the store.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AccessPolicy {
/// Only the store owner passes the authorization check.
Signed,
/// The store owner and every agent on the store's allowlist pass the
/// authorization check.
Allowlisted,
/// Every agent passes the authorization check in this file.
// NOTE(review): presumably access is enforced by encryption elsewhere — confirm.
Encrypted {
/// Group identifier. It is not read by any logic in this file.
// NOTE(review): presumably names the group able to decrypt — confirm against callers.
group_id: Vec<u8>,
},
}
/// Renders the policy as a short lowercase label: `signed`, `allowlisted`
/// or `encrypted`.
impl std::fmt::Display for AccessPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The group id of an encrypted policy is not part of the label.
        let label = match self {
            Self::Signed => "signed",
            Self::Allowlisted => "allowlisted",
            Self::Encrypted { .. } => "encrypted",
        };
        f.write_str(label)
    }
}
/// 32-byte identifier of a key-value store.
///
/// Built either from raw bytes ([`KvStoreId::new`]) or derived from a store
/// name and creator ([`KvStoreId::from_content`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct KvStoreId([u8; 32]);
impl KvStoreId {
#[must_use]
pub fn new(bytes: [u8; 32]) -> Self {
Self(bytes)
}
#[must_use]
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
#[must_use]
pub fn from_content(name: &str, creator: &AgentId) -> Self {
let mut hasher = blake3::Hasher::new();
hasher.update(b"x0x.store");
hasher.update(name.as_bytes());
hasher.update(creator.as_bytes());
Self(*hasher.finalize().as_bytes())
}
}
/// Formats the id as the hex encoding of its 32 bytes.
impl std::fmt::Display for KvStoreId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = hex::encode(self.0);
        f.write_str(&encoded)
    }
}
/// Builds a fresh, unshared sequence counter that starts at zero.
///
/// Named by the serde `default` attribute on `KvStore::seq_counter`, which is
/// skipped during (de)serialization.
fn default_seq_counter() -> Arc<AtomicU64> {
    let zeroed = AtomicU64::default();
    Arc::new(zeroed)
}
/// A CRDT-backed key-value store with an owner and a write-access policy.
///
/// Key membership is tracked in an OR-Set (`keys`) while the values live in a
/// plain map (`entries`); [`KvStore::get`] only returns an entry whose key the
/// OR-Set currently reports.
///
/// Field order and the serde attributes below are part of the serialized
/// form, so they should not be reordered casually.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KvStore {
/// Identifier of this store; [`KvStore::merge`] refuses stores whose id differs.
id: KvStoreId,
/// OR-Set of keys; decides which keys are visible through `get`,
/// `active_keys` and `active_entries`.
keys: OrSet<String>,
/// Stored entries, indexed by key.
entries: HashMap<String, KvEntry>,
/// Store name, held in a last-writer-wins register.
name: LwwRegister<String>,
/// Write-access policy; falls back to [`AccessPolicy::Signed`] via
/// `default_policy` when serde applies the field default.
#[serde(default = "default_policy")]
policy: AccessPolicy,
/// Owner of the store, if known. `KvStore::new` always sets it; the serde
/// default is `None`.
#[serde(default)]
owner: Option<AgentId>,
/// Agents besides the owner that are authorized under
/// [`AccessPolicy::Allowlisted`].
#[serde(default)]
allowed_writers: HashSet<AgentId>,
/// Counter incremented by each mutating operation on the store.
#[serde(default)]
version: u64,
/// Source of sequence numbers for OR-Set tags created by `put`.
///
/// Not serialized: a deserialized store starts again from zero. Because the
/// counter sits behind an `Arc`, clones of a store share the same counter.
// NOTE(review): restarting at zero after deserialization may reuse tags a
// peer has already seen — confirm whether callers restore the counter.
#[serde(skip, default = "default_seq_counter")]
seq_counter: Arc<AtomicU64>,
}
/// Serde default for `KvStore::policy`: owner-only writes
/// ([`AccessPolicy::Signed`]).
fn default_policy() -> AccessPolicy {
AccessPolicy::Signed
}
impl KvStore {
#[must_use]
pub fn new(id: KvStoreId, name: String, owner: AgentId, policy: AccessPolicy) -> Self {
Self {
id,
keys: OrSet::new(),
entries: HashMap::new(),
name: LwwRegister::new(name),
policy,
owner: Some(owner),
allowed_writers: HashSet::new(),
version: 0,
seq_counter: Arc::new(AtomicU64::new(0)),
}
}
/// Advances the shared sequence counter and returns the new value.
///
/// The first call on a fresh counter returns 1. The increment is atomic
/// (relaxed ordering), so the method only needs `&self`.
pub fn next_seq(&self) -> u64 {
    let previous = self.seq_counter.fetch_add(1, Ordering::Relaxed);
    previous + 1
}
/// Returns the store's version counter.
///
/// The counter starts at 0 in [`KvStore::new`] and is incremented by one on
/// each successful mutating call (`put`, `remove`, allowlist changes,
/// `update_name`, `merge`, and any `merge_delta` that is not rejected).
#[must_use]
pub fn current_version(&self) -> u64 {
self.version
}
/// Returns the identifier of this store.
#[must_use]
pub fn id(&self) -> &KvStoreId {
&self.id
}
/// Returns the current store name as held by the last-writer-wins register.
#[must_use]
pub fn name(&self) -> &str {
self.name.get()
}
/// Returns the write-access policy of this store.
#[must_use]
pub fn policy(&self) -> &AccessPolicy {
&self.policy
}
/// Returns the store owner, or `None` when no owner is recorded.
#[must_use]
pub fn owner(&self) -> Option<&AgentId> {
self.owner.as_ref()
}
/// Returns the allowlist: agents other than the owner that
/// [`KvStore::is_authorized`] accepts under [`AccessPolicy::Allowlisted`].
#[must_use]
pub fn allowed_writers(&self) -> &HashSet<AgentId> {
&self.allowed_writers
}
/// Returns the number of entries held in the entry map.
// NOTE(review): this counts stored entries, not keys reported by the OR-Set;
// the two could differ if an entry is stored for a key the OR-Set does not
// report — confirm whether callers rely on either meaning.
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
/// Returns `true` when the entry map holds no entries.
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Decides whether `agent_id` may write to this store under its policy.
///
/// * `Signed` — only the owner.
/// * `Allowlisted` — the owner or any agent on the allowlist.
/// * `Encrypted` — every agent.
///
/// A store without a recorded owner authorizes nobody under `Signed`, and
/// only allowlisted agents under `Allowlisted`.
#[must_use]
pub fn is_authorized(&self, agent_id: &AgentId) -> bool {
    let is_owner = self.owner.as_ref() == Some(agent_id);
    match &self.policy {
        AccessPolicy::Signed => is_owner,
        AccessPolicy::Allowlisted => is_owner || self.allowed_writers.contains(agent_id),
        AccessPolicy::Encrypted { .. } => true,
    }
}
/// Adds `writer` to the allowlist on behalf of `caller`.
///
/// The version is bumped even when `writer` was already on the allowlist.
///
/// # Errors
///
/// Returns [`KvError::Unauthorized`] unless `caller` is the store owner.
pub fn allow_writer(&mut self, writer: AgentId, caller: &AgentId) -> Result<()> {
    match self.owner.as_ref() {
        Some(owner) if owner == caller => {
            self.allowed_writers.insert(writer);
            self.version += 1;
            Ok(())
        }
        _ => Err(KvError::Unauthorized(
            "only the store owner can modify the allowlist".to_string(),
        )),
    }
}
/// Removes `writer` from the allowlist on behalf of `caller`.
///
/// The version is bumped even when `writer` was not on the allowlist.
///
/// # Errors
///
/// Returns [`KvError::Unauthorized`] unless `caller` is the store owner.
pub fn deny_writer(&mut self, writer: &AgentId, caller: &AgentId) -> Result<()> {
    match self.owner.as_ref() {
        Some(owner) if owner == caller => {
            self.allowed_writers.remove(writer);
            self.version += 1;
            Ok(())
        }
        _ => Err(KvError::Unauthorized(
            "only the store owner can modify the allowlist".to_string(),
        )),
    }
}
/// Stores `value` under `key`, creating the entry or updating an existing one.
///
/// The key is (re-)added to the OR-Set with a fresh tag built from `peer_id`
/// and the next sequence number. This method performs no authorization
/// check of its own.
///
/// # Errors
///
/// * [`KvError::ValueTooLarge`] when `value` exceeds `MAX_INLINE_SIZE`;
///   nothing is modified in that case.
/// * [`KvError::Merge`] when the OR-Set rejects the add. The sequence number
///   has already been consumed at that point.
pub fn put(
    &mut self,
    key: String,
    value: Vec<u8>,
    content_type: String,
    peer_id: PeerId,
) -> Result<()> {
    let limit = crate::kv::entry::MAX_INLINE_SIZE;
    let size = value.len();
    if size > limit {
        return Err(KvError::ValueTooLarge { size, max: limit });
    }

    let tag = (peer_id, self.next_seq());
    self.keys
        .add(key.clone(), tag)
        .map_err(|e| KvError::Merge(format!("OR-Set add failed: {e}")))?;

    match self.entries.get_mut(&key) {
        Some(existing) => {
            existing.update_value(value, content_type);
        }
        None => {
            self.entries
                .insert(key.clone(), KvEntry::new(key, value, content_type));
        }
    }

    self.version += 1;
    Ok(())
}
/// Looks up the entry stored under `key`.
///
/// Returns `None` when the OR-Set does not currently report `key`, even if
/// an entry for it is still present in the entry map.
#[must_use]
pub fn get(&self, key: &str) -> Option<&KvEntry> {
    // Compare by borrowed string slices so no owned `String` has to be
    // allocated just to test membership.
    let is_active = self
        .keys
        .elements()
        .into_iter()
        .any(|candidate| candidate.as_str() == key);
    if is_active {
        self.entries.get(key)
    } else {
        None
    }
}
/// Removes `key` from the store.
///
/// The key is removed from the OR-Set first; the entry is dropped and the
/// version bumped only if that succeeds.
///
/// # Errors
///
/// * [`KvError::KeyNotFound`] when no entry is stored under `key`.
/// * [`KvError::Merge`] when the OR-Set rejects the removal.
pub fn remove(&mut self, key: &str) -> Result<()> {
    if self.entries.contains_key(key) {
        let owned_key = key.to_string();
        self.keys
            .remove(&owned_key)
            .map_err(|e| KvError::Merge(format!("OR-Set remove failed: {e}")))?;
        self.entries.remove(key);
        self.version += 1;
        Ok(())
    } else {
        Err(KvError::KeyNotFound(key.to_string()))
    }
}
/// Returns the keys currently reported by the OR-Set.
///
/// The order follows whatever `OrSet::elements` yields.
#[must_use]
pub fn active_keys(&self) -> Vec<&String> {
self.keys.elements().into_iter().collect()
}
/// Returns the stored entries whose own `key` field is currently reported by
/// the OR-Set.
///
/// Entries are visited in the entry map's iteration order, which is
/// unspecified.
#[must_use]
pub fn active_entries(&self) -> Vec<&KvEntry> {
    // Borrow the live keys instead of cloning each one into an owned set.
    let active: HashSet<&str> = self
        .keys
        .elements()
        .into_iter()
        .map(String::as_str)
        .collect();
    self.entries
        .values()
        .filter(|entry| active.contains(entry.key.as_str()))
        .collect()
}
/// Sets the store name through the last-writer-wins register, attributing
/// the write to `peer_id`, and bumps the version.
///
/// This method performs no authorization check of its own.
pub fn update_name(&mut self, name: String, peer_id: PeerId) {
self.name.set(name, peer_id);
self.version += 1;
}
/// Applies a remote delta to this store, subject to the access policy.
///
/// # Authorization
///
/// * With `writer = Some(id)`, the delta is applied only if
///   [`KvStore::is_authorized`] accepts `id`.
/// * With `writer = None`, the delta is applied only to stores using
///   [`AccessPolicy::Encrypted`].
///
/// A rejected delta is logged at debug level and dropped: the method returns
/// `Ok(())` without touching the store or its version.
///
/// # Application order
///
/// 1. Allowlist additions, then removals — only when `writer` is the owner.
/// 2. `added`: each key is added to the OR-Set with the tag carried by the
///    delta; the entry is merged into an existing one or inserted.
/// 3. `removed`: each key is removed from the OR-Set on a best-effort basis
///    (failures are logged, not returned) and its entry is dropped.
/// 4. `updated`: merged into an existing entry; for an unknown key the key
///    is added with the tag `(peer_id, 0)` and the entry inserted.
/// 5. `name_update`: written to the name register on behalf of `peer_id`.
///
/// The version is bumped once at the end.
///
/// # Errors
///
/// Returns [`KvError::Merge`] when an OR-Set add fails. Changes applied
/// before the failing step are not rolled back.
// NOTE(review): unlike `put`, incoming entries are not checked against
// `MAX_INLINE_SIZE` here — confirm whether that is validated upstream.
pub fn merge_delta(
    &mut self,
    delta: &KvStoreDelta,
    peer_id: PeerId,
    writer: Option<&AgentId>,
) -> Result<()> {
    match writer {
        Some(writer_id) => {
            if !self.is_authorized(writer_id) {
                tracing::debug!(
                    "rejected delta from unauthorized writer {} for store {}",
                    hex::encode(writer_id.as_bytes()),
                    self.id
                );
                return Ok(());
            }
        }
        // Exhaustive on purpose: a new policy variant must decide explicitly
        // whether it accepts anonymous deltas.
        None => match &self.policy {
            AccessPolicy::Encrypted { .. } => {}
            AccessPolicy::Signed | AccessPolicy::Allowlisted => {
                tracing::debug!(
                    "rejected anonymous delta for non-encrypted store {}",
                    self.id
                );
                return Ok(());
            }
        },
    }

    // Only the owner may change the allowlist, whatever the policy.
    let from_owner = writer.is_some_and(|w| self.owner.as_ref().is_some_and(|o| o == w));
    if from_owner {
        if let Some(additions) = &delta.allowlist_additions {
            for agent in additions {
                self.allowed_writers.insert(*agent);
            }
        }
        if let Some(removals) = &delta.allowlist_removals {
            for agent in removals {
                self.allowed_writers.remove(agent);
            }
        }
    }

    for (key, (entry, tag)) in &delta.added {
        self.keys
            .add(key.clone(), *tag)
            .map_err(|e| KvError::Merge(format!("OR-Set add failed: {e}")))?;
        match self.entries.get_mut(key) {
            Some(existing) => {
                existing.merge(entry);
            }
            None => {
                self.entries.insert(key.clone(), entry.clone());
            }
        }
    }

    for key in delta.removed.keys() {
        // Best effort: a failed OR-Set removal must not abort the merge, but
        // it should leave a trace instead of vanishing silently.
        if let Err(e) = self.keys.remove(&key.to_string()) {
            tracing::debug!(
                "OR-Set remove of key {} failed for store {}: {}",
                key,
                self.id,
                e
            );
        }
        self.entries.remove(key.as_str());
    }

    for (key, entry) in &delta.updated {
        match self.entries.get_mut(key) {
            Some(existing) => {
                existing.merge(entry);
            }
            None => {
                self.keys
                    .add(key.clone(), (peer_id, 0))
                    .map_err(|e| KvError::Merge(format!("OR-Set add failed: {e}")))?;
                self.entries.insert(key.clone(), entry.clone());
            }
        }
    }

    if let Some(new_name) = &delta.name_update {
        self.name.set(new_name.clone(), peer_id);
    }

    self.version += 1;
    Ok(())
}
/// Merges the full state of `other` into this store.
///
/// The OR-Sets are merged first, then the entries (per-key merge for keys
/// known to both sides, insertion otherwise). Entries whose key the merged
/// OR-Set does not report are then dropped, so the entry map — and with it
/// [`KvStore::len`] — matches what [`KvStore::get`] can return. This mirrors
/// `remove`, which also drops the entry of a removed key. The allowlist
/// becomes the union of both allowlists, the name registers are merged, and
/// the version is bumped once.
///
/// `policy` and `owner` are left untouched, and no authorization check is
/// performed here.
///
/// # Errors
///
/// * [`KvError::StoreIdMismatch`] when `other` has a different id; nothing is
///   modified in that case.
/// * [`KvError::Merge`] when the OR-Set merge fails.
pub fn merge(&mut self, other: &KvStore) -> Result<()> {
    if self.id != other.id {
        return Err(KvError::StoreIdMismatch);
    }

    self.keys
        .merge_state(&other.keys)
        .map_err(|e| KvError::Merge(format!("OR-Set merge failed: {e}")))?;

    for (key, other_entry) in &other.entries {
        match self.entries.get_mut(key) {
            Some(ours) => {
                ours.merge(other_entry);
            }
            None => {
                self.entries.insert(key.clone(), other_entry.clone());
            }
        }
    }

    // Drop entries for keys that are no longer live after the OR-Set merge
    // (removed on either side); otherwise they linger and inflate `len()`.
    let active: HashSet<&str> = self
        .keys
        .elements()
        .into_iter()
        .map(String::as_str)
        .collect();
    self.entries.retain(|key, _| active.contains(key.as_str()));

    self.allowed_writers
        .extend(other.allowed_writers.iter().copied());
    self.name.merge(&other.name);
    self.version += 1;
    Ok(())
}
/// Builds a delta that describes the whole visible state of the store.
///
/// The delta carries the current version, every entry whose key the OR-Set
/// currently reports (as an addition), the current name, and — when the
/// allowlist is non-empty — the allowlist as additions.
///
/// All additions share one placeholder tag (an all-zero peer id with
/// sequence 0) rather than the tags held by the OR-Set.
// NOTE(review): a receiver that has already tombstoned the placeholder tag
// for a key may not re-activate it from this delta — confirm against the
// OR-Set semantics.
#[must_use]
pub fn full_delta(&self) -> KvStoreDelta {
    let mut delta = KvStoreDelta::new(self.version);

    // Borrow the live keys instead of cloning each one into an owned set.
    let active: HashSet<&str> = self
        .keys
        .elements()
        .into_iter()
        .map(String::as_str)
        .collect();
    let placeholder_tag = (PeerId::new([0u8; 32]), 0);
    for (key, entry) in &self.entries {
        if active.contains(key.as_str()) {
            delta
                .added
                .insert(key.clone(), (entry.clone(), placeholder_tag));
        }
    }

    delta.name_update = Some(self.name().to_string());
    if !self.allowed_writers.is_empty() {
        delta.allowlist_additions = Some(self.allowed_writers.iter().copied().collect());
    }
    delta
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent id whose 32 bytes all equal `n`.
    fn agent(n: u8) -> AgentId {
        AgentId([n; 32])
    }

    /// Peer id whose 32 bytes all equal `n`.
    fn peer(n: u8) -> PeerId {
        PeerId::new([n; 32])
    }

    /// Store id whose 32 bytes all equal `n`.
    fn store_id(n: u8) -> KvStoreId {
        KvStoreId::new([n; 32])
    }

    /// Store `store_id(1)` named "Test", owned by `agent(1)`, policy `Signed`.
    fn signed_store() -> KvStore {
        KvStore::new(
            store_id(1),
            "Test".to_string(),
            agent(1),
            AccessPolicy::Signed,
        )
    }

    /// Store `store_id(1)` named "Team" with the `Allowlisted` policy.
    fn team_store(owner: AgentId) -> KvStore {
        KvStore::new(
            store_id(1),
            "Team".to_string(),
            owner,
            AccessPolicy::Allowlisted,
        )
    }

    /// Puts a `text/plain` value and panics with "put" on failure.
    fn put_text(store: &mut KvStore, key: &str, value: &[u8], peer_id: PeerId) {
        store
            .put(
                key.to_string(),
                value.to_vec(),
                "text/plain".to_string(),
                peer_id,
            )
            .expect("put");
    }

    /// Delta that puts a `text/plain` entry, tagged `(origin, 1)`, version 1.
    fn text_put_delta(key: &str, value: &[u8], origin: PeerId) -> KvStoreDelta {
        let entry = KvEntry::new(key.to_string(), value.to_vec(), "text/plain".to_string());
        KvStoreDelta::for_put(key.to_string(), entry, (origin, 1), 1)
    }

    #[test]
    fn test_new_store() {
        let store = signed_store();
        assert_eq!(store.name(), "Test");
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
        assert_eq!(store.owner(), Some(&agent(1)));
        assert_eq!(*store.policy(), AccessPolicy::Signed);
    }

    #[test]
    fn test_put_and_get() {
        let mut store = signed_store();
        put_text(&mut store, "key1", b"hello", peer(1));
        let entry = store.get("key1").expect("get");
        assert_eq!(entry.value, b"hello");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_put_update() {
        let writer = peer(1);
        let mut store = signed_store();
        put_text(&mut store, "key1", b"old", writer);
        put_text(&mut store, "key1", b"new", writer);
        assert_eq!(store.get("key1").expect("get").value, b"new");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_remove() {
        let mut store = signed_store();
        put_text(&mut store, "key1", b"val", peer(1));
        store.remove("key1").expect("remove");
        assert!(store.get("key1").is_none());
    }

    #[test]
    fn test_remove_nonexistent() {
        let mut store = signed_store();
        assert!(store.remove("nope").is_err());
    }

    #[test]
    fn test_value_too_large() {
        let mut store = signed_store();
        let oversized = vec![0u8; 100_000];
        let outcome = store.put(
            "big".to_string(),
            oversized,
            "application/octet-stream".to_string(),
            peer(1),
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_active_keys() {
        let writer = peer(1);
        let mut store = signed_store();
        for (key, value) in [("a", b"1"), ("b", b"2"), ("c", b"3")] {
            put_text(&mut store, key, value, writer);
        }
        assert_eq!(store.active_keys().len(), 3);
    }

    #[test]
    fn test_merge_stores() {
        let id = store_id(1);
        let owner = agent(1);
        let mut left = KvStore::new(id, "Store".to_string(), owner, AccessPolicy::Signed);
        let mut right = KvStore::new(id, "Store".to_string(), owner, AccessPolicy::Signed);
        put_text(&mut left, "a", b"1", peer(1));
        put_text(&mut right, "b", b"2", peer(2));
        left.merge(&right).expect("merge");
        assert_eq!(left.len(), 2);
    }

    #[test]
    fn test_merge_different_ids_fails() {
        let owner = agent(1);
        let mut left = KvStore::new(store_id(1), "A".to_string(), owner, AccessPolicy::Signed);
        let right = KvStore::new(store_id(2), "B".to_string(), owner, AccessPolicy::Signed);
        assert!(left.merge(&right).is_err());
    }

    #[test]
    fn test_version_increments() {
        let mut store = signed_store();
        assert_eq!(store.current_version(), 0);
        put_text(&mut store, "k", b"v", peer(1));
        assert_eq!(store.current_version(), 1);
        store.remove("k").expect("remove");
        assert_eq!(store.current_version(), 2);
    }

    #[test]
    fn test_store_id_from_content() {
        let creator = agent(1);
        let first = KvStoreId::from_content("store1", &creator);
        let repeat = KvStoreId::from_content("store1", &creator);
        let different = KvStoreId::from_content("store2", &creator);
        assert_eq!(first, repeat);
        assert_ne!(first, different);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let mut store = signed_store();
        put_text(&mut store, "key1", b"val", peer(1));
        let bytes = bincode::serialize(&store).expect("serialize");
        let restored: KvStore = bincode::deserialize(&bytes).expect("deserialize");
        assert_eq!(store.id(), restored.id());
        assert_eq!(store.name(), restored.name());
        assert_eq!(store.len(), restored.len());
    }

    #[test]
    fn test_next_seq_monotonic() {
        let store = signed_store();
        let earlier = store.next_seq();
        let later = store.next_seq();
        assert!(later > earlier);
    }

    #[test]
    fn test_signed_policy_owner_authorized() {
        let store = signed_store();
        assert!(store.is_authorized(&agent(1)));
    }

    #[test]
    fn test_signed_policy_non_owner_rejected() {
        let store = signed_store();
        assert!(!store.is_authorized(&agent(2)));
    }

    #[test]
    fn test_signed_policy_rejects_unauthorized_delta() {
        let attacker = agent(99);
        let mut store = signed_store();
        let delta = text_put_delta("spam", b"junk", peer(99));
        store
            .merge_delta(&delta, peer(99), Some(&attacker))
            .expect("should not error");
        assert!(store.get("spam").is_none(), "spam should be rejected");
    }

    #[test]
    fn test_signed_policy_accepts_owner_delta() {
        let owner = agent(1);
        let mut store = signed_store();
        let delta = text_put_delta("legit", b"data", peer(1));
        store
            .merge_delta(&delta, peer(1), Some(&owner))
            .expect("merge");
        assert!(store.get("legit").is_some());
    }

    #[test]
    fn test_allowlisted_policy() {
        let owner = agent(1);
        let writer = agent(2);
        let outsider = agent(3);
        let mut store = team_store(owner);
        store.allow_writer(writer, &owner).expect("allow");
        assert!(store.is_authorized(&owner));
        assert!(store.is_authorized(&writer));
        assert!(!store.is_authorized(&outsider));
    }

    #[test]
    fn test_allowlisted_rejects_non_owner_allowlist_change() {
        let intruder = agent(2);
        let mut store = team_store(agent(1));
        let outcome = store.allow_writer(agent(3), &intruder);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_deny_writer() {
        let owner = agent(1);
        let writer = agent(2);
        let mut store = team_store(owner);
        store.allow_writer(writer, &owner).expect("allow");
        assert!(store.is_authorized(&writer));
        store.deny_writer(&writer, &owner).expect("deny");
        assert!(!store.is_authorized(&writer));
    }

    #[test]
    fn test_allowlist_delta_propagation() {
        let owner = agent(1);
        let writer = agent(2);
        let mut store = team_store(owner);
        store.allow_writer(writer, &owner).expect("allow");
        let delta = store.full_delta();
        assert!(delta.allowlist_additions.is_some());
        assert!(delta
            .allowlist_additions
            .as_ref()
            .is_some_and(|listed| listed.contains(&writer)));
    }

    #[test]
    fn test_anonymous_delta_rejected_for_signed_store() {
        let mut store = signed_store();
        let delta = text_put_delta("anon", b"spam", peer(99));
        store
            .merge_delta(&delta, peer(99), None)
            .expect("silent rejection");
        assert!(store.get("anon").is_none());
    }

    #[test]
    fn test_policy_display() {
        let encrypted = AccessPolicy::Encrypted {
            group_id: vec![1, 2, 3],
        };
        assert_eq!(format!("{}", AccessPolicy::Signed), "signed");
        assert_eq!(format!("{}", AccessPolicy::Allowlisted), "allowlisted");
        assert_eq!(format!("{}", encrypted), "encrypted");
    }
}