use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use super::{KnowledgeUnit, effective_confidence, epoch_secs, semantic_key};
/// Root directory for all hive state: `$HOME/.claudectl/hive`.
///
/// `HOME` is read with `var_os` so a home directory that is not valid UTF-8
/// is still honoured; `std::env::var` would return `Err` for it and state
/// would silently move to `/tmp`. An unset *or empty* `HOME` falls back to
/// `/tmp`. An empty value would otherwise yield a path relative to the
/// current working directory.
fn hive_dir() -> PathBuf {
    let home = std::env::var_os("HOME")
        .filter(|value| !value.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("/tmp"));
    home.join(".claudectl").join("hive")
}
/// Location of the JSON-lines file that persists every knowledge unit.
fn knowledge_path() -> PathBuf {
    let mut file = hive_dir();
    file.push("knowledge.jsonl");
    file
}
/// Location of the append-only JSON-lines log written by `log_conflict`.
fn conflicts_path() -> PathBuf {
    let mut file = hive_dir();
    file.push("conflicts.jsonl");
    file
}
/// In-memory collection of knowledge units keyed by unit id, with a secondary
/// index from semantic key to unit id.
pub struct HiveStore {
    /// Every stored unit, keyed by its `id`.
    units: HashMap<String, KnowledgeUnit>,
    /// Maps `semantic_key(unit)` to the id of the unit currently holding that key.
    semantic_index: HashMap<String, String>,
}
impl HiveStore {
pub fn load() -> Self {
let path = knowledge_path();
let mut store = HiveStore {
units: HashMap::new(),
semantic_index: HashMap::new(),
};
if let Ok(content) = fs::read_to_string(&path) {
for line in content.lines() {
if let Ok(unit) = serde_json::from_str::<KnowledgeUnit>(line) {
let sk = semantic_key(&unit);
store.semantic_index.insert(sk, unit.id.clone());
store.units.insert(unit.id.clone(), unit);
}
}
}
store
}
pub fn load_from(path: &std::path::Path) -> Self {
let mut store = HiveStore {
units: HashMap::new(),
semantic_index: HashMap::new(),
};
if let Ok(content) = fs::read_to_string(path) {
for line in content.lines() {
if let Ok(unit) = serde_json::from_str::<KnowledgeUnit>(line) {
let sk = semantic_key(&unit);
store.semantic_index.insert(sk, unit.id.clone());
store.units.insert(unit.id.clone(), unit);
}
}
}
store
}
pub fn insert(&mut self, unit: KnowledgeUnit) -> bool {
let sk = semantic_key(&unit);
let is_new = !self.units.contains_key(&unit.id);
if let Some(old_id) = self.semantic_index.get(&sk) {
if *old_id != unit.id {
self.units.remove(&old_id.clone());
}
}
self.semantic_index.insert(sk, unit.id.clone());
self.units.insert(unit.id.clone(), unit);
is_new
}
pub fn get(&self, id: &str) -> Option<&KnowledgeUnit> {
self.units.get(id)
}
pub fn find_by_semantic_key(&self, key: &str) -> Option<&KnowledgeUnit> {
self.semantic_index
.get(key)
.and_then(|id| self.units.get(id))
}
pub fn semantic_key_for(&self, unit: &KnowledgeUnit) -> String {
semantic_key(unit)
}
pub fn all_units(&self) -> Vec<&KnowledgeUnit> {
self.units.values().collect()
}
pub fn units_since(&self, epoch: u64) -> Vec<&KnowledgeUnit> {
self.units
.values()
.filter(|u| u.last_validated_at >= epoch)
.collect()
}
pub fn by_scope(&self, scope: &super::KnowledgeScope) -> Vec<&KnowledgeUnit> {
self.units.values().filter(|u| &u.scope == scope).collect()
}
pub fn by_source(&self, peer: &str) -> Vec<&KnowledgeUnit> {
self.units
.values()
.filter(|u| u.source_peer == peer)
.collect()
}
pub fn remove(&mut self, id: &str) -> bool {
if let Some(unit) = self.units.remove(id) {
let sk = semantic_key(&unit);
self.semantic_index.remove(&sk);
true
} else {
false
}
}
pub fn len(&self) -> usize {
self.units.len()
}
pub fn is_empty(&self) -> bool {
self.units.is_empty()
}
pub fn compact(
&mut self,
ttl_days: u32,
max_units: usize,
stale_peer_days: u32,
trust_store: Option<&super::trust::TrustStore>,
) -> Vec<super::KnowledgeUnit> {
let now = super::epoch_secs();
let ttl_secs = ttl_days as u64 * 86400;
let stale_secs = stale_peer_days as u64 * 86400;
let mut evicted = Vec::new();
let expired_ids: Vec<String> = self
.units
.values()
.filter(|u| now.saturating_sub(u.last_validated_at) > ttl_secs)
.map(|u| u.id.clone())
.collect();
for id in &expired_ids {
if let Some(unit) = self.units.get(id).cloned() {
evicted.push(unit);
}
self.remove(id);
}
if let Some(ts) = trust_store {
let stale_peers: Vec<String> = self
.units
.values()
.map(|u| u.source_peer.clone())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.filter(|peer| {
ts.get(peer)
.is_some_and(|t| now.saturating_sub(t.last_sync) > stale_secs)
})
.collect();
for peer in &stale_peers {
let ids: Vec<String> = self
.units
.values()
.filter(|u| &u.source_peer == peer)
.map(|u| u.id.clone())
.collect();
for id in &ids {
if let Some(unit) = self.units.get(id).cloned() {
evicted.push(unit);
}
self.remove(id);
}
}
}
if max_units > 0 && self.units.len() > max_units {
let now = epoch_secs();
let mut scored: Vec<(String, f64)> = self
.units
.values()
.map(|u| {
(
u.id.clone(),
effective_confidence(u, now) * u.evidence_count as f64,
)
})
.collect();
scored.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let to_evict = self.units.len() - max_units;
for (id, _) in scored.into_iter().take(to_evict) {
if let Some(unit) = self.units.get(&id).cloned() {
evicted.push(unit);
}
self.remove(&id);
}
}
evicted
}
pub fn save(&self) -> std::io::Result<()> {
let dir = hive_dir();
fs::create_dir_all(&dir)?;
let path = knowledge_path();
let tmp_path = path.with_extension("jsonl.tmp");
let lines: Vec<String> = self
.units
.values()
.filter_map(|u| serde_json::to_string(u).ok())
.collect();
fs::write(&tmp_path, lines.join("\n") + "\n")?;
fs::rename(&tmp_path, &path)
}
pub fn save_to(&self, path: &std::path::Path) -> std::io::Result<()> {
let lines: Vec<String> = self
.units
.values()
.filter_map(|u| serde_json::to_string(u).ok())
.collect();
fs::write(path, lines.join("\n") + "\n")
}
pub fn append(&self, unit: &KnowledgeUnit) -> std::io::Result<()> {
let dir = hive_dir();
fs::create_dir_all(&dir)?;
let path = knowledge_path();
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
let json = serde_json::to_string(unit)
.map_err(|e| std::io::Error::other(format!("serialize: {e}")))?;
writeln!(file, "{json}")
}
pub fn export_json(&self) -> String {
let units: Vec<&KnowledgeUnit> = self.units.values().collect();
serde_json::to_string_pretty(&units).unwrap_or_else(|_| "[]".into())
}
pub fn import_json(&mut self, json: &str) -> Result<u32, String> {
let units: Vec<KnowledgeUnit> =
serde_json::from_str(json).map_err(|e| format!("parse error: {e}"))?;
let mut count = 0;
for unit in units {
if self.insert(unit) {
count += 1;
}
}
Ok(count)
}
}
/// Records a conflict between a local unit and an incoming one.
///
/// One JSON line is appended to `conflicts.jsonl` in the hive directory. It
/// holds both ids, peers, confidences and the local unit's semantic key.
///
/// This is best-effort by design: failures to create the directory, open the
/// file or write the line are all ignored.
pub fn log_conflict(local: &KnowledgeUnit, incoming: &KnowledgeUnit) {
    let _ = fs::create_dir_all(hive_dir());

    let record = serde_json::json!({
        "ts": epoch_secs(),
        "local_id": local.id,
        "local_peer": local.source_peer,
        "local_confidence": local.confidence,
        "incoming_id": incoming.id,
        "incoming_peer": incoming.source_peer,
        "incoming_confidence": incoming.confidence,
        "semantic_key": semantic_key(local),
    });
    let line = serde_json::to_string(&record).unwrap_or_default();

    let Ok(mut log) = OpenOptions::new()
        .create(true)
        .append(true)
        .open(conflicts_path())
    else {
        return;
    };
    let _ = writeln!(log, "{line}");
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hive::{KnowledgeContent, KnowledgeScope};

    /// Builds a store with nothing in it, without touching the filesystem.
    fn empty_store() -> HiveStore {
        HiveStore {
            units: HashMap::new(),
            semantic_index: HashMap::new(),
        }
    }

    /// Builds a universal best-practice pattern unit with fixed counters and
    /// timestamps; only the id, tool and source peer vary between callers.
    fn make_unit(id: &str, tool: &str, peer: &str) -> KnowledgeUnit {
        let content = KnowledgeContent::Pattern {
            tool: tool.into(),
            command_pattern: Some("test".into()),
            preferred_action: "approve".into(),
            accept_rate: 0.9,
            sample_count: 10,
            conditions: vec![],
        };
        let injection_stats = crate::hive::InjectionStats {
            injected_count: 0,
            accepted_count: 0,
            overridden_count: 0,
            last_injected_at: 0,
            last_outcome_at: 0,
        };
        KnowledgeUnit {
            id: id.into(),
            scope: KnowledgeScope::Universal,
            category: crate::hive::KnowledgeCategory::BestPractice,
            content,
            evidence_count: 10,
            confidence: 0.9,
            source_peer: peer.into(),
            originated_at: 1000,
            last_validated_at: 2000,
            propagation_count: 0,
            version: 1,
            revalidation_interval_secs: 0,
            injection_state: crate::hive::InjectionState::Live,
            injection_stats,
            sharing_consent: None,
        }
    }

    #[test]
    fn insert_and_get() {
        let mut store = empty_store();
        assert!(store.insert(make_unit("ku_1", "Bash", "peer-a")));
        assert_eq!(store.len(), 1);
        assert!(store.get("ku_1").is_some());
    }

    #[test]
    fn find_by_semantic_key() {
        let mut store = empty_store();
        store.insert(make_unit("ku_1", "Bash", "peer-a"));
        let hit = store
            .find_by_semantic_key("universal/pattern:Bash:test")
            .expect("unit should be indexed by its semantic key");
        assert_eq!(hit.id, "ku_1");
        assert!(store
            .find_by_semantic_key("universal/pattern:Read:test")
            .is_none());
    }

    #[test]
    fn remove_unit() {
        let mut store = empty_store();
        store.insert(make_unit("ku_1", "Bash", "peer-a"));
        assert_eq!(store.len(), 1);

        assert!(store.remove("ku_1"));
        assert_eq!(store.len(), 0);
        assert!(store.get("ku_1").is_none());
        assert!(store
            .find_by_semantic_key("universal/pattern:Bash:test")
            .is_none());

        // Removing an unknown id reports that nothing was removed.
        assert!(!store.remove("nonexistent"));
    }

    #[test]
    fn duplicate_insert_not_new() {
        let mut store = empty_store();
        let unit = make_unit("ku_1", "Bash", "peer-a");
        assert!(store.insert(unit.clone()));
        // Same id a second time is an update, not a new unit.
        assert!(!store.insert(unit));
    }

    #[test]
    fn by_source_filters() {
        let mut store = empty_store();
        for (id, tool, peer) in [
            ("ku_1", "Bash", "peer-a"),
            ("ku_2", "Read", "peer-b"),
            ("ku_3", "Write", "peer-a"),
        ] {
            store.insert(make_unit(id, tool, peer));
        }
        for (peer, expected) in [("peer-a", 2), ("peer-b", 1), ("peer-c", 0)] {
            assert_eq!(store.by_source(peer).len(), expected, "peer {peer}");
        }
    }

    #[test]
    fn export_import_roundtrip() {
        let mut source = empty_store();
        source.insert(make_unit("ku_1", "Bash", "peer-a"));
        source.insert(make_unit("ku_2", "Read", "peer-b"));
        let json = source.export_json();

        let mut target = empty_store();
        assert_eq!(target.import_json(&json).unwrap(), 2);
        assert_eq!(target.len(), 2);
        assert!(target.get("ku_1").is_some());
        assert!(target.get("ku_2").is_some());
    }

    #[test]
    fn import_deduplicates() {
        let mut store = empty_store();
        store.insert(make_unit("ku_1", "Bash", "peer-a"));
        let json = store.export_json();
        // Re-importing our own export adds nothing new.
        assert_eq!(store.import_json(&json).unwrap(), 0);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn save_and_load_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.jsonl");

        let mut store = empty_store();
        store.insert(make_unit("ku_1", "Bash", "peer-a"));
        store.insert(make_unit("ku_2", "Read", "peer-b"));
        store.save_to(&path).unwrap();

        let reloaded = HiveStore::load_from(&path);
        assert_eq!(reloaded.len(), 2);
        assert!(reloaded.get("ku_1").is_some());
        assert!(reloaded.get("ku_2").is_some());
    }

    #[test]
    fn units_since_filters_by_time() {
        let mut older = make_unit("ku_1", "Bash", "peer-a");
        older.last_validated_at = 500;
        let mut newer = make_unit("ku_2", "Read", "peer-b");
        newer.last_validated_at = 2000;

        let mut store = empty_store();
        store.insert(older);
        store.insert(newer);

        let recent = store.units_since(1000);
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].id, "ku_2");
    }

    #[test]
    fn is_empty_and_len() {
        let store = empty_store();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
    }
}