use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
/// Fallback per-project cap on retained hashes; `StateManager::prune_old_hashes`
/// uses it when the caller passes `0` as the limit.
const DEFAULT_MAX_HASHES: usize = 50_000;
/// Set of `u64` hashes that also remembers insertion order, so the oldest
/// entries can be evicted first.
///
/// `set` answers membership queries; `order` holds the same hashes from
/// oldest (front) to newest (back). Each hash appears at most once in both.
#[derive(Debug, Clone, Default)]
pub struct SeenHashSet {
    order: VecDeque<u64>,
    set: HashSet<u64>,
}

impl SeenHashSet {
    /// Number of distinct hashes currently tracked.
    pub fn len(&self) -> usize {
        self.set.len()
    }

    /// `true` when no hash is tracked.
    pub fn is_empty(&self) -> bool {
        self.set.is_empty()
    }

    /// Whether `hash` is currently tracked.
    pub fn contains(&self, hash: &u64) -> bool {
        self.set.contains(hash)
    }

    /// Records `hash` as the most recently seen entry.
    ///
    /// A hash that is already tracked is moved to the newest position
    /// instead of being stored twice.
    pub fn insert(&mut self, hash: u64) {
        let newly_added = self.set.insert(hash);
        if !newly_added {
            // Already known: drop its stale queue slot before re-queueing it.
            self.order.retain(|&queued| queued != hash);
        }
        self.order.push_back(hash);
    }

    /// Evicts the oldest hashes until at most `limit` remain.
    pub fn prune_oldest(&mut self, limit: usize) {
        while self.set.len() > limit {
            match self.order.pop_front() {
                Some(oldest) => {
                    self.set.remove(&oldest);
                }
                // Queue exhausted; nothing further can be evicted.
                None => break,
            }
        }
    }
}
impl Serialize for SeenHashSet {
    /// Emits the hashes as a plain sequence, oldest first, so that the
    /// eviction order can be rebuilt when the data is read back.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.collect_seq(&self.order)
    }
}
impl<'de> Deserialize<'de> for SeenHashSet {
    /// Reads a sequence of `u64` hashes and replays them through
    /// [`SeenHashSet::insert`] in the order they appear, so the first
    /// element of the sequence becomes the oldest entry.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let mut restored = SeenHashSet::default();
        Vec::<u64>::deserialize(deserializer)?
            .into_iter()
            .for_each(|hash| restored.insert(hash));
        Ok(restored)
    }
}
/// One entry in the run history kept in `StateManager::runs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunRecord {
/// When the run was recorded (`StateManager::record_run` stamps it with `Utc::now()`).
pub timestamp: DateTime<Utc>,
/// The `entries` count passed to `StateManager::record_run`.
pub entries_added: usize,
/// Source identifiers supplied by the caller for this run.
pub sources: Vec<String>,
}
/// Persistent processing state, (de)serialized as JSON via serde and stored
/// in `state.json`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StateManager {
/// Latest processed timestamp (watermark), keyed by source identifier.
pub last_processed: HashMap<String, DateTime<Utc>>,
/// Hashes already seen, tracked separately under each project name.
pub seen_hashes: HashMap<String, SeenHashSet>,
/// Run history; `record_run` appends one record per call.
pub runs: Vec<RunRecord>,
}
impl StateManager {
fn state_path() -> Result<PathBuf> {
let base = crate::store::store_base_dir()?;
Ok(base.join("state.json"))
}
pub fn load() -> Self {
let path = match Self::state_path() {
Ok(p) => p,
Err(_) => return Self::default(),
};
if !path.exists() {
return Self::default();
}
let contents = match fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => return Self::default(),
};
serde_json::from_str(&contents).unwrap_or_default()
}
pub fn save(&self) -> Result<()> {
let path = Self::state_path()?;
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.with_context(|| format!("Failed to create config dir: {}", parent.display()))?;
}
let json = serde_json::to_string_pretty(self).context("Failed to serialize state")?;
fs::write(&path, json)
.with_context(|| format!("Failed to write state file: {}", path.display()))?;
Ok(())
}
pub fn content_hash(agent: &str, timestamp: i64, message: &str) -> u64 {
let mut hasher = DefaultHasher::new();
agent.hash(&mut hasher);
timestamp.hash(&mut hasher);
message.hash(&mut hasher);
hasher.finish()
}
pub fn overlap_hash(timestamp: i64, message: &str) -> u64 {
let mut hasher = DefaultHasher::new();
let bucket = timestamp / 60; bucket.hash(&mut hasher);
message.hash(&mut hasher);
hasher.finish()
}
pub fn is_new(&self, project: &str, hash: u64) -> bool {
self.seen_hashes
.get(project)
.is_none_or(|set| !set.contains(&hash))
}
pub fn mark_seen(&mut self, project: &str, hash: u64) {
self.seen_hashes
.entry(project.to_string())
.or_default()
.insert(hash);
}
pub fn get_watermark(&self, source: &str) -> Option<DateTime<Utc>> {
self.last_processed.get(source).copied()
}
pub fn migrate_watermark_aliases(&mut self, canonical: &str, aliases: &[String]) -> bool {
if self.last_processed.contains_key(canonical) {
return false;
}
let migrated = aliases
.iter()
.filter_map(|alias| self.last_processed.get(alias).copied())
.max();
if let Some(ts) = migrated {
self.last_processed.insert(canonical.to_string(), ts);
true
} else {
false
}
}
pub fn update_watermark(&mut self, source: &str, ts: DateTime<Utc>) {
let entry = self.last_processed.entry(source.to_string()).or_insert(ts);
if ts > *entry {
*entry = ts;
}
}
pub fn record_run(&mut self, entries: usize, sources: Vec<String>) {
self.runs.push(RunRecord {
timestamp: Utc::now(),
entries_added: entries,
sources,
});
}
pub fn prune_old_hashes(&mut self, max_per_project: usize) {
let limit = if max_per_project == 0 {
DEFAULT_MAX_HASHES
} else {
max_per_project
};
for set in self.seen_hashes.values_mut() {
set.prune_oldest(limit);
}
}
pub fn reset_project(&mut self, project: &str) {
self.seen_hashes.remove(project);
}
pub fn reset_all(&mut self) {
self.seen_hashes.clear();
}
pub fn total_hashes(&self) -> usize {
self.seen_hashes.values().map(|s| s.len()).sum()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    #[test]
    fn test_default_state_is_empty() {
        let state = StateManager::default();
        assert!(state.last_processed.is_empty());
        assert!(state.seen_hashes.is_empty());
        assert!(state.runs.is_empty());
    }

    #[test]
    fn test_content_hash_deterministic() {
        let h1 = StateManager::content_hash("claude", 1700000000, "hello world");
        let h2 = StateManager::content_hash("claude", 1700000000, "hello world");
        assert_eq!(h1, h2);
    }

    #[test]
    fn test_content_hash_varies_with_input() {
        let h1 = StateManager::content_hash("claude", 1700000000, "hello");
        let h2 = StateManager::content_hash("claude", 1700000000, "world");
        assert_ne!(h1, h2, "different message → different hash");
        let h3 = StateManager::content_hash("codex", 1700000000, "hello");
        assert_ne!(h1, h3, "different agent → different hash");
        let h5 = StateManager::content_hash("claude", 1700000001, "hello");
        assert_ne!(h1, h5, "different timestamp → different hash");
    }

    #[test]
    fn test_overlap_hash_ignores_agent() {
        let prompt = "Deploy the new auth module to staging and run integration tests";
        let ts = 1700000000i64;
        // The overlap hash takes no agent, so the same prompt seen through
        // two agents collapses to one value...
        let h_claude = StateManager::overlap_hash(ts, prompt);
        let h_codex = StateManager::overlap_hash(ts, prompt);
        assert_eq!(
            h_claude, h_codex,
            "same message + same bucket → SAME overlap hash"
        );
        // ...whereas the content hash keeps the two agents apart.
        assert_ne!(
            StateManager::content_hash("claude", ts, prompt),
            StateManager::content_hash("codex", ts, prompt),
            "content hash must still distinguish agents"
        );
    }

    #[test]
    fn test_overlap_hash_buckets_60s() {
        let prompt = "Identical broadcast prompt";
        let base = 1700000040i64;
        let same_bucket = base + 19;
        let h1 = StateManager::overlap_hash(base, prompt);
        let h2 = StateManager::overlap_hash(same_bucket, prompt);
        assert_eq!(h1, h2, "within same 60s bucket → SAME hash");
        let next_bucket = base - (base % 60) + 60;
        let h3 = StateManager::overlap_hash(next_bucket, prompt);
        assert_ne!(h1, h3, "different 60s bucket → different hash");
    }

    #[test]
    fn test_overlap_hash_different_message() {
        let ts = 1700000000i64;
        let h1 = StateManager::overlap_hash(ts, "prompt A");
        let h2 = StateManager::overlap_hash(ts, "prompt B");
        assert_ne!(h1, h2, "different message → different overlap hash");
    }

    #[test]
    fn test_is_new_and_mark_seen_per_project() {
        let mut state = StateManager::default();
        let hash = StateManager::content_hash("claude", 100, "msg");
        assert!(state.is_new("projA", hash));
        assert!(state.is_new("projB", hash));
        state.mark_seen("projA", hash);
        assert!(!state.is_new("projA", hash));
        assert!(state.is_new("projB", hash));
        state.mark_seen("projB", hash);
        assert!(!state.is_new("projB", hash));
    }

    #[test]
    fn test_watermark_none_for_unknown_source() {
        let state = StateManager::default();
        assert_eq!(state.get_watermark("nonexistent"), None);
    }

    #[test]
    fn test_watermark_update_only_if_newer() {
        let mut state = StateManager::default();
        let t1 = Utc.with_ymd_and_hms(2026, 1, 1, 10, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap();
        let t0 = Utc.with_ymd_and_hms(2026, 1, 1, 8, 0, 0).unwrap();
        state.update_watermark("claude:CodeScribe", t1);
        assert_eq!(state.get_watermark("claude:CodeScribe"), Some(t1));
        state.update_watermark("claude:CodeScribe", t2);
        assert_eq!(state.get_watermark("claude:CodeScribe"), Some(t2));
        state.update_watermark("claude:CodeScribe", t0);
        assert_eq!(state.get_watermark("claude:CodeScribe"), Some(t2));
    }

    #[test]
    fn test_record_run() {
        let mut state = StateManager::default();
        assert!(state.runs.is_empty());
        state.record_run(
            42,
            vec!["claude:Proj".to_string(), "codex:global".to_string()],
        );
        assert_eq!(state.runs.len(), 1);
        assert_eq!(state.runs[0].entries_added, 42);
        assert_eq!(state.runs[0].sources, vec!["claude:Proj", "codex:global"]);
    }

    #[test]
    fn test_prune_old_hashes_below_limit() {
        let mut state = StateManager::default();
        for i in 0..10u64 {
            state.mark_seen("proj", i);
        }
        state.prune_old_hashes(100);
        assert_eq!(state.seen_hashes["proj"].len(), 10);
    }

    #[test]
    fn test_prune_old_hashes_above_limit() {
        let mut state = StateManager::default();
        for i in 0..100u64 {
            state.mark_seen("proj", i);
        }
        state.prune_old_hashes(30);
        assert_eq!(state.seen_hashes["proj"].len(), 30);
    }

    #[test]
    fn lru_evicts_oldest_first() {
        let mut state = StateManager::default();
        for i in 0..10u64 {
            state.mark_seen("proj", i);
        }
        state.prune_old_hashes(5);
        for old in 0..5u64 {
            assert!(
                state.is_new("proj", old),
                "old hash {old} should be evicted"
            );
        }
        for fresh in 5..10u64 {
            assert!(
                !state.is_new("proj", fresh),
                "fresh hash {fresh} should remain"
            );
        }
    }

    #[test]
    fn lru_reinsert_refreshes_recency() {
        let mut state = StateManager::default();
        for i in 0..5u64 {
            state.mark_seen("proj", i);
        }
        // Seeing hash 0 again must move it to the newest position without
        // storing it twice.
        state.mark_seen("proj", 0);
        assert_eq!(state.total_hashes(), 5);
        state.prune_old_hashes(3);
        for evicted in [1u64, 2] {
            assert!(
                state.is_new("proj", evicted),
                "hash {evicted} should be evicted"
            );
        }
        for kept in [0u64, 3, 4] {
            assert!(!state.is_new("proj", kept), "hash {kept} should remain");
        }
    }

    #[test]
    fn watermark_migration_carries_timestamp_forward() {
        let mut state = StateManager::default();
        let ts = Utc.with_ymd_and_hms(2026, 5, 6, 11, 0, 0).unwrap();
        state.update_watermark("claude+codex+gemini:all", ts);
        let migrated = state.migrate_watermark_aliases(
            "claude+codex+gemini+junie:all",
            &["claude+codex+gemini:all".to_string()],
        );
        assert!(migrated);
        assert_eq!(
            state.get_watermark("claude+codex+gemini+junie:all"),
            Some(ts)
        );
    }

    #[test]
    fn watermark_migration_skips_when_canonical_exists() {
        let mut state = StateManager::default();
        let older = Utc.with_ymd_and_hms(2026, 5, 6, 9, 0, 0).unwrap();
        let newer = Utc.with_ymd_and_hms(2026, 5, 6, 11, 0, 0).unwrap();
        state.update_watermark("canonical", older);
        state.update_watermark("alias", newer);
        let migrated = state.migrate_watermark_aliases("canonical", &["alias".to_string()]);
        assert!(!migrated);
        assert_eq!(state.get_watermark("canonical"), Some(older));
    }

    #[test]
    fn watermark_migration_picks_latest_alias() {
        let mut state = StateManager::default();
        let t1 = Utc.with_ymd_and_hms(2026, 5, 6, 9, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2026, 5, 6, 11, 0, 0).unwrap();
        state.update_watermark("alias-a", t1);
        state.update_watermark("alias-b", t2);
        let aliases = vec![
            "alias-a".to_string(),
            "alias-b".to_string(),
            "alias-missing".to_string(),
        ];
        assert!(state.migrate_watermark_aliases("canonical", &aliases));
        assert_eq!(state.get_watermark("canonical"), Some(t2));

        // No known alias → nothing to migrate.
        assert!(!state.migrate_watermark_aliases("other", &["unknown".to_string()]));
        assert_eq!(state.get_watermark("other"), None);
    }

    #[test]
    fn test_prune_old_hashes_default_limit() {
        let mut state = StateManager::default();
        state.prune_old_hashes(0);
        assert_eq!(state.total_hashes(), 0);

        // A limit of 0 selects the default cap; it must not wipe the set.
        for i in 0..10u64 {
            state.mark_seen("proj", i);
        }
        state.prune_old_hashes(0);
        assert_eq!(state.total_hashes(), 10);
    }

    #[test]
    fn test_reset_project() {
        let mut state = StateManager::default();
        state.mark_seen("projA", 1);
        state.mark_seen("projA", 2);
        state.mark_seen("projB", 3);
        state.reset_project("projA");
        assert!(state.is_new("projA", 1));
        assert!(!state.is_new("projB", 3));
    }

    #[test]
    fn test_reset_all() {
        let mut state = StateManager::default();
        state.mark_seen("projA", 1);
        state.mark_seen("projB", 2);
        state.reset_all();
        assert!(state.is_new("projA", 1));
        assert!(state.is_new("projB", 2));
        assert_eq!(state.total_hashes(), 0);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let mut state = StateManager::default();
        let t = Utc.with_ymd_and_hms(2026, 1, 20, 15, 30, 0).unwrap();
        state.update_watermark("claude:TestProject", t);
        state.mark_seen("myproj", 123456789);
        state.mark_seen("myproj", 987654321);
        state.record_run(5, vec!["claude:TestProject".to_string()]);
        let json = serde_json::to_string_pretty(&state).unwrap();
        let restored: StateManager = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.get_watermark("claude:TestProject"), Some(t));
        assert!(!restored.is_new("myproj", 123456789));
        assert!(!restored.is_new("myproj", 987654321));
        assert!(restored.is_new("myproj", 111111111));
        assert!(restored.is_new("other", 123456789));
        assert_eq!(restored.runs.len(), 1);
        assert_eq!(restored.runs[0].entries_added, 5);
    }

    #[test]
    fn serialization_preserves_eviction_order() {
        let mut state = StateManager::default();
        for i in 0..10u64 {
            state.mark_seen("proj", i);
        }
        let json = serde_json::to_string(&state).unwrap();
        let mut restored: StateManager = serde_json::from_str(&json).unwrap();
        // Oldest-first order must survive the round-trip so pruning still
        // drops the earliest hashes.
        restored.prune_old_hashes(5);
        for old in 0..5u64 {
            assert!(
                restored.is_new("proj", old),
                "old hash {old} should be evicted"
            );
        }
        for fresh in 5..10u64 {
            assert!(
                !restored.is_new("proj", fresh),
                "fresh hash {fresh} should remain"
            );
        }
    }

    #[test]
    fn test_state_path_is_under_store() {
        if let Ok(path) = StateManager::state_path() {
            assert!(path.to_string_lossy().contains(".aicx"));
            assert!(path.to_string_lossy().ends_with("state.json"));
        }
    }
}