Skip to main content

nexus_memory_hooks/
persistence.rs

//! Persistence adapter for enriched hook memories.
//!
//! Builds rich metadata for each enriched memory, persists it via the storage layer,
//! and enqueues a follow-up `derive_memory` job for every memory that was stored.
4
5use chrono::Utc;
6use nexus_core::{
7    infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, MemoryLaneType,
8    PerspectiveSource,
9};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::MemoryRepository;
12use serde_json::json;
13use std::collections::HashMap;
14use tracing::{debug, info, warn};
15
16use crate::claude_payload::NormalizedHookEvent;
17use crate::enrichment::EnrichmentBatchResult;
18
/// Outcome of persisting one batch of enriched memories.
///
/// Each memory in a batch ends up in exactly one of three buckets:
/// - successfully stored (counted in `stored`);
/// - deliberately not stored (counted in `skipped`);
/// - failed to store. This bucket is not counted in this struct; failures
///   only appear in the logs.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PersistResult {
    /// Number of memories successfully written to storage.
    pub stored: usize,
    /// Number of memories intentionally not written: entries marked
    /// `store = false` plus entries whose category string could not be parsed.
    pub skipped: usize,
    /// Count of stored memories per category, keyed by the category string
    /// exactly as supplied by the enrichment step.
    pub categories: HashMap<String, usize>,
    /// IDs of the stored memories, in the order they were stored.
    pub stored_memory_ids: Vec<i64>,
}
31
32/// Persist enriched memories to the database with rich metadata.
33///
34/// For each `EnrichedMemory` where `store == true`:
35/// 1. Parses the category string into a `MemoryCategory`
36/// 2. Parses memory_lane_type (optional) into `MemoryLaneType`
37/// 3. Builds rich metadata including source, evidence, ingestion, and LLM comment
38/// 4. Stores via `MemoryRepository`
39pub async fn persist_enriched_memories(
40    namespace_id: i64,
41    event: &NormalizedHookEvent,
42    batch: &EnrichmentBatchResult,
43    memory_repo: &MemoryRepository,
44    model_name: &str,
45) -> anyhow::Result<PersistResult> {
46    let mut result = PersistResult::default();
47    let derived_session_key = derive_session_key(
48        &event.agent,
49        event.session_id.as_deref(),
50        event.cwd.as_deref(),
51    );
52    let perspective = infer_perspective(
53        PerspectiveSource::HookIngest,
54        event.agent.clone(),
55        None::<String>,
56        Some(derived_session_key.clone()),
57    );
58
59    for enriched in &batch.memories {
60        if !enriched.store {
61            result.skipped += 1;
62            debug!("Skipping memory (store=false): {}", enriched.memory_text);
63            continue;
64        }
65
66        // Parse category — skip invalid categories rather than aborting the batch
67        let category = match MemoryCategory::parse(&enriched.category) {
68            Some(c) => c,
69            None => {
70                warn!(
71                    "Skipping memory with invalid category '{}': {}",
72                    enriched.category,
73                    enriched.memory_text.chars().take(50).collect::<String>()
74                );
75                result.skipped += 1;
76                continue;
77            }
78        };
79
80        // Parse memory lane type (optional)
81        let memory_lane_type = enriched
82            .memory_lane_type
83            .as_ref()
84            .and_then(|t| MemoryLaneType::parse(t));
85
86        // Build evidence from event fields (excerpts for large text)
87        let evidence = json!({
88            "tool_name": event.tool_name,
89            "tool_input": event.tool_input,
90            "tool_response_excerpt": event.tool_response_text.as_ref()
91                .map(|s| s.chars().take(200).collect::<String>()),
92            "assistant_message_excerpt": event.assistant_message_text.as_ref()
93                .map(|s| s.chars().take(200).collect::<String>()),
94            "user_message_excerpt": event.user_message_text.as_ref()
95                .map(|s| s.chars().take(200).collect::<String>()),
96        });
97
98        // Build rich metadata with cognitive envelope
99        // Hook-ingested memories are RAW - they need to be derived into explicit observations
100        let mut cognitive = CognitiveMetadata::new(
101            CognitiveLevel::Raw,
102            perspective.observer.clone(),
103            perspective.subject.clone(),
104            perspective.session_key.clone(),
105            "hook_persistence",
106        );
107        cognitive.confidence = Some(enriched.confidence);
108        cognitive.times_reinforced = 0;
109        cognitive.times_contradicted = 0;
110        cognitive.derived_at = Some(Utc::now());
111        cognitive.generated_by = Some("hook_ingest".to_string());
112
113        let metadata = cognitive.merge_into(&json!({
114            "source": {
115                "agent": event.agent,
116                "event_name": event.event_name,
117                "session_id": event.session_id,
118                "derived_session_key": derived_session_key,
119                "turn_id": event.turn_id,
120                "cwd": event.cwd,
121            },
122            "evidence": evidence,
123            "ingestion": {
124                "normalized_at": event.observed_at.to_rfc3339(),
125                "signal_score": null, // Would come from original candidate if we had it
126                "pipeline_version": "hook-ingest-v1",
127            },
128            "llm_comment": {
129                "model": model_name,
130                "generated_at": Utc::now().to_rfc3339(),
131                "text": enriched.comment,
132            },
133            "confidence": enriched.confidence,
134        }));
135
136        // Store the memory
137        let params = nexus_storage::repository::StoreMemoryParams {
138            namespace_id,
139            content: &enriched.memory_text,
140            category: &category,
141            memory_lane_type: memory_lane_type.as_ref(),
142            labels: &enriched.labels,
143            metadata: &metadata,
144            embedding: None,
145            embedding_model: None,
146        };
147
148        match memory_repo.store(params).await {
149            Ok(memory) => {
150                result.stored += 1;
151                result.stored_memory_ids.push(memory.id);
152                *result
153                    .categories
154                    .entry(enriched.category.clone())
155                    .or_insert(0) += 1;
156                debug!(
157                    "Stored memory id={} category='{}': {}",
158                    memory.id,
159                    enriched.category,
160                    enriched.memory_text.chars().take(50).collect::<String>()
161                );
162
163                // Enqueue a DeriveMemory job to process this raw memory
164                let job_payload = serde_json::json!({
165                    "memory_id": memory.id,
166                });
167                let perspective_json = serde_json::to_value(&perspective)
168                    .map_err(|e| {
169                        warn!("Failed to serialize perspective for job enqueue: {}", e);
170                        e
171                    })
172                    .ok();
173                if let Err(e) = memory_repo
174                    .enqueue_job(EnqueueJobParams {
175                        namespace_id,
176                        job_type: "derive_memory",
177                        priority: 100,
178                        perspective: perspective_json.as_ref(),
179                        payload: &job_payload,
180                    })
181                    .await
182                {
183                    warn!(
184                        "Failed to enqueue derive job for memory {}: {}",
185                        memory.id, e
186                    );
187                }
188            }
189            Err(e) => {
190                warn!(
191                    "Failed to store memory: {}. Content: {}",
192                    e, enriched.memory_text
193                );
194                // Continue with other memories
195            }
196        }
197    }
198
199    info!(
200        "Persistence complete: {} stored, {} skipped",
201        result.stored, result.skipped
202    );
203
204    Ok(result)
205}
206
/// FNV-1a 64-bit offset basis.
const FNV1A_64_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
/// FNV-1a 64-bit prime (2^40 + 0x1b3).
const FNV1A_64_PRIME: u64 = 0x0000_0100_0000_01b3;

/// Returns the session key used to group hook-ingested memories.
///
/// A non-blank `session_key` (the hook's session id) is returned verbatim.
/// Otherwise a synthetic key `derived-<16 lowercase hex digits>` is built by
/// hashing `agent` together with `cwd`; a missing or blank `cwd` is treated as
/// `"unknown-cwd"`. Events from the same agent in the same directory therefore
/// share a derived key.
///
/// The hash is a hand-rolled FNV-1a 64 rather than `std`'s `DefaultHasher`.
/// Derived keys are persisted (in memory metadata and job perspectives), and
/// `DefaultHasher`'s algorithm is documented as unspecified and subject to
/// change between Rust releases.
///
/// NOTE: keys derived by builds that used `DefaultHasher` will not match keys
/// derived by this implementation.
fn derive_session_key(agent: &str, session_key: Option<&str>, cwd: Option<&str>) -> String {
    if let Some(value) = session_key.filter(|value| !value.trim().is_empty()) {
        return value.to_string();
    }

    let fallback_scope = cwd
        .filter(|value| !value.trim().is_empty())
        .unwrap_or("unknown-cwd");

    // Each field is followed by a 0xff terminator, a byte that never occurs in
    // valid UTF-8. This keeps ("ab", "c") and ("a", "bc") from colliding.
    let hash = [agent, fallback_scope]
        .iter()
        .fold(FNV1A_64_OFFSET_BASIS, |acc, field| {
            fnv1a_64_update(fnv1a_64_update(acc, field.as_bytes()), &[0xff])
        });
    format!("derived-{:016x}", hash)
}

/// Folds `bytes` into a running FNV-1a 64-bit hash state and returns the new state.
fn fnv1a_64_update(hash: u64, bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .fold(hash, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(FNV1A_64_PRIME))
}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::enrichment::EnrichedMemory;
    use nexus_storage::repository::MemoryRepository;
    use serde_json::json;

    /// Creates an in-memory SQLite pool with the tables `persist_enriched_memories`
    /// writes memories into, and a repository over it.
    ///
    /// NOTE(review): no job-queue table is created here, so `enqueue_job`
    /// presumably fails during these tests and is only logged (not asserted).
    /// TODO: confirm this against the real migrations.
    async fn create_test_repo() -> (MemoryRepository, sqlx::SqlitePool) {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();

        // Run migrations matching the real schema
        sqlx::query(
            r#"
            CREATE TABLE agent_namespaces (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                agent_type TEXT NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME
            );
        "#,
        )
        .execute(&pool)
        .await
        .unwrap();

        sqlx::query(
            r#"
            CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace_id INTEGER NOT NULL,
                content TEXT NOT NULL,
                category TEXT NOT NULL DEFAULT 'general',
                memory_lane_type TEXT,
                labels TEXT DEFAULT '[]',
                metadata TEXT DEFAULT '{}',
                similarity_score REAL,
                relevance_score REAL,
                content_embedding TEXT,
                embedding_model TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME,
                last_accessed DATETIME,
                is_active BOOLEAN DEFAULT 1,
                is_archived BOOLEAN DEFAULT 0,
                access_count INTEGER DEFAULT 0,
                FOREIGN KEY (namespace_id) REFERENCES agent_namespaces(id)
            );
        "#,
        )
        .execute(&pool)
        .await
        .unwrap();

        let repo = MemoryRepository::new(pool.clone());
        (repo, pool)
    }

    /// Inserts one namespace row and returns its id.
    async fn create_test_namespace(pool: &sqlx::SqlitePool) -> i64 {
        let result = sqlx::query(
            "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
        )
        .bind("test-namespace")
        .bind("test-agent")
        .bind(Utc::now())
        .execute(pool)
        .await
        .unwrap();
        result.last_insert_rowid()
    }

    /// Builds a hook event with every optional field populated.
    fn make_test_event() -> NormalizedHookEvent {
        NormalizedHookEvent {
            agent: "test-agent".to_string(),
            event_name: "test_event".to_string(),
            observed_at: Utc::now(),
            session_id: Some("session-123".to_string()),
            turn_id: Some("turn-456".to_string()),
            cwd: Some("/home/user/project".to_string()),
            tool_name: Some("test_tool".to_string()),
            tool_input: Some(json!({"arg": "value"})),
            tool_response_text: Some("Tool response text".to_string()),
            assistant_message_text: Some("Assistant message text that might be quite long and need truncation for the evidence excerpt".to_string()),
            user_message_text: Some("User message text that might also be quite long and need truncation for the evidence excerpt".to_string()),
            observer: None,
            subject: None,
            session_key: None,
            raw_payload: json!({}),
        }
    }

    /// Builds a batch with one storable memory and one `store = false` memory.
    fn make_test_batch() -> EnrichmentBatchResult {
        EnrichmentBatchResult {
            memories: vec![
                EnrichedMemory {
                    store: true,
                    category: "preferences".to_string(),
                    memory_text: "User prefers Rust for systems programming".to_string(),
                    labels: vec![
                        "rust".to_string(),
                        "systems".to_string(),
                        "preferences".to_string(),
                    ],
                    memory_lane_type: Some("preference".to_string()),
                    comment: "Clear preference statement".to_string(),
                    confidence: 0.9,
                },
                EnrichedMemory {
                    store: false, // Should be skipped
                    category: "general".to_string(),
                    memory_text: "Noise to skip".to_string(),
                    labels: vec!["noise".to_string()],
                    memory_lane_type: None,
                    comment: "Low signal".to_string(),
                    confidence: 0.3,
                },
            ],
        }
    }

    #[tokio::test]
    async fn test_persist_enriched_memories() {
        let (repo, pool) = create_test_repo().await;
        let namespace_id = create_test_namespace(&pool).await;
        let event = make_test_event();
        let batch = make_test_batch();

        let result = persist_enriched_memories(namespace_id, &event, &batch, &repo, "test-model")
            .await
            .unwrap();

        assert_eq!(result.stored, 1);
        assert_eq!(result.skipped, 1);
        assert_eq!(result.categories.get("preferences"), Some(&1));
        // Every stored memory must have its id recorded.
        assert_eq!(result.stored_memory_ids.len(), result.stored);
    }

    #[test]
    fn test_persist_result_default() {
        let result = PersistResult::default();
        assert_eq!(result.stored, 0);
        assert_eq!(result.skipped, 0);
        assert!(result.categories.is_empty());
        assert!(result.stored_memory_ids.is_empty());
    }

    #[test]
    fn test_derive_session_key_prefers_non_blank_session_id() {
        assert_eq!(
            derive_session_key("test-agent", Some("session-123"), Some("/home/user/project")),
            "session-123"
        );
    }

    #[test]
    fn test_derive_session_key_fallback() {
        let derived = derive_session_key("test-agent", Some("   "), Some("/home/user/project"));
        assert!(derived.starts_with("derived-"));
        assert_eq!(derived.len(), "derived-".len() + 16);
        // Deterministic, and a blank session id behaves like a missing one.
        assert_eq!(
            derived,
            derive_session_key("test-agent", None, Some("/home/user/project"))
        );
        // A missing cwd and a blank cwd fall back to the same scope.
        assert_eq!(
            derive_session_key("test-agent", None, None),
            derive_session_key("test-agent", None, Some("  "))
        );
        // Different scopes yield different keys.
        assert_ne!(derived, derive_session_key("test-agent", None, Some("/other")));
    }
}