Skip to main content

nexus_memory_hooks/
retry_buffer.rs

1//! Retry buffer for failed LLM enrichment
2//!
3//! When enrichment fails, writes artifacts to disk for later retry.
4//! Location: ~/.local/state/nexus-memory-system/pending-enrichment/
5
6use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use std::fs;
9use std::path::{Path, PathBuf};
10use tracing::{info, warn};
11
12use crate::candidate::MemoryCandidate;
13use crate::claude_payload::NormalizedHookEvent;
14
15/// A retry artifact containing the data needed to retry failed enrichment.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RetryArtifact {
18    /// Agent that generated the event
19    pub agent: String,
20    /// Event name from the hook
21    pub event_name: String,
22    /// Normalized event data
23    pub normalized_event: NormalizedHookEvent,
24    /// Candidates that failed enrichment
25    pub candidates: Vec<MemoryCandidate>,
26    /// Error message from the failed attempt
27    pub error: String,
28    /// When this artifact was created
29    pub created_at: String,
30}
31
/// File-based retry buffer for failed enrichment attempts.
///
/// Each failed attempt is stored as one JSON file inside `buffer_path`.
/// The type only holds that directory path, so it is cheap to clone and
/// safe to print in diagnostics.
#[derive(Debug, Clone)]
pub struct RetryBuffer {
    /// Directory that holds the pending `*.json` artifacts.
    buffer_path: PathBuf,
}
36
37impl RetryBuffer {
38    /// Create a new retry buffer with the default path.
39    pub fn new() -> Self {
40        Self {
41            buffer_path: Self::buffer_path(),
42        }
43    }
44
45    /// Get the default buffer directory path.
46    ///
47    /// Uses XDG state directory: ~/.local/state/nexus-memory-system/pending-enrichment/
48    pub fn buffer_path() -> PathBuf {
49        if let Some(dir) = dirs::state_dir() {
50            dir.join("nexus-memory-system").join("pending-enrichment")
51        } else {
52            std::env::var("HOME")
53                .map(|h| {
54                    PathBuf::from(h).join(".local/state/nexus-memory-system/pending-enrichment")
55                })
56                .unwrap_or_else(|_| PathBuf::from(".nexus-pending-enrichment"))
57        }
58    }
59
60    /// Write a failed enrichment attempt to the buffer.
61    ///
62    /// Creates a JSON file named `{timestamp}_{uuid}.json` in the buffer directory.
63    /// Creates the directory if it doesn't exist.
64    ///
65    /// Returns the path to the created file.
66    pub fn write_failed(
67        &self,
68        event: &NormalizedHookEvent,
69        candidates: &[MemoryCandidate],
70        error: &str,
71    ) -> std::io::Result<PathBuf> {
72        // Ensure directory exists
73        fs::create_dir_all(&self.buffer_path)?;
74
75        // Generate filename: timestamp with colons replaced + UUID
76        let timestamp = Utc::now().to_rfc3339().replace(':', "-");
77        let uuid = uuid::Uuid::new_v4();
78        let filename = format!("{}_{}.json", timestamp, uuid);
79        let file_path = self.buffer_path.join(&filename);
80
81        // Create the artifact
82        let artifact = RetryArtifact {
83            agent: event.agent.clone(),
84            event_name: event.event_name.clone(),
85            normalized_event: event.clone(),
86            candidates: candidates.to_vec(),
87            error: error.to_string(),
88            created_at: Utc::now().to_rfc3339(),
89        };
90
91        // Serialize and write
92        let json = serde_json::to_string_pretty(&artifact)?;
93        fs::write(&file_path, json)?;
94
95        warn!(
96            "Buffered failed enrichment for agent={}, event={}: {}",
97            event.agent,
98            event.event_name,
99            file_path.display()
100        );
101
102        Ok(file_path)
103    }
104
105    /// List all pending retry artifacts in the buffer directory.
106    ///
107    /// Returns paths to all .json files in the buffer.
108    pub fn list_pending(&self) -> std::io::Result<Vec<PathBuf>> {
109        if !self.buffer_path.exists() {
110            return Ok(Vec::new());
111        }
112
113        let entries = fs::read_dir(&self.buffer_path)?;
114
115        let mut files = Vec::new();
116        for entry in entries {
117            let entry = entry?;
118            let path = entry.path();
119
120            // Only include .json files
121            if path.extension().and_then(|s| s.to_str()) == Some("json") {
122                files.push(path);
123            }
124        }
125
126        // Sort by creation time (filename starts with timestamp)
127        files.sort();
128
129        Ok(files)
130    }
131
132    /// Read a pending retry artifact from disk.
133    pub fn read_pending(path: &Path) -> std::io::Result<RetryArtifact> {
134        let contents = fs::read_to_string(path)?;
135        let artifact: RetryArtifact = serde_json::from_str(&contents)
136            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
137        Ok(artifact)
138    }
139
140    /// Remove a pending retry artifact from disk.
141    pub fn remove_pending(path: &Path) -> std::io::Result<()> {
142        fs::remove_file(path)?;
143        info!("Removed buffered enrichment artifact: {}", path.display());
144        Ok(())
145    }
146
147    /// Count the number of pending retry artifacts.
148    pub fn pending_count(&self) -> std::io::Result<usize> {
149        Ok(self.list_pending()?.len())
150    }
151
152    /// Clear all pending retry artifacts.
153    ///
154    /// Returns the number of artifacts removed.
155    pub fn clear_all(&self) -> std::io::Result<usize> {
156        let files = self.list_pending()?;
157        let count = files.len();
158
159        for file in files {
160            fs::remove_file(&file)?;
161        }
162
163        info!("Cleared {} buffered enrichment artifacts", count);
164        Ok(count)
165    }
166}
167
168impl Default for RetryBuffer {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a fully populated event for use as test input.
    fn make_test_event() -> NormalizedHookEvent {
        NormalizedHookEvent {
            agent: "test-agent".to_string(),
            event_name: "test_event".to_string(),
            observed_at: Utc::now(),
            session_id: Some("session-123".to_string()),
            turn_id: Some("turn-456".to_string()),
            cwd: Some("/home/user/project".to_string()),
            tool_name: Some("test_tool".to_string()),
            tool_input: None,
            tool_response_text: None,
            assistant_message_text: None,
            user_message_text: None,
            observer: Some("test-agent".to_string()),
            subject: Some("test-agent".to_string()),
            session_key: Some("session-123".to_string()),
            raw_payload: json!({}),
        }
    }

    /// Build a single-element candidate list for use as test input.
    fn make_test_candidates() -> Vec<MemoryCandidate> {
        vec![MemoryCandidate {
            candidate_id: "test-1".to_string(),
            source_event_name: "test_event".to_string(),
            source_agent: "test-agent".to_string(),
            signal_score: 0.8,
            provisional_category: Some("preferences".to_string()),
            memory_text: "Test memory".to_string(),
            evidence: json!({"key": "value"}),
            labels: vec!["test".to_string()],
        }]
    }

    #[test]
    fn test_buffer_path() {
        let path = RetryBuffer::buffer_path();
        assert!(path.ends_with("pending-enrichment"));
    }

    #[test]
    fn test_retry_artifact_serialization() {
        let event = make_test_event();
        let candidates = make_test_candidates();

        let artifact = RetryArtifact {
            agent: event.agent.clone(),
            event_name: event.event_name.clone(),
            normalized_event: event,
            candidates,
            error: "Test error".to_string(),
            created_at: Utc::now().to_rfc3339(),
        };

        let serialized = serde_json::to_string(&artifact).unwrap();
        let deserialized: RetryArtifact = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.agent, "test-agent");
        assert_eq!(deserialized.event_name, "test_event");
        assert_eq!(deserialized.candidates.len(), 1);
        assert_eq!(deserialized.error, "Test error");
    }

    #[test]
    fn test_write_and_list_pending() {
        // Per-process directory so concurrent test runs cannot collide.
        let temp_dir = std::env::temp_dir().join(format!(
            "nexus-test-retry-buffer-{}",
            std::process::id()
        ));
        // Start from a clean slate in case an earlier run left files behind.
        fs::remove_dir_all(&temp_dir).ok();

        // The private field is visible here because `tests` is a child module,
        // which lets the buffer point at the temp directory.
        let buffer = RetryBuffer {
            buffer_path: temp_dir.clone(),
        };

        // A buffer directory that does not exist yet is simply empty.
        assert!(buffer.list_pending().unwrap().is_empty());
        assert_eq!(buffer.pending_count().unwrap(), 0);

        let event = make_test_event();
        let candidates = make_test_candidates();

        let first = buffer
            .write_failed(&event, &candidates, "Test error")
            .unwrap();
        let second = buffer
            .write_failed(&event, &candidates, "Another error")
            .unwrap();
        assert_ne!(first, second);
        assert!(first.starts_with(&temp_dir));
        assert_eq!(first.extension().and_then(|ext| ext.to_str()), Some("json"));

        // Files without a `.json` extension must be ignored by the listing.
        fs::write(temp_dir.join("notes.txt"), "not an artifact").unwrap();

        let pending = buffer.list_pending().unwrap();
        assert_eq!(pending.len(), 2);
        assert!(pending.contains(&first));
        assert!(pending.contains(&second));
        assert_eq!(buffer.pending_count().unwrap(), 2);

        // What was written can be read back intact.
        let artifact = RetryBuffer::read_pending(&first).unwrap();
        assert_eq!(artifact.agent, "test-agent");
        assert_eq!(artifact.event_name, "test_event");
        assert_eq!(artifact.candidates.len(), 1);
        assert_eq!(artifact.error, "Test error");

        // Removing one artifact leaves the other; clearing removes the rest.
        RetryBuffer::remove_pending(&first).unwrap();
        assert_eq!(buffer.list_pending().unwrap(), vec![second.clone()]);
        assert_eq!(buffer.clear_all().unwrap(), 1);
        assert!(buffer.list_pending().unwrap().is_empty());

        // Cleanup
        fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_default() {
        let buffer = RetryBuffer::default();
        assert_eq!(buffer.buffer_path, RetryBuffer::buffer_path());
    }
}