1use std::sync::Arc;
4
5use chrono::Utc;
6use nexus_core::config::AgentConfig;
7use nexus_core::{
8 infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
9};
10use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
11use nexus_storage::models::EnqueueJobParams;
12use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
13use tracing::{debug, error, info};
14
/// Value passed as `max_tokens` on the LLM request issued by `IngestService::extract`.
const INGEST_MAX_TOKENS: u32 = 8192;
/// Job type under which `IngestService` enqueues follow-up work for a freshly stored memory.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
18
19use crate::error::AgentError;
20use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
21use crate::types::IngestExtraction;
22
/// Optional caller-supplied context that accompanies a single ingest call.
///
/// Both fields default to `None`, so `IngestContext::default()` describes an
/// ingest with no extra context.
#[derive(Clone, Debug, Default)]
pub struct IngestContext {
    /// Session key, when known; forwarded to perspective inference and to the
    /// derive-job payload.
    pub session_key: Option<String>,
    /// Working directory, when known; forwarded to the derive-job payload.
    pub cwd: Option<String>,
}
28
29pub struct IngestService {
30 llm: Arc<dyn LlmClient>,
31 config: AgentConfig,
32}
33
34impl IngestService {
35 pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
36 Self { llm, config }
37 }
38
39 pub async fn ingest(
40 &self,
41 content: &str,
42 source: &str,
43 namespace_id: i64,
44 repo: &MemoryRepository,
45 ) -> Result<i64, AgentError> {
46 self.ingest_with_context(
47 content,
48 source,
49 namespace_id,
50 repo,
51 IngestContext::default(),
52 )
53 .await
54 }
55
56 pub async fn ingest_with_context(
57 &self,
58 content: &str,
59 source: &str,
60 namespace_id: i64,
61 repo: &MemoryRepository,
62 context: IngestContext,
63 ) -> Result<i64, AgentError> {
64 info!(source = %source, "Ingesting content");
65
66 let extraction = self.extract(content, source).await?;
68 debug!(summary = %extraction.summary, "Extracted content info");
69
70 let labels: Vec<String> = extraction
72 .entities
73 .iter()
74 .chain(extraction.topics.iter())
75 .cloned()
76 .collect();
77
78 let perspective = infer_perspective(
79 PerspectiveSource::HookIngest,
80 self.config.namespace.clone(),
81 None,
82 context.session_key.clone(),
83 );
84 let mut cognitive = CognitiveMetadata::new(
85 CognitiveLevel::Raw,
86 perspective.observer.clone(),
87 perspective.subject.clone(),
88 perspective.session_key.clone(),
89 "ingest_service",
90 );
91 cognitive.confidence = Some(extraction.importance_score);
92 cognitive.times_reinforced = 0;
93 cognitive.times_contradicted = 0;
94 cognitive.derived_at = Some(Utc::now());
95 cognitive.generated_by = Some("ingest_service".to_string());
96
97 let metadata = cognitive.merge_into(&serde_json::json!({
99 "agent": {
100 "summary": extraction.summary,
101 "entities": extraction.entities,
102 "topics": extraction.topics,
103 "importance_score": extraction.importance_score,
104 "source": source,
105 "generated_by": "ingest_agent"
106 }
107 }));
108
109 let _title = format!("Ingested: {}", source);
111 let memory = repo
112 .store(StoreMemoryParams {
113 namespace_id,
114 content,
115 category: &MemoryCategory::General,
116 memory_lane_type: None,
117 labels: &labels,
118 metadata: &metadata,
119 embedding: None,
120 embedding_model: None,
121 })
122 .await
123 .map_err(|e| {
124 error!(error = %e, "Failed to store memory");
125 AgentError::Storage(e.to_string())
126 })?;
127
128 let derive_payload = serde_json::json!({
129 "memory_id": memory.id,
130 "agent_namespace": self.config.namespace,
131 "source": source,
132 "session_key": context.session_key,
133 "cwd": context.cwd,
134 });
135 let derive_perspective = serde_json::to_value(&perspective).ok();
136 repo.enqueue_job(EnqueueJobParams {
137 namespace_id,
138 job_type: DERIVE_MEMORY_JOB,
139 priority: 100,
140 perspective: derive_perspective.as_ref(),
141 payload: &derive_payload,
142 })
143 .await
144 .map_err(|e| {
145 error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
146 AgentError::Storage(e.to_string())
147 })?;
148
149 info!(memory_id = memory.id, "Memory stored successfully");
150 Ok(memory.id)
151 }
152
153 async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
154 let params = GenerateParams {
155 messages: vec![
156 ChatMessage::system(INGEST_SYSTEM_PROMPT),
157 ChatMessage::user(ingest_user_prompt(content, source)),
158 ],
159 max_tokens: INGEST_MAX_TOKENS,
160 temperature: 0.3,
161 json_mode: true,
162 };
163
164 let extraction: IngestExtraction = self
165 .llm
166 .generate_json(params)
167 .await
168 .map_err(|e| AgentError::Llm(e.to_string()))?;
169
170 Ok(extraction)
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    use crate::types::IngestExtraction;
    use nexus_core::cognitive_level_from_metadata;

    // SQL used to inspect the job row written by an ingest call.
    const COUNT_JOBS_SQL: &str = "SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?";
    const JOB_MEMORY_ID_SQL: &str = "SELECT json_extract(payload_json, '$.memory_id') FROM memory_jobs WHERE job_type = ? LIMIT 1";
    const JOB_SESSION_KEY_SQL: &str = "SELECT json_extract(payload_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1";
    const JOB_PERSPECTIVE_SESSION_KEY_SQL: &str = "SELECT json_extract(perspective_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1";

    /// LLM stand-in that hands out pre-queued responses in FIFO order.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Queues `responses`; each `generate` call consumes the next one.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            let queue = VecDeque::from(responses);
            Self {
                responses: Mutex::new(queue),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next queued response; panics when the queue is empty.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            let mut queue = self.responses.lock().expect("mock responses poisoned");
            queue.pop_front().expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            String::from("mock")
        }

        fn model_name(&self) -> String {
            String::from("mock-model")
        }
    }

    /// Builds an in-memory SQLite database with migrations applied and returns
    /// the pool, a memory repository over it, and the id of a test namespace.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespace = NamespaceRepository::new(pool.clone())
            .get_or_create("ingest-test", "ingest-test")
            .await
            .unwrap();

        let repo = MemoryRepository::new(pool.clone());
        (pool, repo, namespace.id)
    }

    /// Canned LLM response whose content is a serialized `IngestExtraction`.
    fn extraction_response() -> GenerateResponse {
        let payload = serde_json::to_string(&IngestExtraction {
            summary: "Captured a durable implementation update.".to_string(),
            entities: vec!["query".to_string()],
            topics: vec!["pagination".to_string()],
            importance_score: 0.88,
        })
        .unwrap();

        GenerateResponse {
            content: payload,
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// Service wired to a mock LLM that holds exactly one extraction response.
    fn service_with_single_extraction() -> IngestService {
        let llm: Arc<dyn LlmClient> =
            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())]));
        let config = AgentConfig {
            enabled: true,
            namespace: "claude-code".to_string(),
            ..AgentConfig::default()
        };
        IngestService::new(llm, config)
    }

    #[tokio::test]
    async fn test_ingest_stores_raw_cognitive_metadata() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();

        let stored_id = service
            .ingest_with_context(
                "Implemented working-set retrieval.",
                "unit-test",
                namespace_id,
                &repo,
                IngestContext::default(),
            )
            .await
            .unwrap();

        let stored = repo.get_by_id(stored_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&stored.metadata),
            CognitiveLevel::Raw
        );
        assert_eq!(
            stored.metadata["cognitive"]["generated_by"],
            serde_json::Value::String("ingest_service".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_enqueues_derive_job() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();
        let context = IngestContext {
            session_key: Some("session-ctx".to_string()),
            cwd: Some("/tmp/project".to_string()),
        };

        let stored_id = service
            .ingest_with_context(
                "Implemented derive service foundation.",
                "unit-test",
                namespace_id,
                &repo,
                context,
            )
            .await
            .unwrap();

        // Read back everything the derive job row is expected to carry.
        let count: i64 = sqlx::query_scalar(COUNT_JOBS_SQL)
            .bind(DERIVE_MEMORY_JOB)
            .fetch_one(&pool)
            .await
            .unwrap();
        let payload_memory_id: i64 = sqlx::query_scalar(JOB_MEMORY_ID_SQL)
            .bind(DERIVE_MEMORY_JOB)
            .fetch_one(&pool)
            .await
            .unwrap();
        let payload_session_key: String = sqlx::query_scalar(JOB_SESSION_KEY_SQL)
            .bind(DERIVE_MEMORY_JOB)
            .fetch_one(&pool)
            .await
            .unwrap();
        let perspective_session_key: String =
            sqlx::query_scalar(JOB_PERSPECTIVE_SESSION_KEY_SQL)
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(count, 1);
        assert_eq!(payload_memory_id, stored_id);
        assert_eq!(payload_session_key, "session-ctx");
        assert_eq!(perspective_session_key, "session-ctx");
    }
}