1use std::sync::Arc;
4
5use nexus_core::config::AgentConfig;
6use nexus_core::{
7 infer_perspective, CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveSource,
8};
9use nexus_llm::{ChatMessage, GenerateParams, LlmClient, LlmClientJson};
10use nexus_storage::models::EnqueueJobParams;
11use nexus_storage::repository::{MemoryRepository, StoreMemoryParams};
12use tracing::{debug, error, info};
13
/// Value passed as `max_tokens` for the extraction request sent to the LLM.
const INGEST_MAX_TOKENS: u32 = 8192;
/// Job type used when enqueueing the follow-up derive job for a stored memory.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
17
18use crate::error::AgentError;
19use crate::prompts::{ingest_user_prompt, INGEST_SYSTEM_PROMPT};
20use crate::types::IngestExtraction;
21
/// Optional caller-supplied context that accompanies an ingest request.
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Debug, Clone, Default)]
pub struct IngestContext {
    /// Session key; forwarded to perspective inference and written as
    /// `session_key` into the derive job payload.
    pub session_key: Option<String>,
    /// Written as `cwd` into the derive job payload.
    /// NOTE(review): presumably the caller's working directory — confirm.
    pub cwd: Option<String>,
}
27
/// Service that runs an LLM extraction over incoming content, stores the
/// content as a memory, and enqueues a derive job for it.
pub struct IngestService {
    /// LLM client used for the extraction request.
    llm: Arc<dyn LlmClient>,
    /// Agent configuration; its `namespace` is used for perspective inference
    /// and is written into the derive job payload.
    config: AgentConfig,
}
32
33impl IngestService {
34 pub fn new(llm: Arc<dyn LlmClient>, config: AgentConfig) -> Self {
35 Self { llm, config }
36 }
37
38 pub async fn ingest(
39 &self,
40 content: &str,
41 source: &str,
42 namespace_id: i64,
43 repo: &MemoryRepository,
44 ) -> Result<i64, AgentError> {
45 self.ingest_with_context(
46 content,
47 source,
48 namespace_id,
49 repo,
50 IngestContext::default(),
51 )
52 .await
53 }
54
55 pub async fn ingest_with_context(
56 &self,
57 content: &str,
58 source: &str,
59 namespace_id: i64,
60 repo: &MemoryRepository,
61 context: IngestContext,
62 ) -> Result<i64, AgentError> {
63 info!(source = %source, "Ingesting content");
64
65 let extraction = self.extract(content, source).await?;
67 debug!(summary = %extraction.summary, "Extracted content info");
68
69 let labels: Vec<String> = extraction
71 .entities
72 .iter()
73 .chain(extraction.topics.iter())
74 .cloned()
75 .collect();
76
77 let perspective = infer_perspective(
78 PerspectiveSource::HookIngest,
79 self.config.namespace.clone(),
80 None,
81 context.session_key.clone(),
82 );
83 let mut cognitive = CognitiveMetadata::new(
84 CognitiveLevel::Raw,
85 perspective.observer.clone(),
86 perspective.subject.clone(),
87 perspective.session_key.clone(),
88 "ingest_service",
89 );
90 cognitive.confidence = Some(extraction.importance_score);
91
92 let metadata = cognitive.merge_into(&serde_json::json!({
94 "agent": {
95 "summary": extraction.summary,
96 "entities": extraction.entities,
97 "topics": extraction.topics,
98 "importance_score": extraction.importance_score,
99 "source": source,
100 "generated_by": "ingest_agent"
101 }
102 }));
103
104 let _title = format!("Ingested: {}", source);
106 let memory = repo
107 .store(StoreMemoryParams {
108 namespace_id,
109 content,
110 category: &MemoryCategory::General,
111 memory_lane_type: None,
112 labels: &labels,
113 metadata: &metadata,
114 embedding: None,
115 embedding_model: None,
116 })
117 .await
118 .map_err(|e| {
119 error!(error = %e, "Failed to store memory");
120 AgentError::Storage(e.to_string())
121 })?;
122
123 let derive_payload = serde_json::json!({
124 "memory_id": memory.id,
125 "agent_namespace": self.config.namespace,
126 "source": source,
127 "session_key": context.session_key,
128 "cwd": context.cwd,
129 });
130 let derive_perspective = serde_json::to_value(&perspective).ok();
131 repo.enqueue_job(EnqueueJobParams {
132 namespace_id,
133 job_type: DERIVE_MEMORY_JOB,
134 priority: 100,
135 perspective: derive_perspective.as_ref(),
136 payload: &derive_payload,
137 })
138 .await
139 .map_err(|e| {
140 error!(error = %e, memory_id = memory.id, "Failed to enqueue derive job");
141 AgentError::Storage(e.to_string())
142 })?;
143
144 info!(memory_id = memory.id, "Memory stored successfully");
145 Ok(memory.id)
146 }
147
148 async fn extract(&self, content: &str, source: &str) -> Result<IngestExtraction, AgentError> {
149 let params = GenerateParams {
150 messages: vec![
151 ChatMessage::system(INGEST_SYSTEM_PROMPT),
152 ChatMessage::user(ingest_user_prompt(content, source)),
153 ],
154 max_tokens: INGEST_MAX_TOKENS,
155 temperature: 0.3,
156 json_mode: true,
157 };
158
159 let extraction: IngestExtraction = self
160 .llm
161 .generate_json(params)
162 .await
163 .map_err(|e| AgentError::Llm(e.to_string()))?;
164
165 Ok(extraction)
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    use crate::types::IngestExtraction;
    use nexus_core::cognitive_level_from_metadata;

    /// LLM test double that hands back pre-queued responses in FIFO order.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Queues `responses` so that the first element is returned first.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            let queue: VecDeque<_> = responses.into_iter().collect();
            Self {
                responses: Mutex::new(queue),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next queued response; panics when the queue is empty.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            let mut queue = self.responses.lock().expect("mock responses poisoned");
            queue.pop_front().expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            String::from("mock")
        }

        fn model_name(&self) -> String {
            String::from("mock-model")
        }
    }

    /// Builds a migrated in-memory SQLite database and returns the pool, a
    /// memory repository on top of it, and the id of the `ingest-test` namespace.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespaces = NamespaceRepository::new(pool.clone());
        let namespace_id = namespaces
            .get_or_create("ingest-test", "ingest-test")
            .await
            .unwrap()
            .id;

        let memories = MemoryRepository::new(pool.clone());
        (pool, memories, namespace_id)
    }

    /// Canned LLM reply whose content is a serialized `IngestExtraction`.
    fn extraction_response() -> GenerateResponse {
        let content = serde_json::to_string(&IngestExtraction {
            summary: String::from("Captured a durable implementation update."),
            entities: vec![String::from("query")],
            topics: vec![String::from("pagination")],
            importance_score: 0.88,
        })
        .unwrap();

        GenerateResponse {
            content,
            model: String::from("mock-model"),
            usage: None,
        }
    }

    /// Service under test: one queued extraction reply, `claude-code` namespace.
    fn service_with_single_extraction() -> IngestService {
        let llm = MockLlmClient::new(vec![Ok(extraction_response())]);
        let config = AgentConfig {
            enabled: true,
            namespace: String::from("claude-code"),
            ..AgentConfig::default()
        };
        IngestService::new(Arc::new(llm), config)
    }

    #[tokio::test]
    async fn test_ingest_stores_raw_cognitive_metadata() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();

        let stored_id = service
            .ingest_with_context(
                "Implemented working-set retrieval.",
                "unit-test",
                namespace_id,
                &repo,
                IngestContext::default(),
            )
            .await
            .unwrap();

        let stored = repo.get_by_id(stored_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&stored.metadata),
            CognitiveLevel::Raw
        );
        assert_eq!(
            stored.metadata["cognitive"]["generated_by"],
            serde_json::Value::String(String::from("ingest_service"))
        );
    }

    #[tokio::test]
    async fn test_ingest_enqueues_derive_job() {
        let (pool, repo, namespace_id) = setup_repo().await;
        let service = service_with_single_extraction();
        let context = IngestContext {
            session_key: Some(String::from("session-ctx")),
            cwd: Some(String::from("/tmp/project")),
        };

        let stored_id = service
            .ingest_with_context(
                "Implemented derive service foundation.",
                "unit-test",
                namespace_id,
                &repo,
                context,
            )
            .await
            .unwrap();

        // Read back the single derive job row, one column expression at a time.
        let derive_jobs: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memory_jobs WHERE job_type = ?")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();
        let payload_memory_id: i64 =
            sqlx::query_scalar("SELECT json_extract(payload_json, '$.memory_id') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();
        let payload_session_key: String =
            sqlx::query_scalar("SELECT json_extract(payload_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();
        let perspective_session_key: String =
            sqlx::query_scalar("SELECT json_extract(perspective_json, '$.session_key') FROM memory_jobs WHERE job_type = ? LIMIT 1")
                .bind(DERIVE_MEMORY_JOB)
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(derive_jobs, 1);
        assert_eq!(payload_memory_id, stored_id);
        assert_eq!(payload_session_key, "session-ctx");
        assert_eq!(perspective_session_key, "session-ctx");
    }
}