1use std::collections::HashMap;
7use std::sync::Arc;
8
9use nexus_core::config::{AgentConfig, CognitionConfig};
10use nexus_core::traits::EmbeddingService;
11use nexus_llm::LlmClient;
12use nexus_storage::models::ClaimedMemoryJob;
13use nexus_storage::models::{memory_job_status, EnqueueJobParams, MemoryJobRow};
14use nexus_storage::repository::MemoryRepository;
15use serde_json::json;
16use tracing::debug;
17
18use crate::derive::DeriveService;
19use crate::digest::DigestService;
20use crate::error::AgentError;
21use crate::reflect::ReflectService;
22
/// Job type claimed by `process_derive_jobs`.
pub(crate) const DERIVE_MEMORY_JOB: &str = "derive_memory";

/// Job type claimed by `process_digest_jobs` and enqueued by
/// `enqueue_digest_job_if_absent`.
pub(crate) const DIGEST_SESSION_JOB: &str = "digest_session";

/// Job type claimed by `process_reflect_namespace_jobs`.
pub(crate) const REFLECT_NAMESPACE_JOB: &str = "reflect_namespace";

/// Job type claimed by `process_reflect_jobs`.
pub(crate) const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
27
28pub(crate) async fn process_derive_jobs(
31 repo: &MemoryRepository,
32 namespace_id: i64,
33 cognition: &CognitionConfig,
34 agent: &AgentConfig,
35 llm: Arc<dyn LlmClient>,
36 embeddings: Option<Arc<dyn EmbeddingService>>,
37 lease_owner: &str,
38) -> Result<usize, AgentError> {
39 let jobs = repo
40 .claim_jobs(
41 namespace_id,
42 DERIVE_MEMORY_JOB,
43 lease_owner,
44 cognition.lease_ttl_secs,
45 cognition.max_job_batch as i64,
46 )
47 .await
48 .map_err(|error| AgentError::Storage(error.to_string()))?;
49
50 let service = DeriveService::new(agent.clone(), llm, embeddings);
51 let mut processed = 0usize;
52 for job in jobs {
53 let memory_id = job
54 .payload
55 .get("memory_id")
56 .and_then(serde_json::Value::as_i64);
57 let outcome = async {
58 let memory_id = memory_id.ok_or_else(|| {
59 AgentError::Derivation("derive job missing memory_id".to_string())
60 })?;
61 let memory = repo
62 .get_by_id(memory_id)
63 .await
64 .map_err(|error| AgentError::Storage(error.to_string()))?
65 .ok_or_else(|| {
66 AgentError::Derivation(format!("derive source memory {memory_id} not found"))
67 })?;
68 service
69 .derive_memory_with_perspective(&memory, job.perspective.as_ref(), repo)
70 .await
71 .map(|_| ())
72 }
73 .await;
74
75 match outcome {
76 Ok(()) => {
77 repo.complete_job(&job)
78 .await
79 .map_err(|error| AgentError::Storage(error.to_string()))?;
80 processed += 1;
81 }
82 Err(error) => {
83 repo.fail_job(&job, &error.to_string())
84 .await
85 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
86 }
87 }
88 }
89
90 Ok(processed)
91}
92
93pub(crate) async fn process_reflect_jobs(
96 repo: &MemoryRepository,
97 namespace_id: i64,
98 cognition: &CognitionConfig,
99 agent: &AgentConfig,
100 embeddings: Option<Arc<dyn EmbeddingService>>,
101 lease_owner: &str,
102) -> Result<usize, AgentError> {
103 let jobs = repo
104 .claim_jobs(
105 namespace_id,
106 REFLECT_PERSPECTIVE_JOB,
107 lease_owner,
108 cognition.lease_ttl_secs,
109 cognition.max_job_batch as i64,
110 )
111 .await
112 .map_err(|error| AgentError::Storage(error.to_string()))?;
113
114 let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
115 let mut processed = 0usize;
116 for job in jobs {
117 let outcome = async {
118 let perspective = job.perspective.as_ref().ok_or_else(|| {
119 AgentError::Reflection("reflect job missing perspective".to_string())
120 })?;
121 service
122 .reflect_perspective_cycle(namespace_id, perspective, repo)
123 .await
124 .map(|_| ())
125 }
126 .await;
127
128 match outcome {
129 Ok(()) => {
130 repo.complete_job(&job)
131 .await
132 .map_err(|error| AgentError::Storage(error.to_string()))?;
133 processed += 1;
134 }
135 Err(error) => {
136 repo.fail_job(&job, &error.to_string())
137 .await
138 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
139 }
140 }
141 }
142
143 Ok(processed)
144}
145
146pub(crate) async fn process_reflect_namespace_jobs(
147 repo: &MemoryRepository,
148 namespace_id: i64,
149 cognition: &CognitionConfig,
150 agent: &AgentConfig,
151 embeddings: Option<Arc<dyn EmbeddingService>>,
152 lease_owner: &str,
153) -> Result<usize, AgentError> {
154 let jobs = repo
155 .claim_jobs(
156 namespace_id,
157 REFLECT_NAMESPACE_JOB,
158 lease_owner,
159 cognition.lease_ttl_secs,
160 cognition.max_job_batch as i64,
161 )
162 .await
163 .map_err(|error| AgentError::Storage(error.to_string()))?;
164
165 let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
166 let mut processed = 0usize;
167 for job in jobs {
168 let outcome = service.reflect_cycle(namespace_id, repo).await.map(|_| ());
169
170 match outcome {
171 Ok(()) => {
172 repo.complete_job(&job)
173 .await
174 .map_err(|error| AgentError::Storage(error.to_string()))?;
175 processed += 1;
176 }
177 Err(error) => {
178 repo.fail_job(&job, &error.to_string())
179 .await
180 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
181 }
182 }
183 }
184
185 Ok(processed)
186}
187
188pub(crate) async fn process_digest_jobs(
191 repo: &MemoryRepository,
192 namespace_id: i64,
193 cognition: &CognitionConfig,
194 agent: &AgentConfig,
195 llm: Arc<dyn LlmClient>,
196 embeddings: Option<Arc<dyn EmbeddingService>>,
197 lease_owner: &str,
198) -> Result<usize, AgentError> {
199 let jobs = repo
200 .claim_jobs(
201 namespace_id,
202 DIGEST_SESSION_JOB,
203 lease_owner,
204 cognition.lease_ttl_secs,
205 cognition.max_job_batch as i64,
206 )
207 .await
208 .map_err(|error| AgentError::Storage(error.to_string()))?;
209
210 let service = DigestService::new(agent.clone(), llm, embeddings);
211 let mut processed = 0usize;
212
213 let mut session_jobs: HashMap<String, Vec<(ClaimedMemoryJob, bool)>> = HashMap::new();
217 for job in jobs {
218 let session_key = match job
219 .payload
220 .get("session_key")
221 .and_then(serde_json::Value::as_str)
222 .map(ToString::to_string)
223 .or_else(|| {
224 job.perspective
225 .as_ref()
226 .and_then(|perspective| perspective.session_key.clone())
227 }) {
228 Some(key) => key,
229 None => {
230 let error = AgentError::Digest("digest job missing session_key".to_string());
231 repo.fail_job(&job, &error.to_string())
232 .await
233 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
234 continue;
235 }
236 };
237 let force = digest_job_is_forced(
238 job.payload
239 .get("reason")
240 .and_then(serde_json::Value::as_str),
241 );
242 session_jobs
243 .entry(session_key)
244 .or_default()
245 .push((job, force));
246 }
247
248 for (session_key, job_batch) in session_jobs {
249 let force = job_batch.iter().any(|(_, f)| *f);
251
252 if !force
253 && !should_run_incremental_digest(repo, namespace_id, &session_key, cognition).await?
254 {
255 debug!(
256 namespace_id,
257 session_key, "Skipping digest rollover below threshold"
258 );
259 for (job, _) in &job_batch {
260 repo.complete_job(job)
261 .await
262 .map_err(|error| AgentError::Storage(error.to_string()))?;
263 processed += 1;
264 }
265 continue;
266 }
267
268 let outcome = async {
269 service
270 .digest_session(namespace_id, &session_key, repo, force)
271 .await
272 .map(|_| ())
273 }
274 .await;
275
276 match outcome {
277 Ok(()) => {
278 for (job, _) in &job_batch {
279 repo.complete_job(job)
280 .await
281 .map_err(|error| AgentError::Storage(error.to_string()))?;
282 processed += 1;
283 }
284 }
285 Err(error) => {
286 for (job, _) in &job_batch {
287 repo.fail_job(job, &error.to_string())
288 .await
289 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
290 }
291 }
292 }
293 }
294
295 Ok(processed)
296}
297
/// Reports whether a digest job's `reason` forces the digest to run.
///
/// Returns `true` only for the exact reasons `dream_digest`, `session_end`,
/// `manual_digest` and `manual_rebuild`; any other reason, or `None`, yields
/// `false`. The comparison is case-sensitive.
pub(crate) fn digest_job_is_forced(reason: Option<&str>) -> bool {
    const FORCING_REASONS: [&str; 4] = [
        "dream_digest",
        "session_end",
        "manual_digest",
        "manual_rebuild",
    ];

    match reason {
        Some(label) => FORCING_REASONS.contains(&label),
        None => false,
    }
}
304
305async fn should_run_incremental_digest(
306 repo: &MemoryRepository,
307 namespace_id: i64,
308 session_key: &str,
309 cognition: &CognitionConfig,
310) -> Result<bool, AgentError> {
311 let rollover = repo
312 .session_digest_rollover(namespace_id, session_key)
313 .await
314 .map_err(|error| AgentError::Storage(error.to_string()))?;
315
316 if rollover.last_digest_end_memory_id.is_none() {
317 return Ok(true);
318 }
319
320 Ok(
326 rollover.new_memory_count >= cognition.activity_distill_min_events as i64
327 || rollover.estimated_new_tokens >= cognition.digest_short_target_tokens as i64,
328 )
329}
330
331pub(crate) async fn enqueue_digest_job_if_absent(
334 repo: &MemoryRepository,
335 namespace_id: i64,
336 session_key: &str,
337 digest_reason: &str,
338) -> Result<bool, AgentError> {
339 let payload = json!({
340 "session_key": session_key,
341 "reason": digest_reason,
342 });
343 enqueue_job_if_absent(
344 repo,
345 EnqueueJobParams {
346 namespace_id,
347 job_type: DIGEST_SESSION_JOB,
348 priority: 110,
349 perspective: None,
350 payload: &payload,
351 },
352 )
353 .await
354}
355
356pub(crate) async fn enqueue_job_if_absent(
364 repo: &MemoryRepository,
365 params: EnqueueJobParams<'_>,
366) -> Result<bool, AgentError> {
367 for status in [memory_job_status::PENDING, memory_job_status::RUNNING] {
368 let jobs = repo
369 .list_jobs(
370 params.namespace_id,
371 Some(params.job_type),
372 Some(status),
373 64,
374 0,
375 )
376 .await
377 .map_err(|error| AgentError::Storage(error.to_string()))?;
378 if jobs
379 .iter()
380 .any(|row| queued_job_matches(row, params.perspective, params.payload))
381 {
382 return Ok(false);
383 }
384 }
385
386 repo.enqueue_job(params)
387 .await
388 .map_err(|error| AgentError::Storage(error.to_string()))?;
389 Ok(true)
390}
391
392fn queued_job_matches(
393 row: &MemoryJobRow,
394 perspective: Option<&serde_json::Value>,
395 payload: &serde_json::Value,
396) -> bool {
397 let row_payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
398 Ok(value) => value,
399 Err(_) => return false,
400 };
401 if &row_payload != payload {
402 return false;
403 }
404
405 match (&row.perspective_json, perspective) {
406 (None, None) => true,
407 (Some(existing), Some(expected)) => serde_json::from_str::<serde_json::Value>(existing)
408 .map(|value| value == *expected)
409 .unwrap_or(false),
410 _ => false,
411 }
412}