Skip to main content

nexus_memory_agent/
digest.rs

1//! Digest service - produces short and long session summaries.
2
3use std::sync::Arc;
4use std::time::Instant;
5
6use chrono::Utc;
7use nexus_core::config::AgentConfig;
8use nexus_core::traits::EmbeddingService;
9use nexus_core::{
10    cognitive_level_from_metadata, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
11    MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey,
12};
13use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
14use nexus_storage::repository::{
15    MemoryRepository, StoreDigestParams, StoreMemoryParams, StoreMemoryWithLineageParams,
16};
17use tracing::{debug, info, warn};
18
19use crate::error::AgentError;
20use crate::prompts::{digest_user_prompt, DIGEST_SYSTEM_PROMPT};
21use crate::util::{
22    flush_metric_samples, maybe_embed, parse_json_response, stage_metric_sample,
23    token_usage_metric_samples,
24};
25
/// Generation budget (`max_tokens`) requested from the LLM for one digest call.
const DIGEST_MAX_TOKENS: u32 = 4_096;
/// Limit handed to the repository when listing a session's memories.
const DIGEST_MAX_SOURCE_MEMORIES: i64 = 200;
/// Generator name recorded in the cognitive metadata of digest memories.
const DIGEST_GENERATED_BY: &str = "digest_service";
/// `digest_kind` value used to register and look up the short digest.
const DIGEST_KIND_SHORT: &str = "short";
/// `digest_kind` value used to register and look up the long digest.
const DIGEST_KIND_LONG: &str = "long";
/// Size limit passed to `truncate` for the short digest.
const SHORT_MAX_CHARS: usize = 4_000;
/// Size limit passed to `truncate` for the long digest.
const LONG_MAX_CHARS: usize = 12_000;
/// Evidence role attached to lineage edges from a digest to its sources.
const DIGESTED_FROM_ROLE: &str = "digested_from";
34
/// Outcome of digesting one session.
#[derive(Debug, Clone)]
pub struct DigestResult {
    /// Id of the short digest.
    pub short_id: i64,
    /// Id of the long digest.
    pub long_id: i64,
    /// Number of source memories that were summarized. Reported as `0` when
    /// previously stored digests are reused instead of being regenerated.
    pub source_count: usize,
}
42
43/// Parsed LLM digest output.
44#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
45struct DigestEnvelope {
46    short: String,
47    long: String,
48}
49
50pub struct DigestService {
51    config: AgentConfig,
52    llm: Arc<dyn LlmClient>,
53    embeddings: Option<Arc<dyn EmbeddingService>>,
54}
55
56impl DigestService {
57    pub fn new(
58        config: AgentConfig,
59        llm: Arc<dyn LlmClient>,
60        embeddings: Option<Arc<dyn EmbeddingService>>,
61    ) -> Self {
62        Self {
63            config,
64            llm,
65            embeddings,
66        }
67    }
68
69    /// Produce short and long digests for a session.
70    ///
71    /// Idempotent: if digests already exist for the session and `force` is false,
72    /// returns the existing digest IDs without regenerating.
73    pub async fn digest_session(
74        &self,
75        namespace_id: i64,
76        session_key: &str,
77        repo: &MemoryRepository,
78        _force: bool,
79    ) -> Result<DigestResult, AgentError> {
80        let total_started = Instant::now();
81        let mut metrics = Vec::new();
82        // Idempotent: if digests already exist for the session, return the existing
83        // digest IDs without regenerating, regardless of `force`. The `force` flag
84        // only bypasses the rollover threshold check (handled by the caller).
85        if let Some(existing) = existing_digest_ids(repo, namespace_id, session_key).await? {
86            debug!(
87                namespace_id,
88                session_key,
89                short_id = existing.short_id,
90                long_id = existing.long_id,
91                "Reusing existing session digests"
92            );
93            return Ok(existing);
94        }
95
96        let gather_started = Instant::now();
97        let memories = gather_session_memories(repo, namespace_id, session_key).await?;
98        metrics.push(stage_metric_sample(
99            namespace_id,
100            "cognition.digest.gather_ms",
101            gather_started.elapsed().as_secs_f64() * 1000.0,
102            "gather",
103        ));
104        if memories.is_empty() {
105            return Err(AgentError::Digest(format!(
106                "No non-raw memories found for session \"{}\"",
107                session_key
108            )));
109        }
110
111        let source_count = memories.len();
112        let source_ids: Vec<i64> = memories.iter().map(|m| m.id).collect();
113        let min_id = source_ids.iter().copied().min().unwrap_or(0);
114        let max_id = source_ids.iter().copied().max().unwrap_or(0);
115
116        let produce_started = Instant::now();
117        let (envelope, usage) = produce_digests(self.llm.as_ref(), session_key, &memories).await;
118        metrics.push(stage_metric_sample(
119            namespace_id,
120            "cognition.digest.produce_ms",
121            produce_started.elapsed().as_secs_f64() * 1000.0,
122            "produce",
123        ));
124        metrics.extend(token_usage_metric_samples(
125            namespace_id,
126            "cognition.digest.produce",
127            "produce",
128            usage.as_ref(),
129        ));
130        let short_content = truncate(envelope.short.trim(), SHORT_MAX_CHARS);
131        let long_content = truncate(envelope.long.trim(), LONG_MAX_CHARS);
132
133        let perspective = PerspectiveKey {
134            observer: self.config.namespace.clone(),
135            subject: self.config.namespace.clone(),
136            session_key: Some(session_key.to_string()),
137        };
138
139        let embeddings = self.embeddings.as_deref();
140        let short_memory = store_digest_memory(
141            repo,
142            namespace_id,
143            &short_content,
144            CognitiveLevel::SummaryShort,
145            &perspective,
146            &source_ids,
147            embeddings,
148        )
149        .await?;
150
151        let long_memory = store_digest_memory(
152            repo,
153            namespace_id,
154            &long_content,
155            CognitiveLevel::SummaryLong,
156            &perspective,
157            &source_ids,
158            embeddings,
159        )
160        .await?;
161
162        let short_tokens = estimate_tokens(&short_content);
163        let long_tokens = estimate_tokens(&long_content);
164
165        let store_started = Instant::now();
166        repo.store_digest(StoreDigestParams {
167            namespace_id,
168            session_key,
169            digest_kind: DIGEST_KIND_SHORT,
170            memory_id: short_memory.id,
171            start_memory_id: Some(min_id),
172            end_memory_id: Some(max_id),
173            token_count: short_tokens,
174        })
175        .await
176        .map_err(|e| AgentError::Storage(e.to_string()))?;
177
178        repo.store_digest(StoreDigestParams {
179            namespace_id,
180            session_key,
181            digest_kind: DIGEST_KIND_LONG,
182            memory_id: long_memory.id,
183            start_memory_id: Some(min_id),
184            end_memory_id: Some(max_id),
185            token_count: long_tokens,
186        })
187        .await
188        .map_err(|e| AgentError::Storage(e.to_string()))?;
189        metrics.push(stage_metric_sample(
190            namespace_id,
191            "cognition.digest.store_ms",
192            store_started.elapsed().as_secs_f64() * 1000.0,
193            "store",
194        ));
195
196        info!(
197            namespace_id,
198            session_key,
199            short_id = short_memory.id,
200            long_id = long_memory.id,
201            source_count,
202            "Created session digests"
203        );
204        metrics.push(stage_metric_sample(
205            namespace_id,
206            "cognition.digest.total_ms",
207            total_started.elapsed().as_secs_f64() * 1000.0,
208            "total",
209        ));
210        flush_metric_samples(repo, &metrics).await;
211
212        Ok(DigestResult {
213            short_id: short_memory.id,
214            long_id: long_memory.id,
215            source_count,
216        })
217    }
218}
219
220// ---------------------------------------------------------------------------
221// Helpers
222// ---------------------------------------------------------------------------
223
224async fn existing_digest_ids(
225    repo: &MemoryRepository,
226    namespace_id: i64,
227    session_key: &str,
228) -> Result<Option<DigestResult>, AgentError> {
229    let short = repo
230        .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_SHORT)
231        .await
232        .map_err(|e| AgentError::Storage(e.to_string()))?;
233
234    let long = repo
235        .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_LONG)
236        .await
237        .map_err(|e| AgentError::Storage(e.to_string()))?;
238
239    match (short, long) {
240        (Some(s), Some(l)) => Ok(Some(DigestResult {
241            short_id: s.id,
242            long_id: l.id,
243            source_count: 0,
244        })),
245        _ => Ok(None),
246    }
247}
248
249async fn gather_session_memories(
250    repo: &MemoryRepository,
251    namespace_id: i64,
252    session_key: &str,
253) -> Result<Vec<Memory>, AgentError> {
254    let matching = repo
255        .list_by_session_key(namespace_id, session_key, DIGEST_MAX_SOURCE_MEMORIES, false)
256        .await
257        .map_err(|e| AgentError::Storage(e.to_string()))?;
258
259    Ok(matching
260        .into_iter()
261        .filter(|m| {
262            let level = cognitive_level_from_metadata(&m.metadata);
263            if matches!(
264                level,
265                CognitiveLevel::Raw | CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong
266            ) {
267                return false;
268            }
269            true
270        })
271        .collect())
272}
273
274async fn produce_digests(
275    llm: &dyn LlmClient,
276    session_key: &str,
277    memories: &[Memory],
278) -> (DigestEnvelope, Option<TokenUsage>) {
279    let pairs: Vec<(i64, &str)> = memories
280        .iter()
281        .map(|m| (m.id, m.content.as_str()))
282        .collect();
283    let user_msg = digest_user_prompt(session_key, &pairs);
284
285    let params = GenerateParams {
286        messages: vec![
287            ChatMessage::system(DIGEST_SYSTEM_PROMPT),
288            ChatMessage::user(user_msg),
289        ],
290        max_tokens: DIGEST_MAX_TOKENS,
291        temperature: 0.2,
292        json_mode: true,
293    };
294
295    match llm.generate(params).await {
296        Ok(response) => {
297            let usage = response.usage.clone();
298            match parse_json_response::<DigestEnvelope>(&response) {
299                Ok(envelope)
300                    if envelope.short.trim().is_empty() && envelope.long.trim().is_empty() =>
301                {
302                    warn!("LLM returned empty digest, using fallback");
303                    (fallback_digest(memories), usage)
304                }
305                Ok(envelope) => (envelope, usage),
306                Err(error) => {
307                    warn!(%error, "LLM digest response was invalid JSON, using fallback");
308                    (fallback_digest(memories), usage)
309                }
310            }
311        }
312        Err(error) => {
313            warn!(%error, "LLM digest call failed, using fallback");
314            (fallback_digest(memories), None)
315        }
316    }
317}
318
319fn fallback_digest(memories: &[Memory]) -> DigestEnvelope {
320    let short = fallback_short(memories);
321    let long = fallback_long(memories);
322    DigestEnvelope { short, long }
323}
324
325fn fallback_short(memories: &[Memory]) -> String {
326    let combined: String = memories
327        .iter()
328        .take(5)
329        .map(|m| m.content.trim())
330        .filter(|s| !s.is_empty())
331        .collect::<Vec<_>>()
332        .join(" ");
333
334    truncate(&combined, SHORT_MAX_CHARS)
335}
336
337fn fallback_long(memories: &[Memory]) -> String {
338    let combined: String = memories
339        .iter()
340        .map(|m| m.content.trim())
341        .filter(|s| !s.is_empty())
342        .collect::<Vec<_>>()
343        .join(" ");
344
345    truncate(&combined, LONG_MAX_CHARS)
346}
347
348async fn store_digest_memory(
349    repo: &MemoryRepository,
350    namespace_id: i64,
351    content: &str,
352    level: CognitiveLevel,
353    perspective: &PerspectiveKey,
354    source_ids: &[i64],
355    embeddings: Option<&dyn EmbeddingService>,
356) -> Result<Memory, AgentError> {
357    let mut cognitive = CognitiveMetadata::new(
358        level,
359        perspective.observer.clone(),
360        perspective.subject.clone(),
361        perspective.session_key.clone(),
362        DIGEST_GENERATED_BY,
363    );
364    cognitive.source_memory_ids = source_ids.to_vec();
365    cognitive.confidence = Some(0.80);
366    cognitive.times_reinforced = 0;
367    cognitive.times_contradicted = 0;
368    cognitive.derived_at = Some(Utc::now());
369    cognitive.generated_by = Some(DIGEST_GENERATED_BY.to_string());
370
371    let metadata = cognitive.merge_into(&serde_json::json!({}));
372    let (embedding, embedding_model) = maybe_embed(embeddings, content).await;
373
374    let memory = repo
375        .store_with_lineage(StoreMemoryWithLineageParams {
376            store: StoreMemoryParams {
377                namespace_id,
378                content,
379                category: &MemoryCategory::Session,
380                memory_lane_type: Some(&MemoryLaneType::Cognitive(
381                    MemoryLaneCognitiveType::Explicit,
382                )),
383                labels: &["digest".to_string(), level.to_string().to_lowercase()],
384                metadata: &metadata,
385                embedding: embedding.as_deref(),
386                embedding_model: embedding_model.as_deref(),
387            },
388            source_memory_ids: source_ids,
389            evidence_role: DIGESTED_FROM_ROLE,
390        })
391        .await
392        .map_err(|e| AgentError::Storage(e.to_string()))?;
393
394    Ok(memory)
395}
396
/// Shorten `s` to at most `max_chars` **bytes**, preferring a word boundary.
///
/// Despite the parameter name, the limit is measured in UTF-8 bytes
/// (`str::len`), so the result never exceeds `max_chars` bytes and is always
/// cut on a character boundary.
///
/// * If `s` already fits, it is returned unchanged.
/// * If the cut lands exactly between two words, every complete word before
///   the cut is kept.
/// * If the cut lands inside a word, that partial word is dropped (the text
///   is cut at the last preceding space).
/// * If there is no space before the cut, the text is hard-cut.
///
/// Trailing spaces left at the cut are removed.
fn truncate(s: &str, max_chars: usize) -> String {
    if s.len() <= max_chars {
        return s.to_string();
    }

    // Back off to the nearest char boundary so slicing cannot panic.
    let mut end = max_chars;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    let (head, tail) = s.split_at(end);

    // The cut did not split a word: keep everything before it.
    if head.ends_with(' ') || tail.starts_with(' ') {
        return head.trim_end_matches(' ').to_string();
    }

    match head.rfind(' ') {
        // Drop the partial trailing word.
        Some(last_space) => head[..last_space].trim_end_matches(' ').to_string(),
        // Single unbroken run of characters: hard cut.
        None => head.to_string(),
    }
}
411
/// Rough token estimate: the number of whitespace-separated words in `s`.
fn estimate_tokens(s: &str) -> usize {
    s.split_whitespace().fold(0, |words, _| words + 1)
}
415
416// ---------------------------------------------------------------------------
417// Tests
418// ---------------------------------------------------------------------------
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423
424    use std::collections::VecDeque;
425    use std::sync::Mutex;
426
427    use async_trait::async_trait;
428    use chrono::Utc;
429    use nexus_core::MemoryLanePriorityType;
430    use nexus_llm::GenerateResponse;
431    use nexus_storage::repository::NamespaceRepository;
432    use sqlx::sqlite::SqlitePoolOptions;
433
434    struct MockLlmClient {
435        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
436    }
437
438    impl MockLlmClient {
439        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
440            Self {
441                responses: Mutex::new(VecDeque::from(responses)),
442            }
443        }
444    }
445
446    #[async_trait]
447    impl LlmClient for MockLlmClient {
448        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
449            self.responses
450                .lock()
451                .expect("mock responses poisoned")
452                .pop_front()
453                .expect("mock response missing")
454        }
455
456        fn provider_name(&self) -> String {
457            "mock".to_string()
458        }
459
460        fn model_name(&self) -> String {
461            "mock-model".to_string()
462        }
463    }
464
465    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
466        let pool = SqlitePoolOptions::new()
467            .max_connections(1)
468            .connect("sqlite::memory:")
469            .await
470            .unwrap();
471        nexus_storage::migrations::run_migrations(&pool)
472            .await
473            .unwrap();
474        let namespace_repo = NamespaceRepository::new(pool.clone());
475        let namespace = namespace_repo
476            .get_or_create("digest-test", "digest-test")
477            .await
478            .unwrap();
479        let repo = MemoryRepository::new(pool.clone());
480        (pool, repo, namespace.id)
481    }
482
483    fn session_metadata(session_key: &str, level: CognitiveLevel) -> serde_json::Value {
484        let cognitive = CognitiveMetadata::new(
485            level,
486            "claude-code",
487            "claude-code",
488            Some(session_key.to_string()),
489            "derive_service",
490        );
491        cognitive.merge_into(&serde_json::json!({}))
492    }
493
494    async fn store_session_memory(
495        repo: &MemoryRepository,
496        namespace_id: i64,
497        content: &str,
498        session_key: &str,
499        level: CognitiveLevel,
500    ) -> Memory {
501        repo.store(StoreMemoryParams {
502            namespace_id,
503            content,
504            category: &MemoryCategory::Session,
505            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
506            labels: &["test".to_string()],
507            metadata: &session_metadata(session_key, level),
508            embedding: None,
509            embedding_model: None,
510        })
511        .await
512        .unwrap()
513    }
514
515    fn good_digest_response() -> GenerateResponse {
516        GenerateResponse {
517            content: r#"{"short":"Fixed query pagination and added working-set retrieval.","long":"Session focused on fixing query pagination behavior in the memory system. The team tightened the ranking logic and added bounded working-set retrieval. New digest infrastructure was introduced to track session summaries."}"#
518                .to_string(),
519            model: "mock-model".to_string(),
520            usage: None,
521        }
522    }
523
524    #[tokio::test]
525    async fn test_digest_session_errors_on_empty_session() {
526        let (_pool, repo, namespace_id) = setup_repo().await;
527        let service = DigestService::new(
528            AgentConfig::default(),
529            Arc::new(MockLlmClient::new(Vec::new())),
530            None,
531        );
532
533        let result = service
534            .digest_session(namespace_id, "empty-session", &repo, false)
535            .await;
536        assert!(result.is_err());
537        let err = result.unwrap_err().to_string();
538        assert!(err.contains("No non-raw memories"));
539    }
540
541    #[tokio::test]
542    async fn test_digest_session_creates_short_and_long() {
543        let (pool, repo, namespace_id) = setup_repo().await;
544        store_session_memory(
545            &repo,
546            namespace_id,
547            "Fixed query pagination behavior.",
548            "session-1",
549            CognitiveLevel::Explicit,
550        )
551        .await;
552        store_session_memory(
553            &repo,
554            namespace_id,
555            "Added working-set retrieval primitives.",
556            "session-1",
557            CognitiveLevel::Explicit,
558        )
559        .await;
560
561        let service = DigestService::new(
562            AgentConfig::default(),
563            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
564            None,
565        );
566
567        let result = service
568            .digest_session(namespace_id, "session-1", &repo, false)
569            .await
570            .unwrap();
571
572        assert_eq!(result.source_count, 2);
573        assert!(result.short_id > 0);
574        assert!(result.long_id > 0);
575
576        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
577        assert_eq!(
578            cognitive_level_from_metadata(&short.metadata),
579            CognitiveLevel::SummaryShort
580        );
581        assert!(short.content.contains("pagination"));
582
583        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
584        assert_eq!(
585            cognitive_level_from_metadata(&long.metadata),
586            CognitiveLevel::SummaryLong
587        );
588        assert!(long.content.contains("digest"));
589
590        // Verify session_digests registrations
591        let short_digests: i64 =
592            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
593                .bind(DIGEST_KIND_SHORT)
594                .fetch_one(&pool)
595                .await
596                .unwrap();
597        let long_digests: i64 =
598            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
599                .bind(DIGEST_KIND_LONG)
600                .fetch_one(&pool)
601                .await
602                .unwrap();
603        assert_eq!(short_digests, 1);
604        assert_eq!(long_digests, 1);
605    }
606
607    #[tokio::test]
608    async fn test_digest_session_falls_back_on_llm_failure() {
609        let (_pool, repo, namespace_id) = setup_repo().await;
610        store_session_memory(
611            &repo,
612            namespace_id,
613            "Implemented digest service with fallback behavior.",
614            "session-2",
615            CognitiveLevel::Explicit,
616        )
617        .await;
618
619        let bad_response = GenerateResponse {
620            content: "not valid json at all".to_string(),
621            model: "mock-model".to_string(),
622            usage: None,
623        };
624        let service = DigestService::new(
625            AgentConfig::default(),
626            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
627            None,
628        );
629
630        let result = service
631            .digest_session(namespace_id, "session-2", &repo, false)
632            .await
633            .unwrap();
634
635        assert_eq!(result.source_count, 1);
636
637        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
638        assert_eq!(
639            cognitive_level_from_metadata(&short.metadata),
640            CognitiveLevel::SummaryShort
641        );
642        // Fallback content comes from source memories
643        assert!(short.content.len() <= SHORT_MAX_CHARS);
644
645        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
646        assert!(long.content.contains("digest service"));
647    }
648
649    #[tokio::test]
650    async fn test_digest_session_is_idempotent() {
651        let (_pool, repo, namespace_id) = setup_repo().await;
652        store_session_memory(
653            &repo,
654            namespace_id,
655            "Refactored the memory consolidation pipeline.",
656            "session-3",
657            CognitiveLevel::Explicit,
658        )
659        .await;
660
661        let service = DigestService::new(
662            AgentConfig::default(),
663            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
664            None,
665        );
666
667        let first = service
668            .digest_session(namespace_id, "session-3", &repo, false)
669            .await
670            .unwrap();
671        let second = service
672            .digest_session(namespace_id, "session-3", &repo, false)
673            .await
674            .unwrap();
675
676        assert_eq!(first.short_id, second.short_id);
677        assert_eq!(first.long_id, second.long_id);
678    }
679
680    #[tokio::test]
681    async fn test_digest_session_force_respects_existing() {
682        let (pool, repo, namespace_id) = setup_repo().await;
683        store_session_memory(
684            &repo,
685            namespace_id,
686            "Added token counting to digest service.",
687            "session-4",
688            CognitiveLevel::Explicit,
689        )
690        .await;
691
692        let service = DigestService::new(
693            AgentConfig::default(),
694            Arc::new(MockLlmClient::new(vec![
695                Ok(good_digest_response()),
696                Ok(good_digest_response()),
697            ])),
698            None,
699        );
700
701        let first = service
702            .digest_session(namespace_id, "session-4", &repo, false)
703            .await
704            .unwrap();
705        let forced = service
706            .digest_session(namespace_id, "session-4", &repo, true)
707            .await
708            .unwrap();
709
710        // Force flag does not regenerate if digests already exist; IDs should be equal
711        assert_eq!(first.short_id, forced.short_id);
712        assert_eq!(first.long_id, forced.long_id);
713
714        // No new session_digest rows should be created; counts remain 2 (short+long)
715        let total_digests: i64 =
716            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE session_key = ?")
717                .bind("session-4")
718                .fetch_one(&pool)
719                .await
720                .unwrap();
721        assert_eq!(total_digests, 2);
722
723        let latest_short = repo
724            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_SHORT)
725            .await
726            .unwrap()
727            .unwrap();
728        let latest_long = repo
729            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_LONG)
730            .await
731            .unwrap()
732            .unwrap();
733        assert_eq!(latest_short.id, forced.short_id);
734        assert_eq!(latest_long.id, forced.long_id);
735    }
736
737    #[tokio::test]
738    async fn test_digest_session_ignores_raw_memories() {
739        let (_pool, repo, namespace_id) = setup_repo().await;
740        store_session_memory(
741            &repo,
742            namespace_id,
743            "Raw noise from hook capture",
744            "session-5",
745            CognitiveLevel::Raw,
746        )
747        .await;
748
749        let service = DigestService::new(
750            AgentConfig::default(),
751            Arc::new(MockLlmClient::new(Vec::new())),
752            None,
753        );
754
755        let result = service
756            .digest_session(namespace_id, "session-5", &repo, false)
757            .await;
758        assert!(result.is_err());
759    }
760
761    #[tokio::test]
762    async fn test_digest_session_ignores_other_sessions() {
763        let (_pool, repo, namespace_id) = setup_repo().await;
764        store_session_memory(
765            &repo,
766            namespace_id,
767            "Memory from a different session entirely.",
768            "other-session",
769            CognitiveLevel::Explicit,
770        )
771        .await;
772
773        let service = DigestService::new(
774            AgentConfig::default(),
775            Arc::new(MockLlmClient::new(Vec::new())),
776            None,
777        );
778
779        let result = service
780            .digest_session(namespace_id, "session-6", &repo, false)
781            .await;
782        assert!(result.is_err());
783    }
784
785    #[tokio::test]
786    async fn test_digest_session_creates_evidence_lineage() {
787        let (_pool, repo, namespace_id) = setup_repo().await;
788        let m1 = store_session_memory(
789            &repo,
790            namespace_id,
791            "Source memory one.",
792            "session-7",
793            CognitiveLevel::Explicit,
794        )
795        .await;
796        let m2 = store_session_memory(
797            &repo,
798            namespace_id,
799            "Source memory two.",
800            "session-7",
801            CognitiveLevel::Explicit,
802        )
803        .await;
804
805        let service = DigestService::new(
806            AgentConfig::default(),
807            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
808            None,
809        );
810
811        let result = service
812            .digest_session(namespace_id, "session-7", &repo, false)
813            .await
814            .unwrap();
815
816        let short_lineage = repo.load_lineage(result.short_id).await.unwrap();
817        assert_eq!(short_lineage.len(), 2);
818        let short_sources: Vec<i64> = short_lineage.iter().map(|e| e.source_memory_id).collect();
819        assert!(short_sources.contains(&m1.id));
820        assert!(short_sources.contains(&m2.id));
821        assert_eq!(short_lineage[0].evidence_role, DIGESTED_FROM_ROLE);
822
823        let long_lineage = repo.load_lineage(result.long_id).await.unwrap();
824        assert_eq!(long_lineage.len(), 2);
825    }
826
827    #[test]
828    fn test_truncate_within_limit() {
829        assert_eq!(truncate("hello", 10), "hello");
830    }
831
832    #[test]
833    fn test_truncate_at_boundary() {
834        let input = "a".repeat(50);
835        assert_eq!(truncate(&input, 50), input);
836        assert_eq!(truncate(&input, 30).len(), 30);
837    }
838
839    #[test]
840    fn test_truncate_splits_at_word() {
841        let input = "the quick brown fox jumps over the lazy dog";
842        let result = truncate(input, 20);
843        assert!(result.len() <= 20);
844        assert!(!result.ends_with(' ') || result.is_empty());
845    }
846
847    #[test]
848    fn test_estimate_tokens() {
849        assert_eq!(estimate_tokens("hello world"), 2);
850        assert_eq!(estimate_tokens(""), 0);
851        assert_eq!(estimate_tokens("  spaced  out  "), 2);
852    }
853
854    fn test_memory(id: i64, content: &str) -> Memory {
855        Memory {
856            id,
857            namespace_id: 1,
858            content: content.to_string(),
859            category: MemoryCategory::Session,
860            memory_lane_type: None,
861            labels: vec![],
862            metadata: serde_json::json!({}),
863            similarity_score: None,
864            relevance_score: None,
865            content_embedding: None,
866            embedding_model: None,
867            created_at: Utc::now(),
868            updated_at: None,
869            last_accessed: None,
870            is_active: true,
871            is_archived: false,
872            access_count: 0,
873        }
874    }
875
876    #[tokio::test]
877    async fn test_digest_memories_get_embeddings_when_service_provided() {
878        let (_pool, repo, namespace_id) = setup_repo().await;
879        store_session_memory(
880            &repo,
881            namespace_id,
882            "Fixed query pagination behavior.",
883            "session-embed",
884            CognitiveLevel::Explicit,
885        )
886        .await;
887        store_session_memory(
888            &repo,
889            namespace_id,
890            "Added working-set retrieval primitives.",
891            "session-embed",
892            CognitiveLevel::Explicit,
893        )
894        .await;
895
896        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
897        let service = DigestService::new(
898            AgentConfig::default(),
899            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
900            Some(Arc::new(mock_embed)),
901        );
902
903        let result = service
904            .digest_session(namespace_id, "session-embed", &repo, false)
905            .await
906            .unwrap();
907
908        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
909        assert!(
910            short.content_embedding.is_some(),
911            "short digest should have an embedding when service is provided"
912        );
913        assert_eq!(
914            short.content_embedding.as_ref().unwrap().len(),
915            384,
916            "short digest embedding dimension should be 384"
917        );
918
919        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
920        assert!(
921            long.content_embedding.is_some(),
922            "long digest should have an embedding when service is provided"
923        );
924        assert_eq!(
925            long.content_embedding.as_ref().unwrap().len(),
926            384,
927            "long digest embedding dimension should be 384"
928        );
929    }
930
931    #[tokio::test]
932    async fn test_digest_memories_stored_without_embedding_when_service_absent() {
933        let (_pool, repo, namespace_id) = setup_repo().await;
934        store_session_memory(
935            &repo,
936            namespace_id,
937            "Fixed query pagination behavior.",
938            "session-no-embed",
939            CognitiveLevel::Explicit,
940        )
941        .await;
942        store_session_memory(
943            &repo,
944            namespace_id,
945            "Added working-set retrieval primitives.",
946            "session-no-embed",
947            CognitiveLevel::Explicit,
948        )
949        .await;
950
951        let service = DigestService::new(
952            AgentConfig::default(),
953            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
954            None,
955        );
956
957        let result = service
958            .digest_session(namespace_id, "session-no-embed", &repo, false)
959            .await
960            .unwrap();
961
962        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
963        assert!(
964            short.content_embedding.is_none(),
965            "short digest should NOT have embedding when no service provided"
966        );
967
968        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
969        assert!(
970            long.content_embedding.is_none(),
971            "long digest should NOT have embedding when no service provided"
972        );
973    }
974
975    #[test]
976    fn test_fallback_short_uses_first_five_memories() {
977        let memories = vec![
978            test_memory(1, "First memory content here."),
979            test_memory(2, "Second memory."),
980        ];
981        let result = fallback_short(&memories);
982        assert!(result.contains("First memory"));
983        assert!(result.contains("Second memory"));
984    }
985
986    #[test]
987    fn test_fallback_long_concatenates_all_memories() {
988        let memories = vec![
989            test_memory(1, "Alpha."),
990            test_memory(2, "Beta."),
991            test_memory(3, "Gamma."),
992        ];
993        let result = fallback_long(&memories);
994        assert!(result.contains("Alpha"));
995        assert!(result.contains("Beta"));
996        assert!(result.contains("Gamma"));
997    }
998}