Skip to main content

nexus_memory_hooks/
retry_buffer.rs

1//! Retry buffer for failed LLM enrichment
2//!
3//! When enrichment fails, writes artifacts to disk for later retry.
4//! Location: ~/.local/state/nexus-memory-system/pending-enrichment/
5
6use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use std::fs;
9use std::path::{Path, PathBuf};
10use tracing::{info, warn};
11
12use crate::candidate::MemoryCandidate;
13use crate::claude_payload::NormalizedHookEvent;
14
15/// A retry artifact containing the data needed to retry failed enrichment.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RetryArtifact {
18    /// Agent that generated the event
19    pub agent: String,
20    /// Event name from the hook
21    pub event_name: String,
22    /// Normalized event data
23    pub normalized_event: NormalizedHookEvent,
24    /// Candidates that failed enrichment
25    pub candidates: Vec<MemoryCandidate>,
26    /// Error message from the failed attempt
27    pub error: String,
28    /// When this artifact was created
29    pub created_at: String,
30}
31
32/// File-based retry buffer for failed enrichment attempts.
33pub struct RetryBuffer {
34    buffer_path: PathBuf,
35}
36
37impl RetryBuffer {
38    /// Create a new retry buffer with the default path.
39    pub fn new() -> Self {
40        Self {
41            buffer_path: Self::buffer_path(),
42        }
43    }
44
45    /// Get the default buffer directory path.
46    ///
47    /// Uses XDG state directory: ~/.local/state/nexus-memory-system/pending-enrichment/
48    pub fn buffer_path() -> PathBuf {
49        if let Some(dir) = dirs::state_dir() {
50            dir.join("nexus-memory-system").join("pending-enrichment")
51        } else {
52            std::env::var("HOME")
53                .map(|h| {
54                    PathBuf::from(h).join(".local/state/nexus-memory-system/pending-enrichment")
55                })
56                .unwrap_or_else(|_| PathBuf::from(".nexus-pending-enrichment"))
57        }
58    }
59
60    /// Write a failed enrichment attempt to the buffer.
61    ///
62    /// Creates a JSON file named `{timestamp}_{uuid}.json` in the buffer directory.
63    /// Creates the directory if it doesn't exist.
64    ///
65    /// Returns the path to the created file.
66    pub fn write_failed(
67        &self,
68        event: &NormalizedHookEvent,
69        candidates: &[MemoryCandidate],
70        error: &str,
71    ) -> std::io::Result<PathBuf> {
72        // Ensure directory exists
73        fs::create_dir_all(&self.buffer_path)?;
74
75        // Generate filename: timestamp with colons replaced + UUID
76        let timestamp = Utc::now().to_rfc3339().replace(':', "-");
77        let uuid = uuid::Uuid::new_v4();
78        let filename = format!("{}_{}.json", timestamp, uuid);
79        let file_path = self.buffer_path.join(&filename);
80
81        // Create the artifact
82        let artifact = RetryArtifact {
83            agent: event.agent.clone(),
84            event_name: event.event_name.clone(),
85            normalized_event: event.clone(),
86            candidates: candidates.to_vec(),
87            error: error.to_string(),
88            created_at: Utc::now().to_rfc3339(),
89        };
90
91        // Serialize and write
92        let json = serde_json::to_string_pretty(&artifact)?;
93        fs::write(&file_path, json)?;
94
95        warn!(
96            "Buffered failed enrichment for agent={}, event={}: {}",
97            event.agent,
98            event.event_name,
99            file_path.display()
100        );
101
102        Ok(file_path)
103    }
104
105    /// List all pending retry artifacts in the buffer directory.
106    ///
107    /// Returns paths to all .json files in the buffer.
108    pub fn list_pending(&self) -> std::io::Result<Vec<PathBuf>> {
109        if !self.buffer_path.exists() {
110            return Ok(Vec::new());
111        }
112
113        let entries = fs::read_dir(&self.buffer_path)?;
114
115        let mut files = Vec::new();
116        for entry in entries {
117            let entry = entry?;
118            let path = entry.path();
119
120            // Only include .json files
121            if path.extension().and_then(|s| s.to_str()) == Some("json") {
122                files.push(path);
123            }
124        }
125
126        // Sort by creation time (filename starts with timestamp)
127        files.sort();
128
129        Ok(files)
130    }
131
132    /// Read a pending retry artifact from disk.
133    pub fn read_pending(path: &Path) -> std::io::Result<RetryArtifact> {
134        let contents = fs::read_to_string(path)?;
135        let artifact: RetryArtifact = serde_json::from_str(&contents)
136            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
137        Ok(artifact)
138    }
139
140    /// Remove a pending retry artifact from disk.
141    pub fn remove_pending(path: &Path) -> std::io::Result<()> {
142        fs::remove_file(path)?;
143        info!("Removed buffered enrichment artifact: {}", path.display());
144        Ok(())
145    }
146
147    /// Count the number of pending retry artifacts.
148    pub fn pending_count(&self) -> std::io::Result<usize> {
149        Ok(self.list_pending()?.len())
150    }
151
152    /// Clear all pending retry artifacts.
153    ///
154    /// Returns the number of artifacts removed.
155    pub fn clear_all(&self) -> std::io::Result<usize> {
156        let files = self.list_pending()?;
157        let count = files.len();
158
159        for file in files {
160            fs::remove_file(&file)?;
161        }
162
163        info!("Cleared {} buffered enrichment artifacts", count);
164        Ok(count)
165    }
166}
167
168impl Default for RetryBuffer {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177    use serde_json::json;
178
179    fn make_test_event() -> NormalizedHookEvent {
180        NormalizedHookEvent {
181            agent: "test-agent".to_string(),
182            event_name: "test_event".to_string(),
183            observed_at: Utc::now(),
184            session_id: Some("session-123".to_string()),
185            turn_id: Some("turn-456".to_string()),
186            cwd: Some("/home/user/project".to_string()),
187            tool_name: Some("test_tool".to_string()),
188            tool_input: None,
189            tool_response_text: None,
190            assistant_message_text: None,
191            user_message_text: None,
192            raw_payload: json!({}),
193        }
194    }
195
196    fn make_test_candidates() -> Vec<MemoryCandidate> {
197        vec![MemoryCandidate {
198            candidate_id: "test-1".to_string(),
199            source_event_name: "test_event".to_string(),
200            source_agent: "test-agent".to_string(),
201            signal_score: 0.8,
202            provisional_category: Some("preferences".to_string()),
203            memory_text: "Test memory".to_string(),
204            evidence: json!({"key": "value"}),
205            labels: vec!["test".to_string()],
206        }]
207    }
208
209    #[test]
210    fn test_buffer_path() {
211        let path = RetryBuffer::buffer_path();
212        assert!(path.ends_with("pending-enrichment"));
213    }
214
215    #[test]
216    fn test_retry_artifact_serialization() {
217        let event = make_test_event();
218        let candidates = make_test_candidates();
219
220        let artifact = RetryArtifact {
221            agent: event.agent.clone(),
222            event_name: event.event_name.clone(),
223            normalized_event: event,
224            candidates,
225            error: "Test error".to_string(),
226            created_at: Utc::now().to_rfc3339(),
227        };
228
229        let serialized = serde_json::to_string(&artifact).unwrap();
230        let deserialized: RetryArtifact = serde_json::from_str(&serialized).unwrap();
231
232        assert_eq!(deserialized.agent, "test-agent");
233        assert_eq!(deserialized.event_name, "test_event");
234        assert_eq!(deserialized.candidates.len(), 1);
235        assert_eq!(deserialized.error, "Test error");
236    }
237
238    #[test]
239    fn test_write_and_list_pending() {
240        // Use a temp directory for testing
241        let temp_dir = std::env::temp_dir().join("nexus-test-retry-buffer");
242        fs::create_dir_all(&temp_dir).unwrap();
243
244        // We need to create a custom RetryBuffer with the temp path
245        // Since buffer_path() is static, we'll test the write logic differently
246        let event = make_test_event();
247        let candidates = make_test_candidates();
248
249        // Just test serialization works
250        let artifact = RetryArtifact {
251            agent: event.agent.clone(),
252            event_name: event.event_name.clone(),
253            normalized_event: event,
254            candidates,
255            error: "Test error".to_string(),
256            created_at: Utc::now().to_rfc3339(),
257        };
258
259        let json = serde_json::to_string_pretty(&artifact).unwrap();
260        assert!(json.contains("test-agent"));
261        assert!(json.contains("Test error"));
262
263        // Cleanup
264        fs::remove_dir_all(temp_dir).ok();
265    }
266
267    #[test]
268    fn test_default() {
269        let buffer = RetryBuffer::default();
270        assert_eq!(buffer.buffer_path, RetryBuffer::buffer_path());
271    }
272}