Skip to main content

nexus_memory_agent/
job_processor.rs

1//! Job processing for derive, reflect, and digest cognition jobs.
2//!
3//! Each function claims a batch of pending jobs, dispatches them to the
4//! appropriate service, and marks them complete or failed.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use nexus_core::config::{AgentConfig, CognitionConfig};
10use nexus_core::traits::EmbeddingService;
11use nexus_llm::LlmClient;
12use nexus_storage::models::ClaimedMemoryJob;
13use nexus_storage::models::{memory_job_status, EnqueueJobParams, MemoryJobRow};
14use nexus_storage::repository::MemoryRepository;
15use serde_json::json;
16use tracing::debug;
17
18use crate::derive::DeriveService;
19use crate::digest::DigestService;
20use crate::error::AgentError;
21use crate::reflect::ReflectService;
22
/// Job type claimed by `process_derive_jobs`.
pub(crate) const DERIVE_MEMORY_JOB: &str = "derive_memory";
/// Job type claimed by `process_reflect_namespace_jobs`.
pub(crate) const REFLECT_NAMESPACE_JOB: &str = "reflect_namespace";
/// Job type claimed by `process_reflect_jobs`; these jobs must carry a perspective.
pub(crate) const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
/// Job type claimed by `process_digest_jobs` and enqueued by
/// `enqueue_digest_job_if_absent`.
pub(crate) const DIGEST_SESSION_JOB: &str = "digest_session";
27
28// ── Derive jobs ───────────────────────────────────────────────────────
29
30pub(crate) async fn process_derive_jobs(
31    repo: &MemoryRepository,
32    namespace_id: i64,
33    cognition: &CognitionConfig,
34    agent: &AgentConfig,
35    llm: Arc<dyn LlmClient>,
36    embeddings: Option<Arc<dyn EmbeddingService>>,
37    lease_owner: &str,
38) -> Result<usize, AgentError> {
39    let jobs = repo
40        .claim_jobs(
41            namespace_id,
42            DERIVE_MEMORY_JOB,
43            lease_owner,
44            cognition.lease_ttl_secs,
45            cognition.max_job_batch as i64,
46        )
47        .await
48        .map_err(|error| AgentError::Storage(error.to_string()))?;
49
50    let service = DeriveService::new(agent.clone(), llm, embeddings);
51    let mut processed = 0usize;
52    for job in jobs {
53        let memory_id = job
54            .payload
55            .get("memory_id")
56            .and_then(serde_json::Value::as_i64);
57        let outcome = async {
58            let memory_id = memory_id.ok_or_else(|| {
59                AgentError::Derivation("derive job missing memory_id".to_string())
60            })?;
61            let memory = repo
62                .get_by_id(memory_id)
63                .await
64                .map_err(|error| AgentError::Storage(error.to_string()))?
65                .ok_or_else(|| {
66                    AgentError::Derivation(format!("derive source memory {memory_id} not found"))
67                })?;
68            service
69                .derive_memory_with_perspective(&memory, job.perspective.as_ref(), repo)
70                .await
71                .map(|_| ())
72        }
73        .await;
74
75        match outcome {
76            Ok(()) => {
77                repo.complete_job(&job)
78                    .await
79                    .map_err(|error| AgentError::Storage(error.to_string()))?;
80                processed += 1;
81            }
82            Err(error) => {
83                repo.fail_job(&job, &error.to_string())
84                    .await
85                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
86            }
87        }
88    }
89
90    Ok(processed)
91}
92
93// ── Reflect jobs ──────────────────────────────────────────────────────
94
95pub(crate) async fn process_reflect_jobs(
96    repo: &MemoryRepository,
97    namespace_id: i64,
98    cognition: &CognitionConfig,
99    agent: &AgentConfig,
100    embeddings: Option<Arc<dyn EmbeddingService>>,
101    lease_owner: &str,
102) -> Result<usize, AgentError> {
103    let jobs = repo
104        .claim_jobs(
105            namespace_id,
106            REFLECT_PERSPECTIVE_JOB,
107            lease_owner,
108            cognition.lease_ttl_secs,
109            cognition.max_job_batch as i64,
110        )
111        .await
112        .map_err(|error| AgentError::Storage(error.to_string()))?;
113
114    let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
115    let mut processed = 0usize;
116    for job in jobs {
117        let outcome = async {
118            let perspective = job.perspective.as_ref().ok_or_else(|| {
119                AgentError::Reflection("reflect job missing perspective".to_string())
120            })?;
121            service
122                .reflect_perspective_cycle(namespace_id, perspective, repo)
123                .await
124                .map(|_| ())
125        }
126        .await;
127
128        match outcome {
129            Ok(()) => {
130                repo.complete_job(&job)
131                    .await
132                    .map_err(|error| AgentError::Storage(error.to_string()))?;
133                processed += 1;
134            }
135            Err(error) => {
136                repo.fail_job(&job, &error.to_string())
137                    .await
138                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
139            }
140        }
141    }
142
143    Ok(processed)
144}
145
146pub(crate) async fn process_reflect_namespace_jobs(
147    repo: &MemoryRepository,
148    namespace_id: i64,
149    cognition: &CognitionConfig,
150    agent: &AgentConfig,
151    embeddings: Option<Arc<dyn EmbeddingService>>,
152    lease_owner: &str,
153) -> Result<usize, AgentError> {
154    let jobs = repo
155        .claim_jobs(
156            namespace_id,
157            REFLECT_NAMESPACE_JOB,
158            lease_owner,
159            cognition.lease_ttl_secs,
160            cognition.max_job_batch as i64,
161        )
162        .await
163        .map_err(|error| AgentError::Storage(error.to_string()))?;
164
165    let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
166    let mut processed = 0usize;
167    for job in jobs {
168        let outcome = service.reflect_cycle(namespace_id, repo).await.map(|_| ());
169
170        match outcome {
171            Ok(()) => {
172                repo.complete_job(&job)
173                    .await
174                    .map_err(|error| AgentError::Storage(error.to_string()))?;
175                processed += 1;
176            }
177            Err(error) => {
178                repo.fail_job(&job, &error.to_string())
179                    .await
180                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
181            }
182        }
183    }
184
185    Ok(processed)
186}
187
188// ── Digest jobs ───────────────────────────────────────────────────────
189
190pub(crate) async fn process_digest_jobs(
191    repo: &MemoryRepository,
192    namespace_id: i64,
193    cognition: &CognitionConfig,
194    agent: &AgentConfig,
195    llm: Arc<dyn LlmClient>,
196    embeddings: Option<Arc<dyn EmbeddingService>>,
197    lease_owner: &str,
198) -> Result<usize, AgentError> {
199    let jobs = repo
200        .claim_jobs(
201            namespace_id,
202            DIGEST_SESSION_JOB,
203            lease_owner,
204            cognition.lease_ttl_secs,
205            cognition.max_job_batch as i64,
206        )
207        .await
208        .map_err(|error| AgentError::Storage(error.to_string()))?;
209
210    let service = DigestService::new(agent.clone(), llm, embeddings);
211    let mut processed = 0usize;
212
213    // Group claimed jobs by session_key, OR-ing the forced flag across
214    // duplicates so that a manual_digest/manual_rebuild is never silently
215    // dropped when an earlier automatic job exists for the same session.
216    let mut session_jobs: HashMap<String, Vec<(ClaimedMemoryJob, bool)>> = HashMap::new();
217    for job in jobs {
218        let session_key = match job
219            .payload
220            .get("session_key")
221            .and_then(serde_json::Value::as_str)
222            .map(ToString::to_string)
223            .or_else(|| {
224                job.perspective
225                    .as_ref()
226                    .and_then(|perspective| perspective.session_key.clone())
227            }) {
228            Some(key) => key,
229            None => {
230                let error = AgentError::Digest("digest job missing session_key".to_string());
231                repo.fail_job(&job, &error.to_string())
232                    .await
233                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
234                continue;
235            }
236        };
237        let force = digest_job_is_forced(
238            job.payload
239                .get("reason")
240                .and_then(serde_json::Value::as_str),
241        );
242        session_jobs
243            .entry(session_key)
244            .or_default()
245            .push((job, force));
246    }
247
248    for (session_key, job_batch) in session_jobs {
249        // OR forced flag across all jobs for this session.
250        let force = job_batch.iter().any(|(_, f)| *f);
251
252        if !force
253            && !should_run_incremental_digest(repo, namespace_id, &session_key, cognition).await?
254        {
255            debug!(
256                namespace_id,
257                session_key, "Skipping digest rollover below threshold"
258            );
259            for (job, _) in &job_batch {
260                repo.complete_job(job)
261                    .await
262                    .map_err(|error| AgentError::Storage(error.to_string()))?;
263                processed += 1;
264            }
265            continue;
266        }
267
268        let outcome = async {
269            service
270                .digest_session(namespace_id, &session_key, repo, force)
271                .await
272                .map(|_| ())
273        }
274        .await;
275
276        match outcome {
277            Ok(()) => {
278                for (job, _) in &job_batch {
279                    repo.complete_job(job)
280                        .await
281                        .map_err(|error| AgentError::Storage(error.to_string()))?;
282                    processed += 1;
283                }
284            }
285            Err(error) => {
286                for (job, _) in &job_batch {
287                    repo.fail_job(job, &error.to_string())
288                        .await
289                        .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
290                }
291            }
292        }
293    }
294
295    Ok(processed)
296}
297
/// Reports whether a digest job's `reason` marks it as forced.
///
/// The forced reasons are `dream_digest`, `session_end`, `manual_digest` and
/// `manual_rebuild`. Any other reason, or no reason at all, is not forced.
/// The comparison is exact (case-sensitive, no trimming).
pub(crate) fn digest_job_is_forced(reason: Option<&str>) -> bool {
    const FORCED_REASONS: [&str; 4] = [
        "dream_digest",
        "session_end",
        "manual_digest",
        "manual_rebuild",
    ];

    match reason {
        Some(given) => FORCED_REASONS.contains(&given),
        None => false,
    }
}
304
305async fn should_run_incremental_digest(
306    repo: &MemoryRepository,
307    namespace_id: i64,
308    session_key: &str,
309    cognition: &CognitionConfig,
310) -> Result<bool, AgentError> {
311    let rollover = repo
312        .session_digest_rollover(namespace_id, session_key)
313        .await
314        .map_err(|error| AgentError::Storage(error.to_string()))?;
315
316    if rollover.last_digest_end_memory_id.is_none() {
317        return Ok(true);
318    }
319
320    // Note: `activity_distill_min_events` is intentionally reused here as the
321    // minimum threshold for digest rollover as well.  The two operations (activity
322    // distillation and incremental digest) share the same "minimum new events before
323    // processing is worthwhile" semantic, so a single config knob avoids
324    // over-parameterisation.
325    Ok(
326        rollover.new_memory_count >= cognition.activity_distill_min_events as i64
327            || rollover.estimated_new_tokens >= cognition.digest_short_target_tokens as i64,
328    )
329}
330
331// ── Job enqueue helpers ───────────────────────────────────────────────
332
333pub(crate) async fn enqueue_digest_job_if_absent(
334    repo: &MemoryRepository,
335    namespace_id: i64,
336    session_key: &str,
337    digest_reason: &str,
338) -> Result<bool, AgentError> {
339    let payload = json!({
340        "session_key": session_key,
341        "reason": digest_reason,
342    });
343    enqueue_job_if_absent(
344        repo,
345        EnqueueJobParams {
346            namespace_id,
347            job_type: DIGEST_SESSION_JOB,
348            priority: 110,
349            perspective: None,
350            payload: &payload,
351        },
352    )
353    .await
354}
355
356/// Enqueue a job only if no matching pending/running job already exists.
357///
358/// **Note:** The `list_jobs` → `enqueue_job` sequence is subject to a TOCTOU
359/// race under high concurrency. A proper uniqueness guarantee should be
360/// enforced at the database level (e.g. a unique index on
361/// `(namespace_id, job_type, payload_hash)`). The current scan is a
362/// best-effort pre-check, not a source of truth.
363pub(crate) async fn enqueue_job_if_absent(
364    repo: &MemoryRepository,
365    params: EnqueueJobParams<'_>,
366) -> Result<bool, AgentError> {
367    for status in [memory_job_status::PENDING, memory_job_status::RUNNING] {
368        let jobs = repo
369            .list_jobs(
370                params.namespace_id,
371                Some(params.job_type),
372                Some(status),
373                64,
374                0,
375            )
376            .await
377            .map_err(|error| AgentError::Storage(error.to_string()))?;
378        if jobs
379            .iter()
380            .any(|row| queued_job_matches(row, params.perspective, params.payload))
381        {
382            return Ok(false);
383        }
384    }
385
386    repo.enqueue_job(params)
387        .await
388        .map_err(|error| AgentError::Storage(error.to_string()))?;
389    Ok(true)
390}
391
392fn queued_job_matches(
393    row: &MemoryJobRow,
394    perspective: Option<&serde_json::Value>,
395    payload: &serde_json::Value,
396) -> bool {
397    let row_payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
398        Ok(value) => value,
399        Err(_) => return false,
400    };
401    if &row_payload != payload {
402        return false;
403    }
404
405    match (&row.perspective_json, perspective) {
406        (None, None) => true,
407        (Some(existing), Some(expected)) => serde_json::from_str::<serde_json::Value>(existing)
408            .map(|value| value == *expected)
409            .unwrap_or(false),
410        _ => false,
411    }
412}