use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use super::merger::{self, MergeStats};
use super::store::HiveStore;
use super::{KnowledgeUnit, epoch_secs};
use crate::relay::{MessageType, PeerId, RelayMessage, epoch_ms, gen_msg_id};
const MAX_SNAPSHOT_SIZE: usize = 500 * 1024;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PeerSyncState {
pub peer_id: String,
pub last_sync_epoch: u64,
pub units_sent: HashSet<String>,
}
pub struct GossipEngine {
sync_states: HashMap<String, PeerSyncState>,
max_propagation: u32,
knowledge_ttl_days: u32,
local_peer_id: String,
sharing_filter: super::SharingFilter,
share_mode: super::exposure::ShareMode,
}
impl GossipEngine {
pub fn new(local_peer_id: &str, max_propagation: u32, knowledge_ttl_days: u32) -> Self {
let sync_states = load_sync_states();
GossipEngine {
sync_states,
max_propagation,
knowledge_ttl_days,
local_peer_id: local_peer_id.to_string(),
sharing_filter: super::SharingFilter::default(),
share_mode: super::exposure::ShareMode::Auto,
}
}
pub fn set_sharing_filter(&mut self, filter: super::SharingFilter) {
self.sharing_filter = filter;
}
pub fn set_share_mode(&mut self, mode: super::exposure::ShareMode) {
self.share_mode = mode;
}
#[cfg(test)]
pub fn new_empty(local_peer_id: &str, max_propagation: u32, knowledge_ttl_days: u32) -> Self {
GossipEngine {
sync_states: HashMap::new(),
max_propagation,
knowledge_ttl_days,
local_peer_id: local_peer_id.to_string(),
sharing_filter: super::SharingFilter::default(),
share_mode: super::exposure::ShareMode::Auto,
}
}
pub fn generate_sync_messages(
&mut self,
store: &HiveStore,
connected_peers: &[PeerId],
) -> Vec<(PeerId, RelayMessage)> {
let mut messages = Vec::new();
let now = epoch_secs();
let max_prop = self.max_propagation;
let ttl_secs = self.knowledge_ttl_days as u64 * 86400;
let identity = self.local_peer_id.clone();
let filter = self.sharing_filter.clone();
let exposure = super::exposure::ExposureStore::load();
let share_mode = self.share_mode;
let trust = super::trust::TrustStore::load();
for peer in connected_peers {
let peer_id = peer.0.clone();
let target_rank = peer_consent_rank(&trust, &peer_id);
let sync_state =
self.sync_states
.entry(peer_id.clone())
.or_insert_with(|| PeerSyncState {
peer_id: peer_id.clone(),
last_sync_epoch: 0,
units_sent: HashSet::new(),
});
let unsent: Vec<&KnowledgeUnit> = store
.all_units()
.into_iter()
.filter(|u| {
!sync_state.units_sent.contains(&u.id)
&& u.source_peer != peer_id && is_propagatable_static(u, max_prop, ttl_secs, &filter)
&& is_exposed_for_peer(u, &identity, &exposure, share_mode)
&& consent_allows(u, &peer_id, target_rank, now)
})
.collect();
if unsent.is_empty() {
continue;
}
let units: Vec<KnowledgeUnit> = unsent.into_iter().cloned().collect();
let msg = build_sync_message(&units, &identity, now);
for unit in &units {
sync_state.units_sent.insert(unit.id.clone());
}
sync_state.last_sync_epoch = now;
messages.push((peer.clone(), msg));
}
let _ = save_sync_states(&self.sync_states);
messages
}
pub fn handle_sync(
&mut self,
store: &mut HiveStore,
msg: &RelayMessage,
) -> (MergeStats, Vec<KnowledgeUnit>) {
let units = parse_units_from_payload(msg);
sybil_check(store, &msg.from_peer, &units, &self.local_peer_id);
let stats = merger::merge_batch(store, &units, &self.local_peer_id);
let accepted: Vec<KnowledgeUnit> = units
.into_iter()
.filter(|u| store.get(&u.id).is_some() && self.is_propagatable(u))
.collect();
let _ = store.save();
(stats, accepted)
}
pub fn handle_request(&self, store: &HiveStore, msg: &RelayMessage) -> Vec<RelayMessage> {
let since_epoch = msg
.payload
.get("since_epoch")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let units: Vec<&KnowledgeUnit> = if since_epoch == 0 {
store.all_units()
} else {
store.units_since(since_epoch)
};
let mut pages = Vec::new();
let mut current_page: Vec<KnowledgeUnit> = Vec::new();
let mut current_size: usize = 0;
for unit in units {
let unit_json = serde_json::to_string(unit).unwrap_or_default();
let unit_size = unit_json.len();
if current_size + unit_size > MAX_SNAPSHOT_SIZE && !current_page.is_empty() {
pages.push(std::mem::take(&mut current_page));
current_size = 0;
}
current_page.push(unit.clone());
current_size += unit_size;
}
if !current_page.is_empty() {
pages.push(current_page);
}
let total_pages = pages.len();
pages
.into_iter()
.enumerate()
.map(|(i, units)| {
build_snapshot_message(&units, &self.local_peer_id, i + 1, total_pages)
})
.collect()
}
pub fn handle_snapshot(
&mut self,
store: &mut HiveStore,
msg: &RelayMessage,
) -> (MergeStats, Vec<KnowledgeUnit>) {
let units = parse_units_from_payload(msg);
sybil_check(store, &msg.from_peer, &units, &self.local_peer_id);
let stats = merger::merge_batch(store, &units, &self.local_peer_id);
let merged: Vec<KnowledgeUnit> = units
.into_iter()
.filter(|u| store.get(&u.id).is_some())
.collect();
let _ = store.save();
(stats, merged)
}
pub fn build_request_message(&self, since_epoch: u64) -> RelayMessage {
RelayMessage {
id: gen_msg_id(),
msg_type: MessageType::KnowledgeRequest,
from_peer: self.local_peer_id.clone(),
timestamp: epoch_ms(),
payload: serde_json::json!({
"since_epoch": since_epoch,
}),
}
}
pub fn propagate(
&mut self,
accepted_units: &[KnowledgeUnit],
source_peer: &PeerId,
connected_peers: &[PeerId],
) -> Vec<(PeerId, RelayMessage)> {
let now = epoch_secs();
let mut messages = Vec::new();
let target_peers: Vec<&PeerId> = connected_peers
.iter()
.filter(|p| p.0 != source_peer.0)
.collect();
if target_peers.is_empty() {
return messages;
}
let propagatable: Vec<&KnowledgeUnit> = accepted_units
.iter()
.filter(|u| self.is_propagatable(u))
.collect();
if propagatable.is_empty() {
return messages;
}
let trust = super::trust::TrustStore::load();
for peer in target_peers {
let peer_id_str = peer.0.clone();
let target_rank = peer_consent_rank(&trust, &peer_id_str);
let sync_state =
self.sync_states
.entry(peer.0.clone())
.or_insert_with(|| PeerSyncState {
peer_id: peer.0.clone(),
last_sync_epoch: 0,
units_sent: HashSet::new(),
});
let unsent: Vec<KnowledgeUnit> = propagatable
.iter()
.filter(|u| !sync_state.units_sent.contains(&u.id))
.filter(|u| consent_allows(u, &peer_id_str, target_rank, now))
.map(|u| (*u).clone())
.collect();
if unsent.is_empty() {
continue;
}
let msg = build_sync_message(&unsent, &self.local_peer_id, now);
for unit in &unsent {
sync_state.units_sent.insert(unit.id.clone());
}
messages.push((peer.clone(), msg));
}
let _ = save_sync_states(&self.sync_states);
messages
}
fn is_propagatable(&self, unit: &KnowledgeUnit) -> bool {
let ttl_secs = self.knowledge_ttl_days as u64 * 86400;
is_propagatable_static(unit, self.max_propagation, ttl_secs, &self.sharing_filter)
}
pub fn get_sync_state(&self, peer_id: &str) -> Option<&PeerSyncState> {
self.sync_states.get(peer_id)
}
pub fn all_sync_states(&self) -> &HashMap<String, PeerSyncState> {
&self.sync_states
}
}
pub fn is_exposed_for_peer(
unit: &KnowledgeUnit,
local_peer_id: &str,
exposure: &super::exposure::ExposureStore,
mode: super::exposure::ShareMode,
) -> bool {
if unit.source_peer != local_peer_id {
return true;
}
exposure.is_exposed(&unit.id, mode)
}
fn is_propagatable_static(
unit: &KnowledgeUnit,
max_propagation: u32,
ttl_secs: u64,
filter: &super::SharingFilter,
) -> bool {
if !unit.category.is_shareable() {
return false;
}
if !filter.allows(unit) {
return false;
}
if unit.propagation_count >= max_propagation {
return false;
}
let age = epoch_secs().saturating_sub(unit.last_validated_at);
if age > ttl_secs {
return false;
}
true
}
fn build_sync_message(units: &[KnowledgeUnit], identity: &str, sync_epoch: u64) -> RelayMessage {
RelayMessage {
id: gen_msg_id(),
msg_type: MessageType::KnowledgeSync,
from_peer: identity.to_string(),
timestamp: epoch_ms(),
payload: serde_json::json!({
"units": units,
"sync_epoch": sync_epoch,
}),
}
}
fn build_snapshot_message(
units: &[KnowledgeUnit],
identity: &str,
page: usize,
total_pages: usize,
) -> RelayMessage {
RelayMessage {
id: gen_msg_id(),
msg_type: MessageType::KnowledgeSnapshot,
from_peer: identity.to_string(),
timestamp: epoch_ms(),
payload: serde_json::json!({
"units": units,
"page": page,
"total_pages": total_pages,
}),
}
}
fn parse_units_from_payload(msg: &RelayMessage) -> Vec<KnowledgeUnit> {
msg.payload
.get("units")
.and_then(|v| serde_json::from_value::<Vec<KnowledgeUnit>>(v.clone()).ok())
.unwrap_or_default()
}
fn peer_consent_rank(trust: &super::trust::TrustStore, peer_id: &str) -> u8 {
trust
.get(peer_id)
.map(|p| match p.tier() {
super::trust::TrustTier::Confirmed => 4,
super::trust::TrustTier::Suggested => 3,
super::trust::TrustTier::Unverified => 2,
super::trust::TrustTier::Ignored => 1,
})
.unwrap_or(3)
}
fn consent_allows(unit: &KnowledgeUnit, target_peer: &str, target_rank: u8, now: u64) -> bool {
match &unit.sharing_consent {
None => true,
Some(c) => c.allows(target_peer, target_rank, now),
}
}
fn sybil_check(store: &HiveStore, from_peer: &str, units: &[KnowledgeUnit], local_peer_id: &str) {
if units.is_empty() {
return;
}
let mut trust = super::trust::TrustStore::load();
if trust.record_received(from_peer, units.len() as u32) {
let now = epoch_secs();
let received = trust
.get(from_peer)
.map(|p| p.received_today(now))
.unwrap_or(0);
trust.freeze(
from_peer,
&format!("rate cap exceeded: {received} units in 24h"),
);
}
let collisions = super::trust::detect_collisions(store, units);
if !collisions.is_empty() {
let _ = super::trust::apply_collisions(&mut trust, local_peer_id, &collisions);
}
let _ = trust.save();
}
fn sync_state_path() -> PathBuf {
let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
PathBuf::from(home)
.join(".claudectl")
.join("hive")
.join("sync_state.json")
}
fn load_sync_states() -> HashMap<String, PeerSyncState> {
let path = sync_state_path();
let content = match fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => return HashMap::new(),
};
serde_json::from_str(&content).unwrap_or_default()
}
fn save_sync_states(states: &HashMap<String, PeerSyncState>) -> Result<(), String> {
let path = sync_state_path();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
}
let json =
serde_json::to_string_pretty(states).map_err(|e| format!("serialize sync state: {e}"))?;
fs::write(&path, json).map_err(|e| format!("write sync state: {e}"))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hive::{KnowledgeContent, KnowledgeScope};
fn make_unit(id: &str, tool: &str, peer: &str) -> KnowledgeUnit {
KnowledgeUnit {
id: id.into(),
scope: KnowledgeScope::Universal,
category: crate::hive::KnowledgeCategory::BestPractice,
content: KnowledgeContent::Pattern {
tool: tool.into(),
command_pattern: Some("test".into()),
preferred_action: "approve".into(),
accept_rate: 0.9,
sample_count: 10,
conditions: vec![],
},
evidence_count: 10,
confidence: 0.9,
source_peer: peer.into(),
originated_at: epoch_secs(),
last_validated_at: epoch_secs(),
propagation_count: 0,
version: 1,
revalidation_interval_secs: 0,
injection_state: crate::hive::InjectionState::Live,
injection_stats: crate::hive::InjectionStats {
injected_count: 0,
accepted_count: 0,
overridden_count: 0,
last_injected_at: 0,
last_outcome_at: 0,
},
sharing_consent: None,
}
}
fn empty_store() -> HiveStore {
HiveStore::load_from(std::path::Path::new("/nonexistent"))
}
#[test]
fn generate_sync_only_unsent() {
let mut store = empty_store();
store.insert(make_unit("ku_1", "Bash", "local"));
store.insert(make_unit("ku_2", "Read", "local"));
let mut engine = GossipEngine::new_empty("local", 5, 30);
let peers = vec![PeerId("peer-a".into())];
let msgs = engine.generate_sync_messages(&store, &peers);
assert_eq!(msgs.len(), 1);
let units = parse_units_from_payload(&msgs[0].1);
assert_eq!(units.len(), 2);
let msgs = engine.generate_sync_messages(&store, &peers);
assert_eq!(msgs.len(), 0);
store.insert(make_unit("ku_3", "Write", "local"));
let msgs = engine.generate_sync_messages(&store, &peers);
assert_eq!(msgs.len(), 1);
let units = parse_units_from_payload(&msgs[0].1);
assert_eq!(units.len(), 1);
assert_eq!(units[0].id, "ku_3");
}
#[test]
fn is_exposed_local_units_respect_mode() {
let local_unit = make_unit("ku_local", "Bash", "local");
let peer_unit = make_unit("ku_peer", "Bash", "peer-a");
let exposure = crate::hive::exposure::ExposureStore::default();
assert!(is_exposed_for_peer(
&local_unit,
"local",
&exposure,
crate::hive::exposure::ShareMode::Auto
));
assert!(is_exposed_for_peer(
&peer_unit,
"local",
&exposure,
crate::hive::exposure::ShareMode::Manual
));
assert!(!is_exposed_for_peer(
&local_unit,
"local",
&exposure,
crate::hive::exposure::ShareMode::Manual
));
let mut exposure_explicit = exposure;
exposure_explicit.set("ku_local", crate::hive::exposure::ExposureState::Expose);
assert!(is_exposed_for_peer(
&local_unit,
"local",
&exposure_explicit,
crate::hive::exposure::ShareMode::Manual
));
}
#[test]
fn dont_echo_back_to_source() {
let mut store = empty_store();
store.insert(make_unit("ku_1", "Bash", "peer-a"));
let mut engine = GossipEngine::new_empty("local", 5, 30);
let peers = vec![PeerId("peer-a".into())];
let msgs = engine.generate_sync_messages(&store, &peers);
assert_eq!(msgs.len(), 0);
}
#[test]
fn handle_sync_merges_units() {
let mut store = empty_store();
let mut engine = GossipEngine::new_empty("local", 5, 30);
let incoming_units = vec![
make_unit("ku_r1", "Bash", "peer-a"),
make_unit("ku_r2", "Read", "peer-a"),
];
let msg = build_sync_message(&incoming_units, "peer-a", epoch_secs());
let (stats, accepted) = engine.handle_sync(&mut store, &msg);
assert_eq!(stats.accepted, 2);
assert_eq!(accepted.len(), 2);
assert_eq!(store.len(), 2);
}
#[test]
fn handle_request_returns_snapshot() {
let mut store = empty_store();
store.insert(make_unit("ku_1", "Bash", "local"));
store.insert(make_unit("ku_2", "Read", "local"));
let engine = GossipEngine::new_empty("local", 5, 30);
let request = RelayMessage {
id: "req_1".into(),
msg_type: MessageType::KnowledgeRequest,
from_peer: "peer-a".into(),
timestamp: 0,
payload: serde_json::json!({ "since_epoch": 0 }),
};
let snapshots = engine.handle_request(&store, &request);
assert!(!snapshots.is_empty());
let total_units: usize = snapshots
.iter()
.map(|s| parse_units_from_payload(s).len())
.sum();
assert_eq!(total_units, 2);
}
#[test]
fn handle_snapshot_merges() {
let mut store = empty_store();
let mut engine = GossipEngine::new_empty("local", 5, 30);
let units = vec![make_unit("ku_1", "Bash", "peer-a")];
let msg = build_snapshot_message(&units, "peer-a", 1, 1);
let (stats, merged) = engine.handle_snapshot(&mut store, &msg);
assert_eq!(stats.accepted, 1);
assert_eq!(merged.len(), 1);
assert_eq!(store.len(), 1);
}
#[test]
fn propagation_excludes_source() {
let mut engine = GossipEngine::new_empty("local", 5, 30);
let units = vec![make_unit("ku_1", "Bash", "peer-a")];
let source = PeerId("peer-a".into());
let connected = vec![PeerId("peer-a".into()), PeerId("peer-b".into())];
let msgs = engine.propagate(&units, &source, &connected);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].0.0, "peer-b");
}
#[test]
fn propagation_respects_max_hops() {
let mut engine = GossipEngine::new_empty("local", 3, 30);
let mut unit = make_unit("ku_1", "Bash", "peer-a");
unit.propagation_count = 3;
let source = PeerId("peer-a".into());
let connected = vec![PeerId("peer-b".into())];
let msgs = engine.propagate(&[unit], &source, &connected);
assert_eq!(msgs.len(), 0); }
#[test]
fn expired_knowledge_not_propagated() {
let mut engine = GossipEngine::new_empty("local", 5, 30);
let mut unit = make_unit("ku_1", "Bash", "peer-a");
unit.last_validated_at = epoch_secs().saturating_sub(60 * 86400);
let source = PeerId("peer-a".into());
let connected = vec![PeerId("peer-b".into())];
let msgs = engine.propagate(&[unit], &source, &connected);
assert_eq!(msgs.len(), 0);
}
#[test]
fn build_request_message_fields() {
let engine = GossipEngine::new_empty("local", 5, 30);
let msg = engine.build_request_message(1000);
assert_eq!(msg.msg_type, MessageType::KnowledgeRequest);
assert_eq!(
msg.payload.get("since_epoch").and_then(|v| v.as_u64()),
Some(1000)
);
}
#[test]
fn snapshot_pagination() {
let mut store = empty_store();
for i in 0..2000 {
let unit = make_unit(&format!("ku_pag_{i}"), &format!("Tool_{i}_abcdef"), "local");
store.insert(unit);
}
let engine = GossipEngine::new_empty("local", 5, 30);
let request = RelayMessage {
id: "req_1".into(),
msg_type: MessageType::KnowledgeRequest,
from_peer: "peer-a".into(),
timestamp: 0,
payload: serde_json::json!({ "since_epoch": 0 }),
};
let snapshots = engine.handle_request(&store, &request);
assert!(snapshots.len() > 1);
for (i, snap) in snapshots.iter().enumerate() {
let page = snap.payload.get("page").and_then(|v| v.as_u64()).unwrap();
let total = snap
.payload
.get("total_pages")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(page, (i + 1) as u64);
assert_eq!(total, snapshots.len() as u64);
}
let total_units: usize = snapshots
.iter()
.map(|s| parse_units_from_payload(s).len())
.sum();
assert_eq!(total_units, 2000);
}
}