1use chrono::Utc;
6use nexus_core::{
7 infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, MemoryLaneType,
8 PerspectiveSource,
9};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::MemoryRepository;
12use serde_json::json;
13use std::collections::HashMap;
14use tracing::{debug, info, warn};
15
16use crate::claude_payload::NormalizedHookEvent;
17use crate::enrichment::EnrichmentBatchResult;
18
/// Summary of what a single [`persist_enriched_memories`] call did.
///
/// A memory whose repository write fails is reflected in neither `stored`
/// nor `skipped`; it is only reported through a warning log.
#[derive(Clone, Debug, Default)]
pub struct PersistResult {
    /// Number of memories the repository stored successfully.
    pub stored: usize,
    /// Number of memories dropped before any write was attempted, either
    /// because `store` was `false` or because the category failed to parse.
    pub skipped: usize,
    /// Stored-memory tally keyed by the category string exactly as it was
    /// supplied on the enriched memory.
    pub categories: HashMap<String, usize>,
    /// Ids of the stored memories, in the order they were written.
    pub stored_memory_ids: Vec<i64>,
}
31
32pub async fn persist_enriched_memories(
40 namespace_id: i64,
41 event: &NormalizedHookEvent,
42 batch: &EnrichmentBatchResult,
43 memory_repo: &MemoryRepository,
44 model_name: &str,
45) -> anyhow::Result<PersistResult> {
46 let mut result = PersistResult::default();
47 let derived_session_key = derive_session_key(
48 &event.agent,
49 event.session_id.as_deref(),
50 event.cwd.as_deref(),
51 );
52 let perspective = infer_perspective(
53 PerspectiveSource::HookIngest,
54 event.agent.clone(),
55 None::<String>,
56 Some(derived_session_key.clone()),
57 );
58
59 for enriched in &batch.memories {
60 if !enriched.store {
61 result.skipped += 1;
62 debug!("Skipping memory (store=false): {}", enriched.memory_text);
63 continue;
64 }
65
66 let category = match MemoryCategory::parse(&enriched.category) {
68 Some(c) => c,
69 None => {
70 warn!(
71 "Skipping memory with invalid category '{}': {}",
72 enriched.category,
73 enriched.memory_text.chars().take(50).collect::<String>()
74 );
75 result.skipped += 1;
76 continue;
77 }
78 };
79
80 let memory_lane_type = enriched
82 .memory_lane_type
83 .as_ref()
84 .and_then(|t| MemoryLaneType::parse(t));
85
86 let evidence = json!({
88 "tool_name": event.tool_name,
89 "tool_input": event.tool_input,
90 "tool_response_excerpt": event.tool_response_text.as_ref()
91 .map(|s| s.chars().take(200).collect::<String>()),
92 "assistant_message_excerpt": event.assistant_message_text.as_ref()
93 .map(|s| s.chars().take(200).collect::<String>()),
94 "user_message_excerpt": event.user_message_text.as_ref()
95 .map(|s| s.chars().take(200).collect::<String>()),
96 });
97
98 let mut cognitive = CognitiveMetadata::new(
101 CognitiveLevel::Raw,
102 perspective.observer.clone(),
103 perspective.subject.clone(),
104 perspective.session_key.clone(),
105 "hook_persistence",
106 );
107 cognitive.confidence = Some(enriched.confidence);
108 cognitive.times_reinforced = 0;
109 cognitive.times_contradicted = 0;
110 cognitive.derived_at = Some(Utc::now());
111 cognitive.generated_by = Some("hook_ingest".to_string());
112
113 let metadata = cognitive.merge_into(&json!({
114 "source": {
115 "agent": event.agent,
116 "event_name": event.event_name,
117 "session_id": event.session_id,
118 "derived_session_key": derived_session_key,
119 "turn_id": event.turn_id,
120 "cwd": event.cwd,
121 },
122 "evidence": evidence,
123 "ingestion": {
124 "normalized_at": event.observed_at.to_rfc3339(),
125 "signal_score": null, "pipeline_version": "hook-ingest-v1",
127 },
128 "llm_comment": {
129 "model": model_name,
130 "generated_at": Utc::now().to_rfc3339(),
131 "text": enriched.comment,
132 },
133 "confidence": enriched.confidence,
134 }));
135
136 let params = nexus_storage::repository::StoreMemoryParams {
138 namespace_id,
139 content: &enriched.memory_text,
140 category: &category,
141 memory_lane_type: memory_lane_type.as_ref(),
142 labels: &enriched.labels,
143 metadata: &metadata,
144 embedding: None,
145 embedding_model: None,
146 };
147
148 match memory_repo.store(params).await {
149 Ok(memory) => {
150 result.stored += 1;
151 result.stored_memory_ids.push(memory.id);
152 *result
153 .categories
154 .entry(enriched.category.clone())
155 .or_insert(0) += 1;
156 debug!(
157 "Stored memory id={} category='{}': {}",
158 memory.id,
159 enriched.category,
160 enriched.memory_text.chars().take(50).collect::<String>()
161 );
162
163 let job_payload = serde_json::json!({
165 "memory_id": memory.id,
166 });
167 let perspective_json = serde_json::to_value(&perspective)
168 .map_err(|e| {
169 warn!("Failed to serialize perspective for job enqueue: {}", e);
170 e
171 })
172 .ok();
173 if let Err(e) = memory_repo
174 .enqueue_job(EnqueueJobParams {
175 namespace_id,
176 job_type: "derive_memory",
177 priority: 100,
178 perspective: perspective_json.as_ref(),
179 payload: &job_payload,
180 })
181 .await
182 {
183 warn!(
184 "Failed to enqueue derive job for memory {}: {}",
185 memory.id, e
186 );
187 }
188 }
189 Err(e) => {
190 warn!(
191 "Failed to store memory: {}. Content: {}",
192 e, enriched.memory_text
193 );
194 }
196 }
197 }
198
199 info!(
200 "Persistence complete: {} stored, {} skipped",
201 result.stored, result.skipped
202 );
203
204 Ok(result)
205}
206
/// Picks the session key used for a hook event.
///
/// A `session_key` that contains at least one non-whitespace character is
/// returned unchanged (it is not trimmed). Otherwise a key of the form
/// `derived-<16 lowercase hex digits>` is produced by hashing `agent`
/// followed by `cwd`, with the literal `"unknown-cwd"` standing in for a
/// missing or blank `cwd`.
///
/// NOTE(review): the hash comes from the standard library's `DefaultHasher`,
/// whose algorithm is documented as unspecified across Rust releases, so a
/// derived key may differ between toolchain versions — confirm whether these
/// keys need to be stable once persisted.
fn derive_session_key(agent: &str, session_key: Option<&str>, cwd: Option<&str>) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// True when the candidate has something other than whitespace in it.
    fn has_content(candidate: &&str) -> bool {
        !candidate.trim().is_empty()
    }

    match session_key.filter(has_content) {
        Some(explicit) => explicit.to_string(),
        None => {
            let scope = cwd.filter(has_content).unwrap_or("unknown-cwd");

            // Feed order (agent first, then scope) determines the digest.
            let mut state = DefaultHasher::new();
            agent.hash(&mut state);
            scope.hash(&mut state);

            format!("derived-{:016x}", state.finish())
        }
    }
}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::enrichment::EnrichedMemory;
    use nexus_storage::repository::MemoryRepository;
    use serde_json::json;

    /// Opens an in-memory SQLite pool, creates the `agent_namespaces` and
    /// `memories` tables, and returns a repository over that pool together
    /// with the pool itself.
    ///
    /// NOTE(review): no job table is created here, so the derive-job enqueue
    /// presumably fails and is only logged during the persistence test —
    /// confirm against `MemoryRepository::enqueue_job`.
    async fn create_test_repo() -> (MemoryRepository, sqlx::SqlitePool) {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();

        let namespaces_ddl = r#"
            CREATE TABLE agent_namespaces (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                agent_type TEXT NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME
            );
            "#;
        let memories_ddl = r#"
            CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace_id INTEGER NOT NULL,
                content TEXT NOT NULL,
                category TEXT NOT NULL DEFAULT 'general',
                memory_lane_type TEXT,
                labels TEXT DEFAULT '[]',
                metadata TEXT DEFAULT '{}',
                similarity_score REAL,
                relevance_score REAL,
                content_embedding TEXT,
                embedding_model TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME,
                last_accessed DATETIME,
                is_active BOOLEAN DEFAULT 1,
                is_archived BOOLEAN DEFAULT 0,
                access_count INTEGER DEFAULT 0,
                FOREIGN KEY (namespace_id) REFERENCES agent_namespaces(id)
            );
            "#;

        // Namespaces first: `memories` declares a foreign key onto it.
        for ddl in [namespaces_ddl, memories_ddl].iter() {
            sqlx::query(*ddl).execute(&pool).await.unwrap();
        }

        (MemoryRepository::new(pool.clone()), pool)
    }

    /// Inserts the single test namespace row and returns its row id.
    async fn create_test_namespace(pool: &sqlx::SqlitePool) -> i64 {
        sqlx::query("INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)")
            .bind("test-namespace")
            .bind("test-agent")
            .bind(Utc::now())
            .execute(pool)
            .await
            .unwrap()
            .last_insert_rowid()
    }

    /// Builds a hook event with every optional text field populated.
    fn make_test_event() -> NormalizedHookEvent {
        let assistant_text = "Assistant message text that might be quite long and need truncation for the evidence excerpt";
        let user_text = "User message text that might also be quite long and need truncation for the evidence excerpt";

        NormalizedHookEvent {
            agent: String::from("test-agent"),
            event_name: String::from("test_event"),
            observed_at: Utc::now(),
            session_id: Some(String::from("session-123")),
            turn_id: Some(String::from("turn-456")),
            cwd: Some(String::from("/home/user/project")),
            tool_name: Some(String::from("test_tool")),
            tool_input: Some(json!({"arg": "value"})),
            tool_response_text: Some(String::from("Tool response text")),
            assistant_message_text: Some(String::from(assistant_text)),
            user_message_text: Some(String::from(user_text)),
            observer: None,
            subject: None,
            session_key: None,
            raw_payload: json!({}),
        }
    }

    /// Builds a two-memory batch: one to be stored, one flagged `store: false`.
    fn make_test_batch() -> EnrichmentBatchResult {
        let keep = EnrichedMemory {
            store: true,
            category: String::from("preferences"),
            memory_text: String::from("User prefers Rust for systems programming"),
            labels: ["rust", "systems", "preferences"]
                .iter()
                .map(|label| label.to_string())
                .collect(),
            memory_lane_type: Some(String::from("preference")),
            comment: String::from("Clear preference statement"),
            confidence: 0.9,
        };
        let drop = EnrichedMemory {
            store: false,
            category: String::from("general"),
            memory_text: String::from("Noise to skip"),
            labels: vec![String::from("noise")],
            memory_lane_type: None,
            comment: String::from("Low signal"),
            confidence: 0.3,
        };

        EnrichmentBatchResult {
            memories: vec![keep, drop],
        }
    }

    /// One memory is stored under `preferences`; the `store: false` one is skipped.
    #[tokio::test]
    async fn test_persist_enriched_memories() {
        let (repo, pool) = create_test_repo().await;
        let namespace_id = create_test_namespace(&pool).await;
        let event = make_test_event();
        let batch = make_test_batch();

        let outcome = persist_enriched_memories(namespace_id, &event, &batch, &repo, "test-model")
            .await
            .unwrap();

        assert_eq!(outcome.stored, 1);
        assert_eq!(outcome.skipped, 1);
        assert_eq!(outcome.categories.get("preferences"), Some(&1));
    }

    /// A default `PersistResult` has zero counters and no categories.
    #[test]
    fn test_persist_result_default() {
        let empty = PersistResult::default();
        assert_eq!(empty.stored, 0);
        assert_eq!(empty.skipped, 0);
        assert!(empty.categories.is_empty());
    }
}