nexus_memory_hooks/
retry_buffer.rs1use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use std::fs;
9use std::path::{Path, PathBuf};
10use tracing::{info, warn};
11
12use crate::candidate::MemoryCandidate;
13use crate::claude_payload::NormalizedHookEvent;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RetryArtifact {
18 pub agent: String,
20 pub event_name: String,
22 pub normalized_event: NormalizedHookEvent,
24 pub candidates: Vec<MemoryCandidate>,
26 pub error: String,
28 pub created_at: String,
30}
31
/// Handle to the directory in which failed enrichment artifacts are stored.
///
/// The handle only remembers the directory path; it is cheap to clone and
/// holds no open files.
#[derive(Debug, Clone)]
pub struct RetryBuffer {
    /// Directory that contains the buffered `*.json` artifacts.
    buffer_path: PathBuf,
}
36
37impl RetryBuffer {
38 pub fn new() -> Self {
40 Self {
41 buffer_path: Self::buffer_path(),
42 }
43 }
44
45 pub fn buffer_path() -> PathBuf {
49 if let Some(dir) = dirs::state_dir() {
50 dir.join("nexus-memory-system").join("pending-enrichment")
51 } else {
52 std::env::var("HOME")
53 .map(|h| {
54 PathBuf::from(h).join(".local/state/nexus-memory-system/pending-enrichment")
55 })
56 .unwrap_or_else(|_| PathBuf::from(".nexus-pending-enrichment"))
57 }
58 }
59
60 pub fn write_failed(
67 &self,
68 event: &NormalizedHookEvent,
69 candidates: &[MemoryCandidate],
70 error: &str,
71 ) -> std::io::Result<PathBuf> {
72 fs::create_dir_all(&self.buffer_path)?;
74
75 let timestamp = Utc::now().to_rfc3339().replace(':', "-");
77 let uuid = uuid::Uuid::new_v4();
78 let filename = format!("{}_{}.json", timestamp, uuid);
79 let file_path = self.buffer_path.join(&filename);
80
81 let artifact = RetryArtifact {
83 agent: event.agent.clone(),
84 event_name: event.event_name.clone(),
85 normalized_event: event.clone(),
86 candidates: candidates.to_vec(),
87 error: error.to_string(),
88 created_at: Utc::now().to_rfc3339(),
89 };
90
91 let json = serde_json::to_string_pretty(&artifact)?;
93 fs::write(&file_path, json)?;
94
95 warn!(
96 "Buffered failed enrichment for agent={}, event={}: {}",
97 event.agent,
98 event.event_name,
99 file_path.display()
100 );
101
102 Ok(file_path)
103 }
104
105 pub fn list_pending(&self) -> std::io::Result<Vec<PathBuf>> {
109 if !self.buffer_path.exists() {
110 return Ok(Vec::new());
111 }
112
113 let entries = fs::read_dir(&self.buffer_path)?;
114
115 let mut files = Vec::new();
116 for entry in entries {
117 let entry = entry?;
118 let path = entry.path();
119
120 if path.extension().and_then(|s| s.to_str()) == Some("json") {
122 files.push(path);
123 }
124 }
125
126 files.sort();
128
129 Ok(files)
130 }
131
132 pub fn read_pending(path: &Path) -> std::io::Result<RetryArtifact> {
134 let contents = fs::read_to_string(path)?;
135 let artifact: RetryArtifact = serde_json::from_str(&contents)
136 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
137 Ok(artifact)
138 }
139
140 pub fn remove_pending(path: &Path) -> std::io::Result<()> {
142 fs::remove_file(path)?;
143 info!("Removed buffered enrichment artifact: {}", path.display());
144 Ok(())
145 }
146
147 pub fn pending_count(&self) -> std::io::Result<usize> {
149 Ok(self.list_pending()?.len())
150 }
151
152 pub fn clear_all(&self) -> std::io::Result<usize> {
156 let files = self.list_pending()?;
157 let count = files.len();
158
159 for file in files {
160 fs::remove_file(&file)?;
161 }
162
163 info!("Cleared {} buffered enrichment artifacts", count);
164 Ok(count)
165 }
166}
167
168impl Default for RetryBuffer {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal event with fixed identifiers for use in assertions.
    fn make_test_event() -> NormalizedHookEvent {
        NormalizedHookEvent {
            agent: "test-agent".to_string(),
            event_name: "test_event".to_string(),
            observed_at: Utc::now(),
            session_id: Some("session-123".to_string()),
            turn_id: Some("turn-456".to_string()),
            cwd: Some("/home/user/project".to_string()),
            tool_name: Some("test_tool".to_string()),
            tool_input: None,
            tool_response_text: None,
            assistant_message_text: None,
            user_message_text: None,
            observer: Some("test-agent".to_string()),
            subject: Some("test-agent".to_string()),
            session_key: Some("session-123".to_string()),
            raw_payload: json!({}),
        }
    }

    /// Builds a single-element candidate list matching `make_test_event`.
    fn make_test_candidates() -> Vec<MemoryCandidate> {
        vec![MemoryCandidate {
            candidate_id: "test-1".to_string(),
            source_event_name: "test_event".to_string(),
            source_agent: "test-agent".to_string(),
            signal_score: 0.8,
            provisional_category: Some("preferences".to_string()),
            memory_text: "Test memory".to_string(),
            evidence: json!({"key": "value"}),
            labels: vec!["test".to_string()],
        }]
    }

    #[test]
    fn test_buffer_path() {
        let path = RetryBuffer::buffer_path();
        // Compare the final component as text so the check also holds for the
        // `.nexus-pending-enrichment` fallback used when no home is known.
        let leaf = path
            .file_name()
            .and_then(|name| name.to_str())
            .expect("buffer path should end in a UTF-8 component");
        assert!(leaf.ends_with("pending-enrichment"));
    }

    #[test]
    fn test_retry_artifact_serialization() {
        let event = make_test_event();
        let candidates = make_test_candidates();

        let artifact = RetryArtifact {
            agent: event.agent.clone(),
            event_name: event.event_name.clone(),
            normalized_event: event,
            candidates,
            error: "Test error".to_string(),
            created_at: Utc::now().to_rfc3339(),
        };

        let serialized = serde_json::to_string(&artifact).unwrap();
        let deserialized: RetryArtifact = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.agent, "test-agent");
        assert_eq!(deserialized.event_name, "test_event");
        assert_eq!(deserialized.candidates.len(), 1);
        assert_eq!(deserialized.error, "Test error");
    }

    #[test]
    fn test_write_and_list_pending() {
        // A unique directory per run keeps parallel or repeated test runs from
        // seeing each other's artifacts.
        let temp_dir = std::env::temp_dir().join(format!(
            "nexus-test-retry-buffer-{}",
            uuid::Uuid::new_v4()
        ));
        let buffer = RetryBuffer {
            buffer_path: temp_dir.clone(),
        };

        // The directory does not exist yet, which must read as "nothing pending".
        assert!(buffer.list_pending().unwrap().is_empty());
        assert_eq!(buffer.pending_count().unwrap(), 0);

        let event = make_test_event();
        let candidates = make_test_candidates();

        let written = buffer
            .write_failed(&event, &candidates, "Test error")
            .unwrap();
        assert!(written.starts_with(&temp_dir));
        assert_eq!(
            written.extension().and_then(|ext| ext.to_str()),
            Some("json")
        );
        assert_eq!(buffer.list_pending().unwrap(), vec![written.clone()]);

        let artifact = RetryBuffer::read_pending(&written).unwrap();
        assert_eq!(artifact.agent, "test-agent");
        assert_eq!(artifact.event_name, "test_event");
        assert_eq!(artifact.error, "Test error");
        assert_eq!(artifact.candidates.len(), 1);
        assert_eq!(artifact.candidates[0].candidate_id, "test-1");

        RetryBuffer::remove_pending(&written).unwrap();
        assert_eq!(buffer.pending_count().unwrap(), 0);

        buffer
            .write_failed(&event, &candidates, "first")
            .unwrap();
        buffer
            .write_failed(&event, &candidates, "second")
            .unwrap();
        let pending = buffer.list_pending().unwrap();
        assert_eq!(pending.len(), 2);
        assert!(pending.windows(2).all(|pair| pair[0] <= pair[1]));

        assert_eq!(buffer.clear_all().unwrap(), 2);
        assert_eq!(buffer.pending_count().unwrap(), 0);

        fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_default() {
        let buffer = RetryBuffer::default();
        assert_eq!(buffer.buffer_path, RetryBuffer::buffer_path());
    }
}