Skip to main content

nexus_memory_hooks/
sync_state.rs

//! Sync state tracking for incremental subconscious retrieval.
//!
//! Tracks the last-processed position per session so retrieval hooks only
//! surface new information. Persisted as JSON in `.nexus/sessions/{id}/sync_state.json`.

6use nexus_core::fsutil::atomic_write;
7use std::fs;
8use std::io;
9use std::path::{Path, PathBuf};
10
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14/// Incremental sync position for a single session.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SyncState {
17    /// Session identifier (matches `.nexus/sessions/{session_id}/`).
18    pub session_id: String,
19    /// Index of the last-processed transcript entry (None = never synced).
20    pub last_processed_index: Option<usize>,
21    /// Hash of the last soul.md content seen (detects dream/soul updates).
22    pub last_soul_hash: String,
23    /// Timestamp of the last successful sync.
24    pub last_sync_timestamp: DateTime<Utc>,
25    /// Number of hot-cache entries at last sync (detects cache promotions).
26    pub last_hot_cache_count: usize,
27    /// Hash of the hot cache entries at last sync (detects cache content changes).
28    pub last_hot_cache_hash: String,
29}
30
31impl SyncState {
32    /// Load sync state for a session, creating a fresh one if none exists.
33    pub fn load(project_root: &Path, session_id: &str) -> io::Result<Self> {
34        let path = sync_state_path(project_root, session_id)?;
35        if path.exists() {
36            let data = fs::read_to_string(&path)?;
37            let mut state: SyncState = serde_json::from_str(&data)
38                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
39            if state.session_id != session_id {
40                return Err(io::Error::new(
41                    io::ErrorKind::InvalidData,
42                    "sync_state session_id does not match requested session",
43                ));
44            }
45            // Use canonical session_id from caller
46            state.session_id = session_id.to_string();
47            Ok(state)
48        } else {
49            Ok(Self::new(session_id))
50        }
51    }
52
53    /// Persist the current sync state to disk.
54    pub fn save(&self, project_root: &Path) -> io::Result<()> {
55        let path = sync_state_path(project_root, &self.session_id)?;
56        if let Some(parent) = path.parent() {
57            fs::create_dir_all(parent)?;
58        }
59        let data = serde_json::to_string_pretty(self).map_err(io::Error::other)?;
60        atomic_write(&path, &data)
61    }
62
63    /// Create a fresh sync state for a new session.
64    pub fn new(session_id: &str) -> Self {
65        Self {
66            session_id: session_id.to_string(),
67            last_processed_index: None,
68            last_soul_hash: String::new(),
69            last_sync_timestamp: Utc::now(),
70            last_hot_cache_count: 0,
71            last_hot_cache_hash: String::new(),
72        }
73    }
74
75    /// Whether there are updates since the last sync (soul changed or cache changed).
76    pub fn has_updates(
77        &self,
78        current_soul_hash: &str,
79        current_hot_count: usize,
80        current_hot_hash: &str,
81    ) -> bool {
82        current_soul_hash != self.last_soul_hash
83            || current_hot_count > self.last_hot_cache_count
84            || current_hot_hash != self.last_hot_cache_hash
85    }
86
87    /// Record a successful sync, advancing all watermarks.
88    pub fn advance(
89        &mut self,
90        soul_hash: String,
91        hot_cache_count: usize,
92        hot_cache_hash: String,
93        new_index: Option<usize>,
94    ) {
95        self.last_soul_hash = soul_hash;
96        self.last_hot_cache_count = hot_cache_count;
97        self.last_hot_cache_hash = hot_cache_hash;
98        if let Some(idx) = new_index {
99            self.last_processed_index = Some(idx);
100        }
101        self.last_sync_timestamp = Utc::now();
102    }
103}
104
/// Compute the path to a session's sync state file:
/// `{project_root}/.nexus/sessions/{session_id}/sync_state.json`.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidInput`] error when `session_id` could
/// escape or alias the sessions directory. That is the case when it:
/// - is empty, `.`, or longer than 128 bytes;
/// - contains `/`, `\`, `..`, or a NUL byte;
/// - does not parse as exactly one plain path component (on Windows, e.g. a
///   drive prefix such as `C:`).
fn sync_state_path(project_root: &Path, session_id: &str) -> io::Result<PathBuf> {
    // Textual checks: reject anything that could traverse directories, plus
    // NUL, which no platform accepts inside a file name.
    let textually_unsafe = session_id.is_empty()
        || session_id == "."
        || session_id.len() > 128
        || session_id.contains(['/', '\\', '\0'])
        || session_id.contains("..");

    // `Path::join` discards the base path when its argument has a root or (on
    // Windows) a drive/UNC prefix, so also insist on one `Normal` component.
    let mut components = Path::new(session_id).components();
    let single_normal_component = matches!(
        (components.next(), components.next()),
        (Some(std::path::Component::Normal(_)), None)
    );

    if textually_unsafe || !single_normal_component {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "session_id contains invalid characters",
        ));
    }
    Ok(project_root
        .join(".nexus")
        .join("sessions")
        .join(session_id)
        .join("sync_state.json"))
}
126
/// Compute a quick hash of soul.md content for change detection.
///
/// This is a cheap polynomial mix rather than a cryptographic digest; we only
/// need to notice that the content *changed*. The bytes are read as
/// little-endian 64-bit words, with the last partial word zero-padded. Starting
/// from zero, each word is folded in as `hash * 0x517cc1b727220a95 + word`
/// with wrapping arithmetic.
///
/// The input length is not mixed in, so inputs that differ only by trailing
/// NUL bytes inside the final 8-byte word hash identically.
///
/// Returns the hash as 16 lowercase hex digits.
pub fn soul_content_hash(content: &str) -> String {
    const MIX: u64 = 0x517c_c1b7_2722_0a95;
    let digest = content
        .as_bytes()
        .chunks(8)
        .map(|piece| {
            let mut word = [0u8; 8];
            word[..piece.len()].copy_from_slice(piece);
            u64::from_le_bytes(word)
        })
        .fold(0u64, |acc, word| acc.wrapping_mul(MIX).wrapping_add(word));
    format!("{digest:016x}")
}
141
/// Compute a quick, order-independent hash of hot-cache entry IDs for change
/// detection.
///
/// Uses the same wrapping multiply-add word mixing as `soul_content_hash`; it
/// is not a cryptographic hash. The IDs are sorted first, so their order does
/// not matter.
///
/// The number of IDs, and each ID's byte length ahead of its bytes, are mixed
/// in so that ID boundaries and zero padding are unambiguous. Without them,
/// `["abcdefgh", "x"]` and `["abcdefghx"]`, `["a"]` and `["a\0"]`, or `["a"]`
/// and `["", "a"]` would all hash identically.
///
/// Returns the hash as 16 lowercase hex digits.
pub fn hot_cache_hash(entry_ids: &[String]) -> String {
    const MULTIPLIER: u64 = 0x517cc1b727220a95;
    let mix = |hash: u64, word: u64| hash.wrapping_mul(MULTIPLIER).wrapping_add(word);

    // Sort borrowed slices instead of cloning every String.
    let mut sorted: Vec<&str> = entry_ids.iter().map(String::as_str).collect();
    sorted.sort_unstable();

    // `usize` is at most 64 bits on supported targets, so these casts are lossless.
    let mut hash = mix(0, sorted.len() as u64);
    for id in sorted {
        hash = mix(hash, id.len() as u64);
        for chunk in id.as_bytes().chunks(8) {
            let mut buf = [0u8; 8];
            buf[..chunk.len()].copy_from_slice(chunk);
            hash = mix(hash, u64::from_le_bytes(buf));
        }
    }
    format!("{hash:016x}")
}
159
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Path of the on-disk state file for `session_id` under `root`.
    fn state_file(root: &std::path::Path, session_id: &str) -> std::path::PathBuf {
        root.join(".nexus")
            .join("sessions")
            .join(session_id)
            .join("sync_state.json")
    }

    #[test]
    fn new_state_has_defaults() {
        let state = SyncState::new("test-session");
        assert_eq!(state.session_id, "test-session");
        assert_eq!(state.last_processed_index, None);
        assert!(state.last_soul_hash.is_empty());
        assert_eq!(state.last_hot_cache_count, 0);
        assert!(state.last_hot_cache_hash.is_empty());
    }

    #[test]
    fn save_and_load_roundtrip() {
        let dir = TempDir::new().unwrap();
        let project_root = dir.path();

        let mut state = SyncState::new("roundtrip-test");
        state.last_soul_hash = "abc123".to_string();
        state.last_hot_cache_count = 5;
        state.last_processed_index = Some(42);
        state.save(project_root).unwrap();

        let loaded = SyncState::load(project_root, "roundtrip-test").unwrap();
        assert_eq!(loaded.session_id, "roundtrip-test");
        assert_eq!(loaded.last_soul_hash, "abc123");
        assert_eq!(loaded.last_hot_cache_count, 5);
        assert_eq!(loaded.last_processed_index, Some(42));
    }

    #[test]
    fn load_nonexistent_creates_fresh() {
        let dir = TempDir::new().unwrap();
        let state = SyncState::load(dir.path(), "no-such-session").unwrap();
        assert_eq!(state.session_id, "no-such-session");
        assert_eq!(state.last_processed_index, None);
    }

    #[test]
    fn load_rejects_mismatched_session_id() {
        let dir = TempDir::new().unwrap();
        SyncState::new("other").save(dir.path()).unwrap();
        let target = state_file(dir.path(), "mine");
        std::fs::create_dir_all(target.parent().unwrap()).unwrap();
        std::fs::copy(state_file(dir.path(), "other"), &target).unwrap();

        let err = SyncState::load(dir.path(), "mine").unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn load_rejects_corrupt_json() {
        let dir = TempDir::new().unwrap();
        let target = state_file(dir.path(), "broken");
        std::fs::create_dir_all(target.parent().unwrap()).unwrap();
        std::fs::write(&target, "not json").unwrap();

        let err = SyncState::load(dir.path(), "broken").unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn rejects_path_traversal_session_ids() {
        let dir = TempDir::new().unwrap();
        for bad in ["", ".", "..", "../escape", "a/b", "a\\b"] {
            let err = SyncState::load(dir.path(), bad).unwrap_err();
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput, "{bad:?}");
            assert!(SyncState::new(bad).save(dir.path()).is_err(), "{bad:?}");
        }
    }

    #[test]
    fn has_updates_detects_soul_change() {
        let state = SyncState::new("test");
        assert!(state.has_updates("different", 0, ""));
        assert!(!state.has_updates("", 0, ""));
    }

    #[test]
    fn has_updates_detects_cache_growth() {
        let state = SyncState::new("test");
        assert!(state.has_updates("", 3, ""));
    }

    #[test]
    fn has_updates_detects_hot_cache_content_change() {
        let state = SyncState::new("test");
        assert!(state.has_updates("", 0, "deadbeef"));
    }

    #[test]
    fn has_updates_after_advance() {
        let mut state = SyncState::new("test");
        state.advance("soul".to_string(), 2, "hot".to_string(), None);
        assert!(!state.has_updates("soul", 2, "hot"));
        // A shrinking cache alone is not an update; a changed hash is.
        assert!(!state.has_updates("soul", 1, "hot"));
        assert!(state.has_updates("soul", 1, "other"));
    }

    #[test]
    fn advance_updates_watermarks() {
        let mut state = SyncState::new("test");
        state.advance("newhash".to_string(), 7, String::new(), Some(15));
        assert_eq!(state.last_soul_hash, "newhash");
        assert_eq!(state.last_hot_cache_count, 7);
        assert_eq!(state.last_processed_index, Some(15));
    }

    #[test]
    fn advance_without_index_keeps_previous_index() {
        let mut state = SyncState::new("test");
        state.advance(String::new(), 0, String::new(), Some(3));
        state.advance(String::new(), 0, String::new(), None);
        assert_eq!(state.last_processed_index, Some(3));
    }

    #[test]
    fn soul_hash_deterministic() {
        let h1 = soul_content_hash("hello world");
        let h2 = soul_content_hash("hello world");
        let h3 = soul_content_hash("hello earth");
        assert_eq!(h1, h2);
        assert_ne!(h1, h3);
    }

    #[test]
    fn hot_cache_hash_is_order_independent() {
        let forward = vec!["alpha".to_string(), "beta".to_string()];
        let backward = vec!["beta".to_string(), "alpha".to_string()];
        assert_eq!(hot_cache_hash(&forward), hot_cache_hash(&backward));
        assert_ne!(hot_cache_hash(&forward), hot_cache_hash(&forward[..1]));
    }
}