1use std::sync::Arc;
4
5use nexus_core::config::AgentConfig;
6use nexus_core::{
7 infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
8};
9use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
12use tracing::{debug, error, info};
13
/// Upper bound on tokens the LLM may generate for one ingest extraction request.
const INGEST_MAX_TOKENS: u32 = 8_192;
/// Job type enqueued after each successful ingest so the stored memory is derived later.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
17
18use crate::error::AgentError;
19use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
20use crate::types::IngestExtraction;
21
/// Optional caller context attached to an ingest request.
///
/// Both fields default to `None`; see [`IngestService::ingest`].
#[derive(Clone, Debug, Default)]
pub struct IngestContext {
    /// Session identifier. It is used when inferring the memory's perspective
    /// and is copied into the derive-job payload.
    pub session_key: Option<String>,
    /// Working directory of the caller. It is copied into the derive-job payload.
    pub cwd: Option<String>,
}
27
28pub struct IngestService {
29 llm: Arc<dyn LlmClient>,
30 config: AgentConfig,
31}
32
33impl IngestService {
34 pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
35 Self { llm, config }
36 }
37
38 pub async fn ingest(
39 &self,
40 content: &str,
41 source: &str,
42 namespace_id: i64,
43 repo: &MemoryRepository,
44 ) -> Result<i64, AgentError> {
45 self.ingest_with_context(
46 content,
47 source,
48 namespace_id,
49 repo,
50 IngestContext::default(),
51 )
52 .await
53 }
54
55 pub async fn ingest_with_context(
56 &self,
57 content: &str,
58 source: &str,
59 namespace_id: i64,
60 repo: &MemoryRepository,
61 context: IngestContext,
62 ) -> Result<i64, AgentError> {
63 info!(source = %source, "Ingesting content");
64
65 let extraction = self.extract(content, source).await?;
67 debug!(summary = %extraction.summary, "Extracted content info");
68
69 let labels: Vec<String> = extraction
71 .entities
72 .iter()
73 .chain(extraction.topics.iter())
74 .cloned()
75 .collect();
76
77 let perspective = infer_perspective(
78 PerspectiveSource::HookIngest,
79 self.config.namespace.clone(),
80 None,
81 context.session_key.clone(),
82 );
83 let mut cognitive = CognitiveMetadata::new(
84 CognitiveLevel::Raw,
85 perspective.observer.clone(),
86 perspective.subject.clone(),
87 perspective.session_key.clone(),
88 "ingest_service",
89 );
90 cognitive.confidence = Some(extraction.importance_score);
91 cognitive.generated_by = Some("ingest_service".to_string());
92
93 let metadata = cognitive.merge_into(&serde_json::json!({
95 "agent": {
96 "summary": extraction.summary,
97 "entities": extraction.entities,
98 "topics": extraction.topics,
99 "importance_score": extraction.importance_score,
100 "source": source,
101 "generated_by": "ingest_agent"
102 }
103 }));
104
105 let _title = format!("Ingested: {}", source);
107 let memory = repo
108 .store(StoreMemoryParams {
109 namespace_id,
110 content,
111 category: &MemoryCategory::General,
112 memory_lane_type: None,
113 labels: &labels,
114 metadata: &metadata,
115 embedding: None,
116 embedding_model: None,
117 })
118 .await
119 .map_err(|e| {
120 error!(error = %e, "Failed to store memory");
121 AgentError::Storage(e.to_string())
122 })?;
123
124 let derive_payload = serde_json::json!({
125 "memory_id": memory.id,
126 "agent_namespace": self.config.namespace,
127 "source": source,
128 "session_key": context.session_key,
129 "cwd": context.cwd,
130 });
131 let derive_perspective = serde_json::to_value(&perspective).ok();
132 repo.enqueue_job(EnqueueJobParams {
133 namespace_id,
134 job_type: DERIVE_MEMORY_JOB,
135 priority: 100,
136 perspective: derive_perspective.as_ref(),
137 payload: &derive_payload,
138 })
139 .await
140 .map_err(|e| {
141 error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
142 AgentError::Storage(e.to_string())
143 })?;
144
145 info!(memory_id = memory.id, "Memory stored successfully");
146 Ok(memory.id)
147 }
148
149 async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
150 let params = GenerateParams {
151 messages: vec![
152 ChatMessage::system(INGEST_SYSTEM_PROMPT),
153 ChatMessage::user(ingest_user_prompt(content, source)),
154 ],
155 max_tokens: INGEST_MAX_TOKENS,
156 temperature: 0.3,
157 json_mode: true,
158 };
159
160 let extraction: IngestExtraction = self
161 .llm
162 .generate_json(params)
163 .await
164 .map_err(|e| AgentError::Llm(e.to_string()))?;
165
166 Ok(extraction)
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use nexus_core::cognitive_level_from_metadata;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    use crate::types::IngestExtraction;

    /// `LlmClient` test double that hands out pre-queued responses in FIFO order.
    ///
    /// It panics if `generate` is called more times than responses were queued.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            let queue: VecDeque<_> = responses.into_iter().collect();
            Self {
                responses: Mutex::new(queue),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            // The guard is a temporary, so the lock is released before returning.
            let next = self
                .responses
                .lock()
                .expect("mock responses poisoned")
                .pop_front();
            next.expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            String::from("mock")
        }

        fn model_name(&self) -> String {
            String::from("mock-model")
        }
    }

    /// Opens a migrated in-memory SQLite database and creates the `ingest-test` namespace.
    ///
    /// Returns the pool, a memory repository over it, and the namespace id.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        // One connection only: every SQLite `:memory:` connection gets its own
        // private database, so extra connections would not see the migrations.
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespace_id = NamespaceRepository::new(pool.clone())
            .get_or_create("ingest-test", "ingest-test")
            .await
            .unwrap()
            .id;
        let memories = MemoryRepository::new(pool.clone());
        (pool, memories, namespace_id)
    }

    /// Canned LLM response whose body is a serialized `IngestExtraction`.
    fn extraction_response() -> GenerateResponse {
        let extraction = IngestExtraction {
            summary: "Captured a durable implementation update.".to_string(),
            entities: vec!["query".to_string()],
            topics: vec!["pagination".to_string()],
            importance_score: 0.88,
        };
        let content = serde_json::to_string(&extraction).unwrap();
        GenerateResponse {
            content,
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// Ingest service for the `claude-code` namespace.
    ///
    /// Its mock LLM answers exactly once, with `extraction_response()`.
    fn service_with_single_extraction() -> IngestService {
        let llm: Arc<dyn LlmClient> =
            Arc::new(MockLlmClient::new(vec![Ok(extraction_response())]));
        let config = AgentConfig {
            enabled: true,
            namespace: "claude-code".to_string(),
            ..AgentConfig::default()
        };
        IngestService::new(llm, config)
    }

    #[tokio::test]
    async fn test_ingest_stores_raw_cognitive_metadata() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();

        let memory_id = service
            .ingest_with_context(
                "Implemented working-set retrieval.",
                "unit-test",
                namespace_id,
                &repo,
                IngestContext::default(),
            )
            .await
            .unwrap();

        let stored = repo.get_by_id(memory_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&stored.metadata),
            CognitiveLevel::Raw
        );
        assert_eq!(
            stored.metadata["cognitive"]["generated_by"],
            serde_json::json!("ingest_service")
        );
    }

    #[tokio::test]
    async fn test_ingest_enqueues_derive_job() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();
        let context = IngestContext {
            session_key: Some("session-ctx".to_string()),
            cwd: Some("/tmp/project".to_string()),
        };

        let memory_id = service
            .ingest_with_context(
                "Implemented derive service foundation.",
                "unit-test",
                namespace_id,
                &repo,
                context,
            )
            .await
            .unwrap();

        let job_count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        // Pull every field under test from the single derive job in one round trip.
        let (job_memory_id, job_session_key, job_perspective_session_key): (i64, String, String) =
            sqlx::query_as(
                "SELECT json_extract(payload_json, '$.memory_id'), \
                 json_extract(payload_json, '$.session_key'), \
                 json_extract(perspective_json, '$.session_key') \
                 FROM memory_jobs WHERE job_type = ? LIMIT 1",
            )
            .bind(DERIVE_MEMORY_JOB)
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(job_count, 1);
        assert_eq!(job_memory_id, memory_id);
        assert_eq!(job_session_key, "session-ctx");
        assert_eq!(job_perspective_session_key, "session-ctx");
    }
}