use anyhow::{Context, Result};
use negentropy::{Id as NegentropyId, Negentropy, NegentropyStorageVector};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use iroh::EndpointId;
/// Size in bytes of a sync item identifier (a SHA-256 digest length).
pub const DOC_ID_SIZE: usize = 32;
/// A single syncable record: a document key paired with a timestamp and a
/// fixed-size identifier fed into the Negentropy reconciliation protocol.
#[derive(Debug, Clone)]
pub struct SyncItem {
    /// Application-level key identifying the document (e.g. "nodes::node-1").
    pub doc_key: String,
    /// Timestamp used by Negentropy to order items within its storage.
    pub timestamp: u64,
    /// 32-byte identifier handed to Negentropy — either a caller-provided
    /// content hash (`new`) or a SHA-256 of the key itself (`from_doc_key`).
    pub id: [u8; DOC_ID_SIZE],
}
impl SyncItem {
    /// Builds a sync item from an already-computed content hash.
    pub fn new(doc_key: String, timestamp: u64, content_hash: [u8; DOC_ID_SIZE]) -> Self {
        Self {
            doc_key,
            timestamp,
            id: content_hash,
        }
    }

    /// Builds a sync item whose identifier is the SHA-256 digest of the
    /// document key itself (content-independent, stable across peers).
    pub fn from_doc_key(doc_key: &str, timestamp: u64) -> Self {
        use sha2::{Digest, Sha256};
        // One-shot digest of the key bytes; equivalent to update + finalize.
        let digest: [u8; 32] = Sha256::digest(doc_key.as_bytes()).into();
        Self {
            doc_key: doc_key.to_string(),
            timestamp,
            id: digest,
        }
    }
}
/// Outcome of feeding one peer message into a sync session.
#[derive(Debug, Default)]
pub struct ReconcileResult {
    /// Keys we have that the peer appears to lack (populated on the
    /// initiator side only; always empty for the responder).
    pub have_keys: Vec<String>,
    /// Keys the peer has that we appear to lack (initiator side only).
    pub need_keys: Vec<String>,
    /// True once the initiator's reconcile produces no further message;
    /// the responder side never sets this (see `reconcile_responder`).
    pub is_complete: bool,
    /// Next protocol message to send to the peer, if the exchange continues.
    pub next_message: Option<Vec<u8>>,
}
/// Per-peer Negentropy reconciliation state.
pub struct SyncSession {
    /// Peer this session reconciles with.
    pub peer_id: EndpointId,
    /// Sealed item storage, held until the Negentropy state machine is
    /// built (at which point it is `take`n); `None` afterwards.
    storage: Option<NegentropyStorageVector>,
    /// Lazily-created protocol state machine — built in `initiate` for the
    /// initiator, or on the responder's first `reconcile`.
    negentropy: Option<Negentropy<'static, NegentropyStorageVector>>,
    /// Maps Negentropy IDs back to application document keys.
    id_to_key: HashMap<[u8; DOC_ID_SIZE], String>,
    /// Whether this side started the exchange.
    pub is_initiator: bool,
}
impl SyncSession {
    /// Shared constructor body: loads `items` into a sealed Negentropy
    /// storage and records the id -> doc_key mapping used to translate
    /// reconciliation results back into application keys.
    /// (Previously duplicated verbatim in `new_initiator`/`new_responder`.)
    fn build(peer_id: EndpointId, items: Vec<SyncItem>, is_initiator: bool) -> Result<Self> {
        let mut storage = NegentropyStorageVector::new();
        let mut id_to_key = HashMap::new();
        for item in items {
            let neg_id =
                NegentropyId::from_slice(&item.id).context("Invalid ID size for Negentropy")?;
            storage.insert(item.timestamp, neg_id)?;
            id_to_key.insert(item.id, item.doc_key);
        }
        // Negentropy requires storage to be sealed before reconciliation.
        storage.seal()?;
        Ok(Self {
            peer_id,
            storage: Some(storage),
            negentropy: None,
            id_to_key,
            is_initiator,
        })
    }

    /// Creates the initiating side of a sync session.
    fn new_initiator(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
        Self::build(peer_id, items, true)
    }

    /// Creates the responding side of a sync session.
    fn new_responder(peer_id: EndpointId, items: Vec<SyncItem>) -> Result<Self> {
        Self::build(peer_id, items, false)
    }

    /// Produces the first protocol message. Consumes the sealed storage to
    /// construct the Negentropy state machine; calling this twice (or after
    /// a responder-side reconcile) fails with "Storage already consumed".
    pub fn initiate(&mut self) -> Result<Vec<u8>> {
        let storage = self.storage.take().context("Storage already consumed")?;
        // Frame size limit 0 = unlimited message frames.
        let mut neg =
            Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
        let init_msg = neg
            .initiate()
            .context("Failed to initiate Negentropy sync")?;
        self.negentropy = Some(neg);
        Ok(init_msg)
    }

    /// Feeds one peer message into the session, dispatching on which side
    /// of the protocol this session plays.
    pub fn reconcile(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
        if self.is_initiator {
            self.reconcile_initiator(peer_msg)
        } else {
            self.reconcile_responder(peer_msg)
        }
    }

    /// Translates Negentropy IDs back into document keys, silently skipping
    /// IDs we never inserted (e.g. items only the peer holds).
    fn keys_for(&self, ids: &[NegentropyId]) -> Vec<String> {
        ids.iter()
            .filter_map(|id| {
                let bytes: &[u8; 32] = id.as_bytes();
                self.id_to_key.get(bytes).cloned()
            })
            .collect()
    }

    /// Initiator-side reconcile: extracts have/need ID sets and maps them to
    /// document keys. A `None` response from Negentropy means the exchange
    /// is complete.
    fn reconcile_initiator(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
        let neg = self.negentropy.as_mut().context("Session not initiated")?;
        let mut have_ids: Vec<NegentropyId> = Vec::new();
        let mut need_ids: Vec<NegentropyId> = Vec::new();
        let response = neg
            .reconcile_with_ids(peer_msg, &mut have_ids, &mut need_ids)
            .context("Failed to reconcile")?;
        let have_keys = self.keys_for(&have_ids);
        let need_keys = self.keys_for(&need_ids);
        let is_complete = response.is_none();
        Ok(ReconcileResult {
            have_keys,
            need_keys,
            is_complete,
            next_message: response,
        })
    }

    /// Responder-side reconcile: always answers with a message and never
    /// observes completion — only the initiator can detect the end of the
    /// exchange, so the caller must eventually drop responder sessions.
    fn reconcile_responder(&mut self, peer_msg: &[u8]) -> Result<ReconcileResult> {
        // Lazily construct the state machine on the first incoming message.
        if self.negentropy.is_none() {
            let storage = self.storage.take().context("Storage already consumed")?;
            let neg =
                Negentropy::owned(storage, 0).context("Failed to create Negentropy instance")?;
            self.negentropy = Some(neg);
        }
        let neg = self
            .negentropy
            .as_mut()
            .expect("negentropy instance was just ensured above");
        let response = neg.reconcile(peer_msg).context("Failed to reconcile")?;
        Ok(ReconcileResult {
            have_keys: Vec::new(),
            need_keys: Vec::new(),
            is_complete: false,
            next_message: Some(response),
        })
    }
}
/// Thread-safe manager for concurrent per-peer Negentropy sync sessions.
pub struct NegentropySync {
    /// Active sessions keyed by peer.
    sessions: Arc<RwLock<HashMap<EndpointId, SyncSession>>>,
    /// Cumulative counters across all sessions.
    stats: Arc<RwLock<NegentropyStats>>,
}
/// Cumulative statistics across all sync sessions.
#[derive(Debug, Default, Clone)]
pub struct NegentropyStats {
    /// Sessions started locally via `initiate_sync`.
    pub sessions_initiated: u64,
    /// Sessions observed to complete (initiator side only).
    pub sessions_completed: u64,
    /// Total keys reported as "we have, peer lacks".
    pub docs_have: u64,
    /// Total keys reported as "peer has, we lack".
    pub docs_need: u64,
    /// Bytes of protocol messages sent plus received.
    pub bytes_exchanged: u64,
    /// Number of messages processed via `handle_message`.
    pub round_trips: u64,
}
impl NegentropySync {
    /// Creates an empty sync manager with zeroed stats.
    pub fn new() -> Self {
        Self {
            sessions: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(NegentropyStats::default())),
        }
    }

    /// Starts a session as initiator and returns the first protocol message
    /// to send to `peer_id`. Any existing session with that peer is replaced.
    pub fn initiate_sync(
        &self,
        peer_id: EndpointId,
        local_items: Vec<SyncItem>,
    ) -> Result<Vec<u8>> {
        let mut session = SyncSession::new_initiator(peer_id, local_items)?;
        let init_msg = session.initiate()?;
        {
            // A poisoned lock only means another thread panicked mid-write;
            // the map itself is still usable, so recover the guard.
            let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
            sessions.insert(peer_id, session);
        }
        {
            let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
            stats.sessions_initiated += 1;
            stats.bytes_exchanged += init_msg.len() as u64;
        }
        tracing::debug!(
            "Initiated Negentropy sync with peer {:?}, msg_len={}",
            peer_id,
            init_msg.len()
        );
        Ok(init_msg)
    }

    /// Feeds an incoming protocol message into the session for `peer_id`,
    /// creating a responder session from `local_items` if none exists.
    ///
    /// Completed sessions are removed automatically. NOTE(review): only the
    /// initiator side ever reports `is_complete` (see `reconcile_responder`),
    /// so responder sessions remain in the map until `cancel_session` — the
    /// caller is responsible for that cleanup.
    pub fn handle_message(
        &self,
        peer_id: EndpointId,
        message: &[u8],
        local_items: Vec<SyncItem>,
    ) -> Result<ReconcileResult> {
        use std::collections::hash_map::Entry;
        let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
        // Entry API: one hash lookup instead of get_mut + insert + get_mut.
        let session = match sessions.entry(peer_id) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                entry.insert(SyncSession::new_responder(peer_id, local_items)?)
            }
        };
        let result = session.reconcile(message)?;
        {
            let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
            stats.bytes_exchanged += message.len() as u64;
            stats.round_trips += 1;
            if let Some(next) = &result.next_message {
                stats.bytes_exchanged += next.len() as u64;
            }
            stats.docs_have += result.have_keys.len() as u64;
            stats.docs_need += result.need_keys.len() as u64;
            if result.is_complete {
                stats.sessions_completed += 1;
            }
        }
        if result.is_complete {
            sessions.remove(&peer_id);
            tracing::debug!(
                "Negentropy sync complete with {:?}: have={}, need={}",
                peer_id,
                result.have_keys.len(),
                result.need_keys.len()
            );
        }
        Ok(result)
    }

    /// Returns a snapshot of cumulative sync statistics.
    pub fn stats(&self) -> NegentropyStats {
        self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
    }

    /// True if an in-flight session exists for `peer_id`.
    pub fn has_session(&self, peer_id: &EndpointId) -> bool {
        self.sessions
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .contains_key(peer_id)
    }

    /// Drops any in-flight session with `peer_id` (used on disconnect, and
    /// required to clean up responder sessions, which never self-complete).
    pub fn cancel_session(&self, peer_id: &EndpointId) {
        self.sessions
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .remove(peer_id);
    }
}
impl Default for NegentropySync {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one SyncItem per key, with strictly increasing timestamps.
    fn make_test_items(keys: &[&str], base_timestamp: u64) -> Vec<SyncItem> {
        keys.iter()
            .enumerate()
            .map(|(i, key)| SyncItem::from_doc_key(key, base_timestamp + i as u64))
            .collect()
    }

    /// Deterministic EndpointId derived from a one-byte seed.
    fn test_peer_id(seed: u8) -> EndpointId {
        use iroh::SecretKey;
        let mut key_bytes = [0u8; 32];
        key_bytes[0] = seed;
        let secret = SecretKey::from_bytes(&key_bytes);
        secret.public()
    }

    #[test]
    fn test_sync_item_from_doc_key() {
        // Same key + timestamp -> same id; different key -> different id.
        let item1 = SyncItem::from_doc_key("nodes::node-1", 1000);
        let item2 = SyncItem::from_doc_key("nodes::node-1", 1000);
        let item3 = SyncItem::from_doc_key("nodes::node-2", 1000);
        assert_eq!(item1.id, item2.id);
        assert_ne!(item1.id, item3.id);
    }

    #[test]
    fn test_identical_sets_no_differences() {
        let peer_a = test_peer_id(1);
        let peer_b = test_peer_id(2);
        let items = make_test_items(&["doc-1", "doc-2", "doc-3"], 1000);
        let sync_a = NegentropySync::new();
        let sync_b = NegentropySync::new();
        let msg1 = sync_a.initiate_sync(peer_b, items.clone()).unwrap();
        let result_b = sync_b.handle_message(peer_a, &msg1, items.clone()).unwrap();
        // The responder always answers with a message.
        assert!(result_b.next_message.is_some());
        let result_a = sync_a
            .handle_message(peer_b, &result_b.next_message.unwrap(), items)
            .unwrap();
        assert!(result_a.is_complete);
        assert!(result_a.have_keys.is_empty());
        assert!(result_a.need_keys.is_empty());
    }

    #[test]
    fn test_different_sets_finds_differences() {
        let peer_a = test_peer_id(1);
        let peer_b = test_peer_id(2);
        // A uniquely holds doc-1; B uniquely holds doc-3; doc-2 is shared.
        let items_a = make_test_items(&["doc-1", "doc-2"], 1000);
        let items_b = make_test_items(&["doc-2", "doc-3"], 1000);
        let sync_a = NegentropySync::new();
        let sync_b = NegentropySync::new();
        let msg1 = sync_a.initiate_sync(peer_b, items_a.clone()).unwrap();
        let result_b = sync_b
            .handle_message(peer_a, &msg1, items_b.clone())
            .unwrap();
        // Drive the exchange to completion, accumulating the differences the
        // initiator reports in each round. (The previous version discarded
        // intermediate-round keys and passed vacuously if the sync never
        // completed.)
        let mut all_have: Vec<String> = Vec::new();
        let mut all_need: Vec<String> = Vec::new();
        let mut current_msg = result_b.next_message;
        let mut completed = false;
        let mut rounds = 0;
        while let Some(msg) = current_msg {
            rounds += 1;
            assert!(rounds < 32, "sync did not converge");
            let result = sync_a
                .handle_message(peer_b, &msg, items_a.clone())
                .unwrap();
            all_have.extend(result.have_keys.iter().cloned());
            all_need.extend(result.need_keys.iter().cloned());
            if result.is_complete {
                completed = true;
                break;
            }
            let next = result
                .next_message
                .expect("incomplete round must produce a next message");
            let resp = sync_b
                .handle_message(peer_a, &next, items_b.clone())
                .unwrap();
            current_msg = resp.next_message;
        }
        assert!(completed, "initiator never observed completion");
        // Initiator's `have` = keys the peer lacks; `need` = keys we lack.
        assert!(all_have.contains(&"doc-1".to_string()));
        assert!(all_need.contains(&"doc-3".to_string()));
        assert!(!all_have.contains(&"doc-2".to_string()));
        assert!(!all_need.contains(&"doc-2".to_string()));
    }

    #[test]
    fn test_stats_tracking() {
        let peer_b = test_peer_id(2);
        let items = make_test_items(&["doc-1"], 1000);
        let sync = NegentropySync::new();
        let _msg = sync.initiate_sync(peer_b, items).unwrap();
        let stats = sync.stats();
        assert_eq!(stats.sessions_initiated, 1);
        assert!(stats.bytes_exchanged > 0);
    }
}