Skip to main content

nexus_memory_agent/
ingest.rs

1//! Ingest service - extracts structured info from raw text
2
3use std::sync::Arc;
4
5use chrono::Utc;
6use nexus_core::config::AgentConfig;
7use nexus_core::{
8    infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
9};
10use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
11use nexus_storage::models::EnqueueJobParams;
12use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
13use tracing::{debug, error, info};
14
/// Default max tokens for LLM enrichment responses; passed as `max_tokens`
/// when the extraction request is built.
const INGEST_MAX_TOKENS: u32 = 8192;
/// Job type used when enqueueing the follow-up derive job for a stored memory.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
18
19use crate::error::AgentError;
20use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
21use crate::types::IngestExtraction;
22
/// Optional caller-supplied context that accompanies an ingest request.
///
/// Both fields default to `None`; `IngestService::ingest` uses that default.
#[derive(Debug, Clone, Default)]
pub struct IngestContext {
    /// Session identifier; fed into perspective inference and copied into the
    /// derive job payload under `"session_key"`.
    pub session_key: Option<String>,
    /// Copied verbatim into the derive job payload under `"cwd"`.
    /// NOTE(review): presumably the caller's working directory — confirm.
    pub cwd: Option<String>,
}
28
/// Service that extracts structured information from raw text via an LLM,
/// stores the result as a memory, and enqueues a derive job for it.
pub struct IngestService {
    /// LLM client used to produce the structured extraction.
    llm: Arc<dyn LlmClient>,
    /// Agent configuration; its `namespace` is used for perspective inference
    /// and recorded in the derive job payload as `"agent_namespace"`.
    config: AgentConfig,
}
33
34impl IngestService {
35    pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
36        Self { llm, config }
37    }
38
39    pub async fn ingest(
40        &self,
41        content: &str,
42        source: &str,
43        namespace_id: i64,
44        repo: &MemoryRepository,
45    ) -> Result<i64, AgentError> {
46        self.ingest_with_context(
47            content,
48            source,
49            namespace_id,
50            repo,
51            IngestContext::default(),
52        )
53        .await
54    }
55
56    pub async fn ingest_with_context(
57        &self,
58        content: &str,
59        source: &str,
60        namespace_id: i64,
61        repo: &MemoryRepository,
62        context: IngestContext,
63    ) -> Result<i64, AgentError> {
64        info!(source = %source, "Ingesting content");
65
66        // Step 1: Extract structured info via LLM
67        let extraction = self.extract(content, source).await?;
68        debug!(summary = %extraction.summary, "Extracted content info");
69
70        // Step 2: Build labels from entities and topics
71        let labels: Vec<String> = extraction
72            .entities
73            .iter()
74            .chain(extraction.topics.iter())
75            .cloned()
76            .collect();
77
78        let perspective = infer_perspective(
79            PerspectiveSource::HookIngest,
80            self.config.namespace.clone(),
81            None,
82            context.session_key.clone(),
83        );
84        let mut cognitive = CognitiveMetadata::new(
85            CognitiveLevel::Raw,
86            perspective.observer.clone(),
87            perspective.subject.clone(),
88            perspective.session_key.clone(),
89            "ingest_service",
90        );
91        cognitive.confidence = Some(extraction.importance_score);
92        cognitive.times_reinforced = 0;
93        cognitive.times_contradicted = 0;
94        cognitive.derived_at = Some(Utc::now());
95        cognitive.generated_by = Some("ingest_service".to_string());
96
97        // Step 3: Build metadata with agent extraction info
98        let metadata = cognitive.merge_into(&serde_json::json!({
99            "agent": {
100                "summary": extraction.summary,
101                "entities": extraction.entities,
102                "topics": extraction.topics,
103                "importance_score": extraction.importance_score,
104                "source": source,
105                "generated_by": "ingest_agent"
106            }
107        }));
108
109        // Step 4: Store memory using repository
110        let _title = format!("Ingested: {}", source);
111        let memory = repo
112            .store(StoreMemoryParams {
113                namespace_id,
114                content,
115                category: &MemoryCategory::General,
116                memory_lane_type: None,
117                labels: &labels,
118                metadata: &metadata,
119                embedding: None,
120                embedding_model: None,
121            })
122            .await
123            .map_err(|e| {
124                error!(error = %e, "Failed to store memory");
125                AgentError::Storage(e.to_string())
126            })?;
127
128        let derive_payload = serde_json::json!({
129            "memory_id": memory.id,
130            "agent_namespace": self.config.namespace,
131            "source": source,
132            "session_key": context.session_key,
133            "cwd": context.cwd,
134        });
135        let derive_perspective = serde_json::to_value(&perspective).ok();
136        repo.enqueue_job(EnqueueJobParams {
137            namespace_id,
138            job_type: DERIVE_MEMORY_JOB,
139            priority: 100,
140            perspective: derive_perspective.as_ref(),
141            payload: &derive_payload,
142        })
143        .await
144        .map_err(|e| {
145            error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
146            AgentError::Storage(e.to_string())
147        })?;
148
149        info!(memory_id = memory.id, "Memory stored successfully");
150        Ok(memory.id)
151    }
152
153    async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
154        let params = GenerateParams {
155            messages: vec![
156                ChatMessage::system(INGEST_SYSTEM_PROMPT),
157                ChatMessage::user(ingest_user_prompt(content, source)),
158            ],
159            max_tokens: INGEST_MAX_TOKENS,
160            temperature: 0.3,
161            json_mode: true,
162        };
163
164        let extraction: IngestExtraction = self
165            .llm
166            .generate_json(params)
167            .await
168            .map_err(|e| AgentError::Llm(e.to_string()))?;
169
170        Ok(extraction)
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    use crate::types::IngestExtraction;
    use nexus_core::cognitive_level_from_metadata;

    /// LLM stand-in that hands out pre-canned responses in FIFO order.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Queues `responses`; each `generate` call consumes the next one.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            let queue = VecDeque::from(responses);
            Self {
                responses: Mutex::new(queue),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next queued response; panics when the queue is empty.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            let next = self
                .responses
                .lock()
                .expect("mock responses poisoned")
                .pop_front();
            next.expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            String::from("mock").to_string()
        }

        fn model_name(&self) -> String {
            String::from("mock-model").to_string()
        }
    }

    /// Builds a migrated in-memory SQLite database with one namespace and
    /// returns the pool, a memory repository on it, and the namespace id.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        // Single connection: presumably so every query sees the same
        // in-memory database.
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespaces = NamespaceRepository::new(pool.clone());
        let created = namespaces
            .get_or_create("ingest-test", "ingest-test")
            .await
            .unwrap();

        let memories = MemoryRepository::new(pool.clone());
        (pool, memories, created.id)
    }

    /// Canned LLM reply whose content is a serialized `IngestExtraction`.
    fn extraction_response() -> GenerateResponse {
        let payload = IngestExtraction {
            summary: "Captured a durable implementation update.".to_string(),
            entities: vec!["query".to_string()],
            topics: vec!["pagination".to_string()],
            importance_score: 0.88,
        };
        let content = serde_json::to_string(&payload).unwrap();
        GenerateResponse {
            content,
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// Service wired to a mock LLM holding exactly one successful extraction.
    fn service_with_single_extraction() -> IngestService {
        let llm = MockLlmClient::new(vec![Ok(extraction_response())]);
        let config = AgentConfig {
            enabled: true,
            namespace: "claude-code".to_string(),
            ..AgentConfig::default()
        };
        IngestService::new(Arc::new(llm), config)
    }

    #[tokio::test]
    async fn test_ingest_stores_raw_cognitive_metadata() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();

        let stored_id = service
            .ingest_with_context(
                "Implemented working-set retrieval.",
                "unit-test",
                namespace_id,
                &repo,
                IngestContext::default(),
            )
            .await
            .unwrap();

        let stored = repo.get_by_id(stored_id).await.unwrap().unwrap();
        let level = cognitive_level_from_metadata(&stored.metadata);
        assert_eq!(level, CognitiveLevel::Raw);

        let expected_generator = serde_json::Value::String("ingest_service".to_string());
        assert_eq!(
            stored.metadata["cognitive"]["generated_by"],
            expected_generator
        );
    }

    #[tokio::test]
    async fn test_ingest_enqueues_derive_job() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();

        let context = IngestContext {
            session_key: Some("session-ctx".to_string()),
            cwd: Some("/tmp/project".to_string()),
        };
        let memory_id = service
            .ingest_with_context(
                "Implemented derive service foundation.",
                "unit-test",
                namespace_id,
                &repo,
                context,
            )
            .await
            .unwrap();

        // Exactly one derive job must exist for this ingest.
        let job_count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        // The payload must point back at the stored memory ...
        let payload_memory_id: i64 =
            sqlx::query_scalar("SELECT json_extract(payload_json, '$.memory_id') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        // ... and carry the caller's session key,
        let payload_session_key: String =
            sqlx::query_scalar("SELECT json_extract(payload_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        // which must also appear in the serialized perspective.
        let perspective_session_key: String =
            sqlx::query_scalar("SELECT json_extract(perspective_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(job_count, 1);
        assert_eq!(payload_memory_id, memory_id);
        assert_eq!(payload_session_key, "session-ctx");
        assert_eq!(perspective_session_key, "session-ctx");
    }
}