Skip to main content

nexus_memory_agent/
ingest.rs

1//! Ingest service - extracts structured info from raw text
2
3use std::sync::Arc;
4
5use nexus_core::config::AgentConfig;
6use nexus_core::{
7    infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
8};
9use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
12use tracing::{debug, error, info};
13
/// Token cap sent as `max_tokens` with the ingest extraction LLM request.
const INGEST_MAX_TOKENS: u32 = 8 * 1024;

/// Job type string under which follow-up derive jobs are enqueued after ingest.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
17
18use crate::error::AgentError;
19use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
20use crate::types::IngestExtraction;
21
/// Optional caller-side context attached to a single ingest call.
///
/// Both fields default to `None`.
#[derive(Clone, Debug, Default)]
pub struct IngestContext {
    /// Session identifier; fed into perspective inference and copied into the
    /// derive job payload.
    pub session_key: Option<String>,
    /// Working directory reported by the caller; copied into the derive job payload.
    pub cwd: Option<String>,
}
27
28pub struct IngestService {
29    llm: Arc<dyn LlmClient>,
30    config: AgentConfig,
31}
32
33impl IngestService {
34    pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
35        Self { llm, config }
36    }
37
38    pub async fn ingest(
39        &self,
40        content: &str,
41        source: &str,
42        namespace_id: i64,
43        repo: &MemoryRepository,
44    ) -> Result<i64, AgentError> {
45        self.ingest_with_context(
46            content,
47            source,
48            namespace_id,
49            repo,
50            IngestContext::default(),
51        )
52        .await
53    }
54
55    pub async fn ingest_with_context(
56        &self,
57        content: &str,
58        source: &str,
59        namespace_id: i64,
60        repo: &MemoryRepository,
61        context: IngestContext,
62    ) -> Result<i64, AgentError> {
63        info!(source = %source, "Ingesting content");
64
65        // Step 1: Extract structured info via LLM
66        let extraction = self.extract(content, source).await?;
67        debug!(summary = %extraction.summary, "Extracted content info");
68
69        // Step 2: Build labels from entities and topics
70        let labels: Vec<String> = extraction
71            .entities
72            .iter()
73            .chain(extraction.topics.iter())
74            .cloned()
75            .collect();
76
77        let perspective = infer_perspective(
78            PerspectiveSource::HookIngest,
79            self.config.namespace.clone(),
80            None,
81            context.session_key.clone(),
82        );
83        let mut cognitive = CognitiveMetadata::new(
84            CognitiveLevel::Raw,
85            perspective.observer.clone(),
86            perspective.subject.clone(),
87            perspective.session_key.clone(),
88            "ingest_service",
89        );
90        cognitive.confidence = Some(extraction.importance_score);
91
92        // Step 3: Build metadata with agent extraction info
93        let metadata = cognitive.merge_into(&serde_json::json!({
94            "agent": {
95                "summary": extraction.summary,
96                "entities": extraction.entities,
97                "topics": extraction.topics,
98                "importance_score": extraction.importance_score,
99                "source": source,
100                "generated_by": "ingest_agent"
101            }
102        }));
103
104        // Step 4: Store memory using repository
105        let _title = format!("Ingested: {}", source);
106        let memory = repo
107            .store(StoreMemoryParams {
108                namespace_id,
109                content,
110                category: &MemoryCategory::General,
111                memory_lane_type: None,
112                labels: &labels,
113                metadata: &metadata,
114                embedding: None,
115                embedding_model: None,
116            })
117            .await
118            .map_err(|e| {
119                error!(error = %e, "Failed to store memory");
120                AgentError::Storage(e.to_string())
121            })?;
122
123        let derive_payload = serde_json::json!({
124            "memory_id": memory.id,
125            "agent_namespace": self.config.namespace,
126            "source": source,
127            "session_key": context.session_key,
128            "cwd": context.cwd,
129        });
130        let derive_perspective = serde_json::to_value(&perspective).ok();
131        repo.enqueue_job(EnqueueJobParams {
132            namespace_id,
133            job_type: DERIVE_MEMORY_JOB,
134            priority: 100,
135            perspective: derive_perspective.as_ref(),
136            payload: &derive_payload,
137        })
138        .await
139        .map_err(|e| {
140            error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
141            AgentError::Storage(e.to_string())
142        })?;
143
144        info!(memory_id = memory.id, "Memory stored successfully");
145        Ok(memory.id)
146    }
147
148    async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
149        let params = GenerateParams {
150            messages: vec![
151                ChatMessage::system(INGEST_SYSTEM_PROMPT),
152                ChatMessage::user(ingest_user_prompt(content, source)),
153            ],
154            max_tokens: INGEST_MAX_TOKENS,
155            temperature: 0.3,
156            json_mode: true,
157        };
158
159        let extraction: IngestExtraction = self
160            .llm
161            .generate_json(params)
162            .await
163            .map_err(|e| AgentError::Llm(e.to_string()))?;
164
165        Ok(extraction)
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    use std::collections::VecDeque;
174    use std::sync::Mutex;
175
176    use async_trait::async_trait;
177    use nexus_llm::GenerateResponse;
178    use nexus_storage::repository::NamespaceRepository;
179    use sqlx::sqlite::SqlitePoolOptions;
180
181    use crate::types::IngestExtraction;
182    use nexus_core::cognitive_level_from_metadata;
183
184    struct MockLlmClient {
185        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
186    }
187
188    impl MockLlmClient {
189        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
190            Self {
191                responses: Mutex::new(VecDeque::from(responses)),
192            }
193        }
194    }
195
196    #[async_trait]
197    impl LlmClient for MockLlmClient {
198        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
199            self.responses
200                .lock()
201                .expect("mock responses poisoned")
202                .pop_front()
203                .expect("mock response missing")
204        }
205
206        fn provider_name(&self) -> String {
207            "mock".to_string()
208        }
209
210        fn model_name(&self) -> String {
211            "mock-model".to_string()
212        }
213    }
214
215    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
216        let pool = SqlitePoolOptions::new()
217            .max_connections(1)
218            .connect("sqlite::memory:")
219            .await
220            .unwrap();
221        nexus_storage::migrations::run_migrations(&pool)
222            .await
223            .unwrap();
224        let namespace_repo = NamespaceRepository::new(pool.clone());
225        let namespace = namespace_repo
226            .get_or_create("ingest-test", "ingest-test")
227            .await
228            .unwrap();
229        (pool.clone(), MemoryRepository::new(pool), namespace.id)
230    }
231
232    fn extraction_response() -> GenerateResponse {
233        let extraction = IngestExtraction {
234            summary: "Captured a durable implementation update.".to_string(),
235            entities: vec!["query".to_string()],
236            topics: vec!["pagination".to_string()],
237            importance_score: 0.88,
238        };
239        GenerateResponse {
240            content: serde_json::to_string(&extraction).unwrap(),
241            model: "mock-model".to_string(),
242            usage: None,
243        }
244    }
245
246    #[tokio::test]
247    async fn test_ingest_stores_raw_cognitive_metadata() {
248        let (_pool, repo, namespace_id) = setup_repo().await;
249        let service = IngestService::new(
250            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())])),
251            AgentConfig {
252                enabled: true,
253                namespace: "claude-code".to_string(),
254                ..AgentConfig::default()
255            },
256        );
257
258        let memory_id = service
259            .ingest_with_context(
260                "Implemented working-set retrieval.",
261                "unit-test",
262                namespace_id,
263                &repo,
264                IngestContext::default(),
265            )
266            .await
267            .unwrap();
268
269        let memory = repo.get_by_id(memory_id).await.unwrap().unwrap();
270        assert_eq!(
271            cognitive_level_from_metadata(&memory.metadata),
272            CognitiveLevel::Raw
273        );
274        assert_eq!(
275            memory.metadata["cognitive"]["generated_by"],
276            serde_json::Value::String("ingest_service".to_string())
277        );
278    }
279
280    #[tokio::test]
281    async fn test_ingest_enqueues_derive_job() {
282        let (pool, repo, namespace_id) = setup_repo().await;
283        let service = IngestService::new(
284            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())])),
285            AgentConfig {
286                enabled: true,
287                namespace: "claude-code".to_string(),
288                ..AgentConfig::default()
289            },
290        );
291
292        let memory_id = service
293            .ingest_with_context(
294                "Implemented derive service foundation.",
295                "unit-test",
296                namespace_id,
297                &repo,
298                IngestContext {
299                    session_key: Some("session-ctx".to_string()),
300                    cwd: Some("/tmp/project".to_string()),
301                },
302            )
303            .await
304            .unwrap();
305
306        let job_count: i64 =
307            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
308                .bind(DERIVE_MEMORY_JOB)
309                .fetch_one(&pool)
310                .await
311                .unwrap();
312        let job_memory_id: i64 =
313            sqlx::query_scalar("SELECT json_extract(payload_json, '$.memory_id') FROM memory_jobs WHERE job_type = ? LIMIT 1")
314                .bind(DERIVE_MEMORY_JOB)
315                .fetch_one(&pool)
316                .await
317                .unwrap();
318        let job_session_key: String =
319            sqlx::query_scalar("SELECT json_extract(payload_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
320                .bind(DERIVE_MEMORY_JOB)
321                .fetch_one(&pool)
322                .await
323                .unwrap();
324        let job_perspective_session_key: String =
325            sqlx::query_scalar("SELECT json_extract(perspective_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
326                .bind(DERIVE_MEMORY_JOB)
327                .fetch_one(&pool)
328                .await
329                .unwrap();
330
331        assert_eq!(job_count, 1);
332        assert_eq!(job_memory_id, memory_id);
333        assert_eq!(job_session_key, "session-ctx");
334        assert_eq!(job_perspective_session_key, "session-ctx");
335    }
336}