use crate::gossip::PubSubManager;
use crate::kv::{KvStore, KvStoreDelta, Result};
use saorsa_gossip_crdt_sync::AntiEntropyManager;
use saorsa_gossip_types::PeerId;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct KvStoreSync {
store: Arc<RwLock<KvStore>>,
#[allow(dead_code)]
anti_entropy: AntiEntropyManager<KvStore>,
pubsub: Arc<PubSubManager>,
topic: String,
}
impl KvStoreSync {
pub fn new(
store: KvStore,
pubsub: Arc<PubSubManager>,
topic: String,
sync_interval_secs: u64,
) -> Result<Self> {
let store = Arc::new(RwLock::new(store));
let anti_entropy = AntiEntropyManager::new(Arc::clone(&store), sync_interval_secs);
Ok(Self {
store,
anti_entropy,
pubsub,
topic,
})
}
pub async fn start(&self) -> Result<()> {
let mut sub = self.pubsub.subscribe(self.topic.clone()).await;
let store = Arc::clone(&self.store);
tokio::spawn(async move {
while let Some(msg) = sub.recv().await {
let decoded = {
use bincode::Options;
bincode::options()
.with_fixint_encoding()
.with_limit(crate::network::MAX_MESSAGE_DESERIALIZE_SIZE)
.allow_trailing_bytes()
.deserialize::<(PeerId, KvStoreDelta)>(&msg.payload)
};
match decoded {
Ok((peer_id, delta)) => {
let mut s = store.write().await;
let writer = msg.sender.as_ref();
if let Err(e) = s.merge_delta(&delta, peer_id, writer) {
tracing::warn!("Failed to merge KvStore delta: {e}");
}
}
Err(e) => {
tracing::warn!("Failed to deserialize KvStore delta: {e}");
}
}
}
});
Ok(())
}
pub async fn stop(&self) -> Result<()> {
self.pubsub.unsubscribe(&self.topic).await;
Ok(())
}
pub async fn publish_delta(&self, local_peer_id: PeerId, delta: KvStoreDelta) -> Result<()> {
let serialized = bincode::serialize(&(local_peer_id, delta))
.map_err(|e| crate::kv::KvError::Gossip(format!("serialize delta failed: {e}")))?;
self.pubsub
.publish(self.topic.clone(), bytes::Bytes::from(serialized))
.await
.map_err(|e| crate::kv::KvError::Gossip(format!("publish delta failed: {e}")))?;
Ok(())
}
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, KvStore> {
self.store.read().await
}
pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, KvStore> {
self.store.write().await
}
#[must_use]
pub fn topic(&self) -> &str {
&self.topic
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::identity::AgentId;
use crate::kv::store::AccessPolicy;
use crate::kv::{KvEntry, KvStoreId};
fn agent(n: u8) -> AgentId {
AgentId([n; 32])
}
fn peer(n: u8) -> PeerId {
PeerId::new([n; 32])
}
fn store_id(n: u8) -> KvStoreId {
KvStoreId::new([n; 32])
}
#[tokio::test]
async fn test_kv_store_sync_creation() {
let owner = agent(1);
let store = KvStore::new(store_id(1), "Test".to_string(), owner, AccessPolicy::Signed);
let _store_for_sync = store;
}
#[tokio::test]
async fn test_apply_delta_directly() {
let owner = agent(1);
let writer = agent(2);
let p2 = peer(2);
let mut store = KvStore::new(
store_id(1),
"Test".to_string(),
owner,
AccessPolicy::Allowlisted,
);
store.allow_writer(writer, &owner).expect("allow");
let store_arc = Arc::new(RwLock::new(store));
let entry = KvEntry::new(
"newkey".to_string(),
b"value".to_vec(),
"text/plain".to_string(),
);
let mut delta = KvStoreDelta::new(1);
delta.added.insert("newkey".to_string(), (entry, (p2, 1)));
{
let mut s = store_arc.write().await;
s.merge_delta(&delta, p2, Some(&writer)).expect("merge");
}
{
let s = store_arc.read().await;
assert!(s.get("newkey").is_some());
}
}
#[tokio::test]
async fn test_concurrent_reads() {
let owner = agent(1);
let store = KvStore::new(store_id(1), "Test".to_string(), owner, AccessPolicy::Signed);
let store_arc = Arc::new(RwLock::new(store));
let s1 = store_arc.read().await;
let s2 = store_arc.read().await;
assert_eq!(s1.name(), "Test");
assert_eq!(s2.name(), "Test");
}
}