Skip to main content

nexus_memory_agent/
ingest.rs

//! Ingest service - extracts structured information from raw text
2
3use std::sync::Arc;
4
5use nexus_core::config::AgentConfig;
6use nexus_core::{
7    infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
8};
9use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
12use tracing::{debug, error, info};
13
/// Token ceiling requested from the LLM for an enrichment (extraction) response.
const INGEST_MAX_TOKENS: u32 = 8_192;
/// Job type under which follow-up derivation work is enqueued once a memory is stored.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
17
18use crate::error::AgentError;
19use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
20use crate::types::IngestExtraction;
21
/// Optional caller-supplied context that accompanies an ingest request.
///
/// `Default` yields a context with every field unset. `PartialEq`/`Eq` are
/// derived so contexts can be compared directly (for example in assertions).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestContext {
    /// Session identifier forwarded to perspective inference and to the derive-job payload.
    pub session_key: Option<String>,
    /// Working directory of the caller, forwarded to the derive-job payload.
    pub cwd: Option<String>,
}
27
28pub struct IngestService {
29    llm: Arc<dyn LlmClient>,
30    config: AgentConfig,
31}
32
33impl IngestService {
34    pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
35        Self { llm, config }
36    }
37
38    pub async fn ingest(
39        &self,
40        content: &str,
41        source: &str,
42        namespace_id: i64,
43        repo: &MemoryRepository,
44    ) -> Result<i64, AgentError> {
45        self.ingest_with_context(
46            content,
47            source,
48            namespace_id,
49            repo,
50            IngestContext::default(),
51        )
52        .await
53    }
54
55    pub async fn ingest_with_context(
56        &self,
57        content: &str,
58        source: &str,
59        namespace_id: i64,
60        repo: &MemoryRepository,
61        context: IngestContext,
62    ) -> Result<i64, AgentError> {
63        info!(source = %source, "Ingesting content");
64
65        // Step 1: Extract structured info via LLM
66        let extraction = self.extract(content, source).await?;
67        debug!(summary = %extraction.summary, "Extracted content info");
68
69        // Step 2: Build labels from entities and topics
70        let labels: Vec<String> = extraction
71            .entities
72            .iter()
73            .chain(extraction.topics.iter())
74            .cloned()
75            .collect();
76
77        let perspective = infer_perspective(
78            PerspectiveSource::HookIngest,
79            self.config.namespace.clone(),
80            None,
81            context.session_key.clone(),
82        );
83        let mut cognitive = CognitiveMetadata::new(
84            CognitiveLevel::Raw,
85            perspective.observer.clone(),
86            perspective.subject.clone(),
87            perspective.session_key.clone(),
88            "ingest_service",
89        );
90        cognitive.confidence = Some(extraction.importance_score);
91        cognitive.generated_by = Some("ingest_service".to_string());
92
93        // Step 3: Build metadata with agent extraction info
94        let metadata = cognitive.merge_into(&serde_json::json!({
95            "agent": {
96                "summary": extraction.summary,
97                "entities": extraction.entities,
98                "topics": extraction.topics,
99                "importance_score": extraction.importance_score,
100                "source": source,
101                "generated_by": "ingest_agent"
102            }
103        }));
104
105        // Step 4: Store memory using repository
106        let _title = format!("Ingested: {}", source);
107        let memory = repo
108            .store(StoreMemoryParams {
109                namespace_id,
110                content,
111                category: &MemoryCategory::General,
112                memory_lane_type: None,
113                labels: &labels,
114                metadata: &metadata,
115                embedding: None,
116                embedding_model: None,
117            })
118            .await
119            .map_err(|e| {
120                error!(error = %e, "Failed to store memory");
121                AgentError::Storage(e.to_string())
122            })?;
123
124        let derive_payload = serde_json::json!({
125            "memory_id": memory.id,
126            "agent_namespace": self.config.namespace,
127            "source": source,
128            "session_key": context.session_key,
129            "cwd": context.cwd,
130        });
131        let derive_perspective = serde_json::to_value(&perspective).ok();
132        repo.enqueue_job(EnqueueJobParams {
133            namespace_id,
134            job_type: DERIVE_MEMORY_JOB,
135            priority: 100,
136            perspective: derive_perspective.as_ref(),
137            payload: &derive_payload,
138        })
139        .await
140        .map_err(|e| {
141            error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
142            AgentError::Storage(e.to_string())
143        })?;
144
145        info!(memory_id = memory.id, "Memory stored successfully");
146        Ok(memory.id)
147    }
148
149    async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
150        let params = GenerateParams {
151            messages: vec![
152                ChatMessage::system(INGEST_SYSTEM_PROMPT),
153                ChatMessage::user(ingest_user_prompt(content, source)),
154            ],
155            max_tokens: INGEST_MAX_TOKENS,
156            temperature: 0.3,
157            json_mode: true,
158        };
159
160        let extraction: IngestExtraction = self
161            .llm
162            .generate_json(params)
163            .await
164            .map_err(|e| AgentError::Llm(e.to_string()))?;
165
166        Ok(extraction)
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    use std::collections::VecDeque;
175    use std::sync::Mutex;
176
177    use async_trait::async_trait;
178    use nexus_llm::GenerateResponse;
179    use nexus_storage::repository::NamespaceRepository;
180    use sqlx::sqlite::SqlitePoolOptions;
181
182    use crate::types::IngestExtraction;
183    use nexus_core::cognitive_level_from_metadata;
184
185    struct MockLlmClient {
186        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
187    }
188
189    impl MockLlmClient {
190        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
191            Self {
192                responses: Mutex::new(VecDeque::from(responses)),
193            }
194        }
195    }
196
197    #[async_trait]
198    impl LlmClient for MockLlmClient {
199        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
200            self.responses
201                .lock()
202                .expect("mock responses poisoned")
203                .pop_front()
204                .expect("mock response missing")
205        }
206
207        fn provider_name(&self) -> String {
208            "mock".to_string()
209        }
210
211        fn model_name(&self) -> String {
212            "mock-model".to_string()
213        }
214    }
215
216    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
217        let pool = SqlitePoolOptions::new()
218            .max_connections(1)
219            .connect("sqlite::memory:")
220            .await
221            .unwrap();
222        nexus_storage::migrations::run_migrations(&pool)
223            .await
224            .unwrap();
225        let namespace_repo = NamespaceRepository::new(pool.clone());
226        let namespace = namespace_repo
227            .get_or_create("ingest-test", "ingest-test")
228            .await
229            .unwrap();
230        (pool.clone(), MemoryRepository::new(pool), namespace.id)
231    }
232
233    fn extraction_response() -> GenerateResponse {
234        let extraction = IngestExtraction {
235            summary: "Captured a durable implementation update.".to_string(),
236            entities: vec!["query".to_string()],
237            topics: vec!["pagination".to_string()],
238            importance_score: 0.88,
239        };
240        GenerateResponse {
241            content: serde_json::to_string(&extraction).unwrap(),
242            model: "mock-model".to_string(),
243            usage: None,
244        }
245    }
246
247    #[tokio::test]
248    async fn test_ingest_stores_raw_cognitive_metadata() {
249        let (_pool, repo, namespace_id) = setup_repo().await;
250        let service = IngestService::new(
251            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())])),
252            AgentConfig {
253                enabled: true,
254                namespace: "claude-code".to_string(),
255                ..AgentConfig::default()
256            },
257        );
258
259        let memory_id = service
260            .ingest_with_context(
261                "Implemented working-set retrieval.",
262                "unit-test",
263                namespace_id,
264                &repo,
265                IngestContext::default(),
266            )
267            .await
268            .unwrap();
269
270        let memory = repo.get_by_id(memory_id).await.unwrap().unwrap();
271        assert_eq!(
272            cognitive_level_from_metadata(&memory.metadata),
273            CognitiveLevel::Raw
274        );
275        assert_eq!(
276            memory.metadata["cognitive"]["generated_by"],
277            serde_json::Value::String("ingest_service".to_string())
278        );
279    }
280
281    #[tokio::test]
282    async fn test_ingest_enqueues_derive_job() {
283        let (pool, repo, namespace_id) = setup_repo().await;
284        let service = IngestService::new(
285            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())])),
286            AgentConfig {
287                enabled: true,
288                namespace: "claude-code".to_string(),
289                ..AgentConfig::default()
290            },
291        );
292
293        let memory_id = service
294            .ingest_with_context(
295                "Implemented derive service foundation.",
296                "unit-test",
297                namespace_id,
298                &repo,
299                IngestContext {
300                    session_key: Some("session-ctx".to_string()),
301                    cwd: Some("/tmp/project".to_string()),
302                },
303            )
304            .await
305            .unwrap();
306
307        let job_count: i64 =
308            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
309                .bind(DERIVE_MEMORY_JOB)
310                .fetch_one(&pool)
311                .await
312                .unwrap();
313        let job_memory_id: i64 =
314            sqlx::query_scalar("SELECT json_extract(payload_json, '$.memory_id') FROM memory_jobs WHERE job_type = ? LIMIT 1")
315                .bind(DERIVE_MEMORY_JOB)
316                .fetch_one(&pool)
317                .await
318                .unwrap();
319        let job_session_key: String =
320            sqlx::query_scalar("SELECT json_extract(payload_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
321                .bind(DERIVE_MEMORY_JOB)
322                .fetch_one(&pool)
323                .await
324                .unwrap();
325        let job_perspective_session_key: String =
326            sqlx::query_scalar("SELECT json_extract(perspective_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
327                .bind(DERIVE_MEMORY_JOB)
328                .fetch_one(&pool)
329                .await
330                .unwrap();
331
332        assert_eq!(job_count, 1);
333        assert_eq!(job_memory_id, memory_id);
334        assert_eq!(job_session_key, "session-ctx");
335        assert_eq!(job_perspective_session_key, "session-ctx");
336    }
337}