Skip to main content

nexus_memory_agent/
digest.rs

1//! Digest service - produces short and long session summaries.
2
3use std::sync::Arc;
4use std::time::Instant;
5
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::{
9    cognitive_level_from_metadata, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
10    MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey,
11};
12use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
13use nexus_storage::repository::{
14    MemoryRepository, StoreDigestParams, StoreMemoryParams, StoreMemoryWithLineageParams,
15};
16use tracing::{debug, info, warn};
17
18use crate::error::AgentError;
19use crate::prompts::{digest_user_prompt, DIGEST_SYSTEM_PROMPT};
20use crate::util::{
21    flush_metric_samples, maybe_embed, parse_json_response, stage_metric_sample,
22    token_usage_metric_samples,
23};
24
25const DIGEST_MAX_TOKENS: u32 = 4096;
26const DIGEST_MAX_SOURCE_MEMORIES: i64 = 200;
27const DIGEST_GENERATED_BY: &str = "digest_service";
28const DIGEST_KIND_SHORT: &str = "short";
29const DIGEST_KIND_LONG: &str = "long";
30const SHORT_MAX_CHARS: usize = 300;
31const LONG_MAX_CHARS: usize = 1500;
32const DIGESTED_FROM_ROLE: &str = "digested_from";
33
34/// Result of a session digest operation.
35#[derive(Debug, Clone)]
36pub struct DigestResult {
37    pub short_id: i64,
38    pub long_id: i64,
39    pub source_count: usize,
40}
41
42/// Parsed LLM digest output.
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44struct DigestEnvelope {
45    short: String,
46    long: String,
47}
48
49pub struct DigestService {
50    config: AgentConfig,
51    llm: Arc<dyn LlmClient>,
52    embeddings: Option<Arc<dyn EmbeddingService>>,
53}
54
55impl DigestService {
56    pub fn new(
57        config: AgentConfig,
58        llm: Arc<dyn LlmClient>,
59        embeddings: Option<Arc<dyn EmbeddingService>>,
60    ) -> Self {
61        Self {
62            config,
63            llm,
64            embeddings,
65        }
66    }
67
68    /// Produce short and long digests for a session.
69    ///
70    /// Idempotent: if digests already exist for the session and `force` is false,
71    /// returns the existing digest IDs without regenerating.
72    pub async fn digest_session(
73        &self,
74        namespace_id: i64,
75        session_key: &str,
76        repo: &MemoryRepository,
77        force: bool,
78    ) -> Result<DigestResult, AgentError> {
79        let total_started = Instant::now();
80        let mut metrics = Vec::new();
81        // Idempotency: return existing digests if present
82        if !force {
83            if let Some(existing) = existing_digest_ids(repo, namespace_id, session_key).await? {
84                debug!(
85                    namespace_id,
86                    session_key,
87                    short_id = existing.short_id,
88                    long_id = existing.long_id,
89                    "Reusing existing session digests"
90                );
91                return Ok(existing);
92            }
93        }
94
95        let gather_started = Instant::now();
96        let memories = gather_session_memories(repo, namespace_id, session_key).await?;
97        metrics.push(stage_metric_sample(
98            namespace_id,
99            "cognition.digest.gather_ms",
100            gather_started.elapsed().as_secs_f64() * 1000.0,
101            "gather",
102        ));
103        if memories.is_empty() {
104            return Err(AgentError::Digest(format!(
105                "No non-raw memories found for session \"{}\"",
106                session_key
107            )));
108        }
109
110        let source_count = memories.len();
111        let source_ids: Vec<i64> = memories.iter().map(|m| m.id).collect();
112        let min_id = source_ids.iter().copied().min().unwrap_or(0);
113        let max_id = source_ids.iter().copied().max().unwrap_or(0);
114
115        let produce_started = Instant::now();
116        let (envelope, usage) = produce_digests(self.llm.as_ref(), session_key, &memories).await;
117        metrics.push(stage_metric_sample(
118            namespace_id,
119            "cognition.digest.produce_ms",
120            produce_started.elapsed().as_secs_f64() * 1000.0,
121            "produce",
122        ));
123        metrics.extend(token_usage_metric_samples(
124            namespace_id,
125            "cognition.digest.produce",
126            "produce",
127            usage.as_ref(),
128        ));
129        let short_content = truncate(envelope.short.trim(), SHORT_MAX_CHARS);
130        let long_content = truncate(envelope.long.trim(), LONG_MAX_CHARS);
131
132        let perspective = PerspectiveKey {
133            observer: self.config.namespace.clone(),
134            subject: self.config.namespace.clone(),
135            session_key: Some(session_key.to_string()),
136        };
137
138        let embeddings = self.embeddings.as_deref();
139        let short_memory = store_digest_memory(
140            repo,
141            namespace_id,
142            &short_content,
143            CognitiveLevel::SummaryShort,
144            &perspective,
145            &source_ids,
146            embeddings,
147        )
148        .await?;
149
150        let long_memory = store_digest_memory(
151            repo,
152            namespace_id,
153            &long_content,
154            CognitiveLevel::SummaryLong,
155            &perspective,
156            &source_ids,
157            embeddings,
158        )
159        .await?;
160
161        let short_tokens = estimate_tokens(&short_content);
162        let long_tokens = estimate_tokens(&long_content);
163
164        let store_started = Instant::now();
165        repo.store_digest(StoreDigestParams {
166            namespace_id,
167            session_key,
168            digest_kind: DIGEST_KIND_SHORT,
169            memory_id: short_memory.id,
170            start_memory_id: Some(min_id),
171            end_memory_id: Some(max_id),
172            token_count: short_tokens,
173        })
174        .await
175        .map_err(|e| AgentError::Storage(e.to_string()))?;
176
177        repo.store_digest(StoreDigestParams {
178            namespace_id,
179            session_key,
180            digest_kind: DIGEST_KIND_LONG,
181            memory_id: long_memory.id,
182            start_memory_id: Some(min_id),
183            end_memory_id: Some(max_id),
184            token_count: long_tokens,
185        })
186        .await
187        .map_err(|e| AgentError::Storage(e.to_string()))?;
188        metrics.push(stage_metric_sample(
189            namespace_id,
190            "cognition.digest.store_ms",
191            store_started.elapsed().as_secs_f64() * 1000.0,
192            "store",
193        ));
194
195        info!(
196            namespace_id,
197            session_key,
198            short_id = short_memory.id,
199            long_id = long_memory.id,
200            source_count,
201            "Created session digests"
202        );
203        metrics.push(stage_metric_sample(
204            namespace_id,
205            "cognition.digest.total_ms",
206            total_started.elapsed().as_secs_f64() * 1000.0,
207            "total",
208        ));
209        flush_metric_samples(repo, &metrics).await;
210
211        Ok(DigestResult {
212            short_id: short_memory.id,
213            long_id: long_memory.id,
214            source_count,
215        })
216    }
217}
218
219// ---------------------------------------------------------------------------
220// Helpers
221// ---------------------------------------------------------------------------
222
223async fn existing_digest_ids(
224    repo: &MemoryRepository,
225    namespace_id: i64,
226    session_key: &str,
227) -> Result<Option<DigestResult>, AgentError> {
228    let short = repo
229        .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_SHORT)
230        .await
231        .map_err(|e| AgentError::Storage(e.to_string()))?;
232
233    let long = repo
234        .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_LONG)
235        .await
236        .map_err(|e| AgentError::Storage(e.to_string()))?;
237
238    match (short, long) {
239        (Some(s), Some(l)) => Ok(Some(DigestResult {
240            short_id: s.id,
241            long_id: l.id,
242            source_count: 0,
243        })),
244        _ => Ok(None),
245    }
246}
247
248async fn gather_session_memories(
249    repo: &MemoryRepository,
250    namespace_id: i64,
251    session_key: &str,
252) -> Result<Vec<Memory>, AgentError> {
253    let matching = repo
254        .list_by_session_key(namespace_id, session_key, DIGEST_MAX_SOURCE_MEMORIES, false)
255        .await
256        .map_err(|e| AgentError::Storage(e.to_string()))?;
257
258    Ok(matching
259        .into_iter()
260        .filter(|m| {
261            let level = cognitive_level_from_metadata(&m.metadata);
262            if matches!(
263                level,
264                CognitiveLevel::Raw | CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong
265            ) {
266                return false;
267            }
268            true
269        })
270        .collect())
271}
272
273async fn produce_digests(
274    llm: &dyn LlmClient,
275    session_key: &str,
276    memories: &[Memory],
277) -> (DigestEnvelope, Option<TokenUsage>) {
278    let pairs: Vec<(i64, &str)> = memories
279        .iter()
280        .map(|m| (m.id, m.content.as_str()))
281        .collect();
282    let user_msg = digest_user_prompt(session_key, &pairs);
283
284    let params = GenerateParams {
285        messages: vec![
286            ChatMessage::system(DIGEST_SYSTEM_PROMPT),
287            ChatMessage::user(user_msg),
288        ],
289        max_tokens: DIGEST_MAX_TOKENS,
290        temperature: 0.2,
291        json_mode: true,
292    };
293
294    match llm.generate(params).await {
295        Ok(response) => {
296            let usage = response.usage.clone();
297            match parse_json_response::<DigestEnvelope>(&response) {
298                Ok(envelope)
299                    if envelope.short.trim().is_empty() && envelope.long.trim().is_empty() =>
300                {
301                    warn!("LLM returned empty digest, using fallback");
302                    (fallback_digest(memories), usage)
303                }
304                Ok(envelope) => (envelope, usage),
305                Err(error) => {
306                    warn!(%error, "LLM digest response was invalid JSON, using fallback");
307                    (fallback_digest(memories), usage)
308                }
309            }
310        }
311        Err(error) => {
312            warn!(%error, "LLM digest call failed, using fallback");
313            (fallback_digest(memories), None)
314        }
315    }
316}
317
318fn fallback_digest(memories: &[Memory]) -> DigestEnvelope {
319    let short = fallback_short(memories);
320    let long = fallback_long(memories);
321    DigestEnvelope { short, long }
322}
323
324fn fallback_short(memories: &[Memory]) -> String {
325    let combined: String = memories
326        .iter()
327        .take(5)
328        .map(|m| m.content.trim())
329        .filter(|s| !s.is_empty())
330        .collect::<Vec<_>>()
331        .join(" ");
332
333    truncate(&combined, SHORT_MAX_CHARS)
334}
335
336fn fallback_long(memories: &[Memory]) -> String {
337    let combined: String = memories
338        .iter()
339        .map(|m| m.content.trim())
340        .filter(|s| !s.is_empty())
341        .collect::<Vec<_>>()
342        .join(" ");
343
344    truncate(&combined, LONG_MAX_CHARS)
345}
346
347async fn store_digest_memory(
348    repo: &MemoryRepository,
349    namespace_id: i64,
350    content: &str,
351    level: CognitiveLevel,
352    perspective: &PerspectiveKey,
353    source_ids: &[i64],
354    embeddings: Option<&dyn EmbeddingService>,
355) -> Result<Memory, AgentError> {
356    let mut cognitive = CognitiveMetadata::new(
357        level,
358        perspective.observer.clone(),
359        perspective.subject.clone(),
360        perspective.session_key.clone(),
361        DIGEST_GENERATED_BY,
362    );
363    cognitive.source_memory_ids = source_ids.to_vec();
364    cognitive.confidence = Some(0.80);
365
366    let metadata = cognitive.merge_into(&serde_json::json!({}));
367    let (embedding, embedding_model) = maybe_embed(embeddings, content).await;
368
369    let memory = repo
370        .store_with_lineage(StoreMemoryWithLineageParams {
371            store: StoreMemoryParams {
372                namespace_id,
373                content,
374                category: &MemoryCategory::Session,
375                memory_lane_type: Some(&MemoryLaneType::Cognitive(
376                    MemoryLaneCognitiveType::Explicit,
377                )),
378                labels: &["digest".to_string(), level.to_string().to_lowercase()],
379                metadata: &metadata,
380                embedding: embedding.as_deref(),
381                embedding_model: embedding_model.as_deref(),
382            },
383            source_memory_ids: source_ids,
384            evidence_role: DIGESTED_FROM_ROLE,
385        })
386        .await
387        .map_err(|e| AgentError::Storage(e.to_string()))?;
388
389    Ok(memory)
390}
391
392fn truncate(s: &str, max_chars: usize) -> String {
393    if s.len() <= max_chars {
394        return s.to_string();
395    }
396    let mut end = max_chars;
397    while end > 0 && !s.is_char_boundary(end) {
398        end -= 1;
399    }
400    let mut truncated = s[..end].to_string();
401    if let Some(last_space) = truncated.rfind(' ') {
402        truncated.truncate(last_space);
403    }
404    truncated
405}
406
407fn estimate_tokens(s: &str) -> usize {
408    s.split_whitespace().count()
409}
410
411// ---------------------------------------------------------------------------
412// Tests
413// ---------------------------------------------------------------------------
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418
419    use std::collections::VecDeque;
420    use std::sync::Mutex;
421
422    use async_trait::async_trait;
423    use chrono::Utc;
424    use nexus_core::MemoryLanePriorityType;
425    use nexus_llm::GenerateResponse;
426    use nexus_storage::repository::NamespaceRepository;
427    use sqlx::sqlite::SqlitePoolOptions;
428
429    struct MockLlmClient {
430        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
431    }
432
433    impl MockLlmClient {
434        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
435            Self {
436                responses: Mutex::new(VecDeque::from(responses)),
437            }
438        }
439    }
440
441    #[async_trait]
442    impl LlmClient for MockLlmClient {
443        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
444            self.responses
445                .lock()
446                .expect("mock responses poisoned")
447                .pop_front()
448                .expect("mock response missing")
449        }
450
451        fn provider_name(&self) -> String {
452            "mock".to_string()
453        }
454
455        fn model_name(&self) -> String {
456            "mock-model".to_string()
457        }
458    }
459
460    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
461        let pool = SqlitePoolOptions::new()
462            .max_connections(1)
463            .connect("sqlite::memory:")
464            .await
465            .unwrap();
466        nexus_storage::migrations::run_migrations(&pool)
467            .await
468            .unwrap();
469        let namespace_repo = NamespaceRepository::new(pool.clone());
470        let namespace = namespace_repo
471            .get_or_create("digest-test", "digest-test")
472            .await
473            .unwrap();
474        let repo = MemoryRepository::new(pool.clone());
475        (pool, repo, namespace.id)
476    }
477
478    fn session_metadata(session_key: &str, level: CognitiveLevel) -> serde_json::Value {
479        let cognitive = CognitiveMetadata::new(
480            level,
481            "claude-code",
482            "claude-code",
483            Some(session_key.to_string()),
484            "derive_service",
485        );
486        cognitive.merge_into(&serde_json::json!({}))
487    }
488
489    async fn store_session_memory(
490        repo: &MemoryRepository,
491        namespace_id: i64,
492        content: &str,
493        session_key: &str,
494        level: CognitiveLevel,
495    ) -> Memory {
496        repo.store(StoreMemoryParams {
497            namespace_id,
498            content,
499            category: &MemoryCategory::Session,
500            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
501            labels: &["test".to_string()],
502            metadata: &session_metadata(session_key, level),
503            embedding: None,
504            embedding_model: None,
505        })
506        .await
507        .unwrap()
508    }
509
510    fn good_digest_response() -> GenerateResponse {
511        GenerateResponse {
512            content: r#"{"short":"Fixed query pagination and added working-set retrieval.","long":"Session focused on fixing query pagination behavior in the memory system. The team tightened the ranking logic and added bounded working-set retrieval. New digest infrastructure was introduced to track session summaries."}"#
513                .to_string(),
514            model: "mock-model".to_string(),
515            usage: None,
516        }
517    }
518
519    #[tokio::test]
520    async fn test_digest_session_errors_on_empty_session() {
521        let (_pool, repo, namespace_id) = setup_repo().await;
522        let service = DigestService::new(
523            AgentConfig::default(),
524            Arc::new(MockLlmClient::new(Vec::new())),
525            None,
526        );
527
528        let result = service
529            .digest_session(namespace_id, "empty-session", &repo, false)
530            .await;
531        assert!(result.is_err());
532        let err = result.unwrap_err().to_string();
533        assert!(err.contains("No non-raw memories"));
534    }
535
536    #[tokio::test]
537    async fn test_digest_session_creates_short_and_long() {
538        let (pool, repo, namespace_id) = setup_repo().await;
539        store_session_memory(
540            &repo,
541            namespace_id,
542            "Fixed query pagination behavior.",
543            "session-1",
544            CognitiveLevel::Explicit,
545        )
546        .await;
547        store_session_memory(
548            &repo,
549            namespace_id,
550            "Added working-set retrieval primitives.",
551            "session-1",
552            CognitiveLevel::Explicit,
553        )
554        .await;
555
556        let service = DigestService::new(
557            AgentConfig::default(),
558            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
559            None,
560        );
561
562        let result = service
563            .digest_session(namespace_id, "session-1", &repo, false)
564            .await
565            .unwrap();
566
567        assert_eq!(result.source_count, 2);
568        assert!(result.short_id > 0);
569        assert!(result.long_id > 0);
570
571        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
572        assert_eq!(
573            cognitive_level_from_metadata(&short.metadata),
574            CognitiveLevel::SummaryShort
575        );
576        assert!(short.content.contains("pagination"));
577
578        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
579        assert_eq!(
580            cognitive_level_from_metadata(&long.metadata),
581            CognitiveLevel::SummaryLong
582        );
583        assert!(long.content.contains("digest"));
584
585        // Verify session_digests registrations
586        let short_digests: i64 =
587            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
588                .bind(DIGEST_KIND_SHORT)
589                .fetch_one(&pool)
590                .await
591                .unwrap();
592        let long_digests: i64 =
593            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
594                .bind(DIGEST_KIND_LONG)
595                .fetch_one(&pool)
596                .await
597                .unwrap();
598        assert_eq!(short_digests, 1);
599        assert_eq!(long_digests, 1);
600    }
601
602    #[tokio::test]
603    async fn test_digest_session_falls_back_on_llm_failure() {
604        let (_pool, repo, namespace_id) = setup_repo().await;
605        store_session_memory(
606            &repo,
607            namespace_id,
608            "Implemented digest service with fallback behavior.",
609            "session-2",
610            CognitiveLevel::Explicit,
611        )
612        .await;
613
614        let bad_response = GenerateResponse {
615            content: "not valid json at all".to_string(),
616            model: "mock-model".to_string(),
617            usage: None,
618        };
619        let service = DigestService::new(
620            AgentConfig::default(),
621            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
622            None,
623        );
624
625        let result = service
626            .digest_session(namespace_id, "session-2", &repo, false)
627            .await
628            .unwrap();
629
630        assert_eq!(result.source_count, 1);
631
632        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
633        assert_eq!(
634            cognitive_level_from_metadata(&short.metadata),
635            CognitiveLevel::SummaryShort
636        );
637        // Fallback content comes from source memories
638        assert!(short.content.len() <= SHORT_MAX_CHARS);
639
640        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
641        assert!(long.content.contains("digest service"));
642    }
643
644    #[tokio::test]
645    async fn test_digest_session_is_idempotent() {
646        let (_pool, repo, namespace_id) = setup_repo().await;
647        store_session_memory(
648            &repo,
649            namespace_id,
650            "Refactored the memory consolidation pipeline.",
651            "session-3",
652            CognitiveLevel::Explicit,
653        )
654        .await;
655
656        let service = DigestService::new(
657            AgentConfig::default(),
658            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
659            None,
660        );
661
662        let first = service
663            .digest_session(namespace_id, "session-3", &repo, false)
664            .await
665            .unwrap();
666        let second = service
667            .digest_session(namespace_id, "session-3", &repo, false)
668            .await
669            .unwrap();
670
671        assert_eq!(first.short_id, second.short_id);
672        assert_eq!(first.long_id, second.long_id);
673    }
674
675    #[tokio::test]
676    async fn test_digest_session_force_regenerates() {
677        let (pool, repo, namespace_id) = setup_repo().await;
678        store_session_memory(
679            &repo,
680            namespace_id,
681            "Added token counting to digest service.",
682            "session-4",
683            CognitiveLevel::Explicit,
684        )
685        .await;
686
687        let service = DigestService::new(
688            AgentConfig::default(),
689            Arc::new(MockLlmClient::new(vec![
690                Ok(good_digest_response()),
691                Ok(good_digest_response()),
692            ])),
693            None,
694        );
695
696        let first = service
697            .digest_session(namespace_id, "session-4", &repo, false)
698            .await
699            .unwrap();
700        let forced = service
701            .digest_session(namespace_id, "session-4", &repo, true)
702            .await
703            .unwrap();
704
705        // Force creates new memories with different IDs
706        assert_ne!(first.short_id, forced.short_id);
707        assert_ne!(first.long_id, forced.long_id);
708
709        // Force regeneration replaces the existing digest pointers rather than
710        // summarizing prior digests recursively.
711        let total_digests: i64 =
712            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE session_key = ?")
713                .bind("session-4")
714                .fetch_one(&pool)
715                .await
716                .unwrap();
717        assert_eq!(total_digests, 2);
718
719        let latest_short = repo
720            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_SHORT)
721            .await
722            .unwrap()
723            .unwrap();
724        let latest_long = repo
725            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_LONG)
726            .await
727            .unwrap()
728            .unwrap();
729        assert_eq!(latest_short.id, forced.short_id);
730        assert_eq!(latest_long.id, forced.long_id);
731    }
732
733    #[tokio::test]
734    async fn test_digest_session_ignores_raw_memories() {
735        let (_pool, repo, namespace_id) = setup_repo().await;
736        store_session_memory(
737            &repo,
738            namespace_id,
739            "Raw noise from hook capture",
740            "session-5",
741            CognitiveLevel::Raw,
742        )
743        .await;
744
745        let service = DigestService::new(
746            AgentConfig::default(),
747            Arc::new(MockLlmClient::new(Vec::new())),
748            None,
749        );
750
751        let result = service
752            .digest_session(namespace_id, "session-5", &repo, false)
753            .await;
754        assert!(result.is_err());
755    }
756
757    #[tokio::test]
758    async fn test_digest_session_ignores_other_sessions() {
759        let (_pool, repo, namespace_id) = setup_repo().await;
760        store_session_memory(
761            &repo,
762            namespace_id,
763            "Memory from a different session entirely.",
764            "other-session",
765            CognitiveLevel::Explicit,
766        )
767        .await;
768
769        let service = DigestService::new(
770            AgentConfig::default(),
771            Arc::new(MockLlmClient::new(Vec::new())),
772            None,
773        );
774
775        let result = service
776            .digest_session(namespace_id, "session-6", &repo, false)
777            .await;
778        assert!(result.is_err());
779    }
780
781    #[tokio::test]
782    async fn test_digest_session_creates_evidence_lineage() {
783        let (_pool, repo, namespace_id) = setup_repo().await;
784        let m1 = store_session_memory(
785            &repo,
786            namespace_id,
787            "Source memory one.",
788            "session-7",
789            CognitiveLevel::Explicit,
790        )
791        .await;
792        let m2 = store_session_memory(
793            &repo,
794            namespace_id,
795            "Source memory two.",
796            "session-7",
797            CognitiveLevel::Explicit,
798        )
799        .await;
800
801        let service = DigestService::new(
802            AgentConfig::default(),
803            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
804            None,
805        );
806
807        let result = service
808            .digest_session(namespace_id, "session-7", &repo, false)
809            .await
810            .unwrap();
811
812        let short_lineage = repo.load_lineage(result.short_id).await.unwrap();
813        assert_eq!(short_lineage.len(), 2);
814        let short_sources: Vec<i64> = short_lineage.iter().map(|e| e.source_memory_id).collect();
815        assert!(short_sources.contains(&m1.id));
816        assert!(short_sources.contains(&m2.id));
817        assert_eq!(short_lineage[0].evidence_role, DIGESTED_FROM_ROLE);
818
819        let long_lineage = repo.load_lineage(result.long_id).await.unwrap();
820        assert_eq!(long_lineage.len(), 2);
821    }
822
823    #[test]
824    fn test_truncate_within_limit() {
825        assert_eq!(truncate("hello", 10), "hello");
826    }
827
828    #[test]
829    fn test_truncate_at_boundary() {
830        let input = "a".repeat(50);
831        assert_eq!(truncate(&input, 50), input);
832        assert_eq!(truncate(&input, 30).len(), 30);
833    }
834
835    #[test]
836    fn test_truncate_splits_at_word() {
837        let input = "the quick brown fox jumps over the lazy dog";
838        let result = truncate(input, 20);
839        assert!(result.len() <= 20);
840        assert!(!result.ends_with(' ') || result.is_empty());
841    }
842
843    #[test]
844    fn test_estimate_tokens() {
845        assert_eq!(estimate_tokens("hello world"), 2);
846        assert_eq!(estimate_tokens(""), 0);
847        assert_eq!(estimate_tokens("  spaced  out  "), 2);
848    }
849
850    fn test_memory(id: i64, content: &str) -> Memory {
851        Memory {
852            id,
853            namespace_id: 1,
854            content: content.to_string(),
855            category: MemoryCategory::Session,
856            memory_lane_type: None,
857            labels: vec![],
858            metadata: serde_json::json!({}),
859            similarity_score: None,
860            relevance_score: None,
861            content_embedding: None,
862            embedding_model: None,
863            created_at: Utc::now(),
864            updated_at: None,
865            last_accessed: None,
866            is_active: true,
867            is_archived: false,
868            access_count: 0,
869        }
870    }
871
872    #[tokio::test]
873    async fn test_digest_memories_get_embeddings_when_service_provided() {
874        let (_pool, repo, namespace_id) = setup_repo().await;
875        store_session_memory(
876            &repo,
877            namespace_id,
878            "Fixed query pagination behavior.",
879            "session-embed",
880            CognitiveLevel::Explicit,
881        )
882        .await;
883        store_session_memory(
884            &repo,
885            namespace_id,
886            "Added working-set retrieval primitives.",
887            "session-embed",
888            CognitiveLevel::Explicit,
889        )
890        .await;
891
892        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
893        let service = DigestService::new(
894            AgentConfig::default(),
895            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
896            Some(Arc::new(mock_embed)),
897        );
898
899        let result = service
900            .digest_session(namespace_id, "session-embed", &repo, false)
901            .await
902            .unwrap();
903
904        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
905        assert!(
906            short.content_embedding.is_some(),
907            "short digest should have an embedding when service is provided"
908        );
909        assert_eq!(
910            short.content_embedding.as_ref().unwrap().len(),
911            384,
912            "short digest embedding dimension should be 384"
913        );
914
915        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
916        assert!(
917            long.content_embedding.is_some(),
918            "long digest should have an embedding when service is provided"
919        );
920        assert_eq!(
921            long.content_embedding.as_ref().unwrap().len(),
922            384,
923            "long digest embedding dimension should be 384"
924        );
925    }
926
927    #[tokio::test]
928    async fn test_digest_memories_stored_without_embedding_when_service_absent() {
929        let (_pool, repo, namespace_id) = setup_repo().await;
930        store_session_memory(
931            &repo,
932            namespace_id,
933            "Fixed query pagination behavior.",
934            "session-no-embed",
935            CognitiveLevel::Explicit,
936        )
937        .await;
938        store_session_memory(
939            &repo,
940            namespace_id,
941            "Added working-set retrieval primitives.",
942            "session-no-embed",
943            CognitiveLevel::Explicit,
944        )
945        .await;
946
947        let service = DigestService::new(
948            AgentConfig::default(),
949            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
950            None,
951        );
952
953        let result = service
954            .digest_session(namespace_id, "session-no-embed", &repo, false)
955            .await
956            .unwrap();
957
958        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
959        assert!(
960            short.content_embedding.is_none(),
961            "short digest should NOT have embedding when no service provided"
962        );
963
964        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
965        assert!(
966            long.content_embedding.is_none(),
967            "long digest should NOT have embedding when no service provided"
968        );
969    }
970
971    #[test]
972    fn test_fallback_short_uses_first_five_memories() {
973        let memories = vec![
974            test_memory(1, "First memory content here."),
975            test_memory(2, "Second memory."),
976        ];
977        let result = fallback_short(&memories);
978        assert!(result.contains("First memory"));
979        assert!(result.contains("Second memory"));
980    }
981
982    #[test]
983    fn test_fallback_long_concatenates_all_memories() {
984        let memories = vec![
985            test_memory(1, "Alpha."),
986            test_memory(2, "Beta."),
987            test_memory(3, "Gamma."),
988        ];
989        let result = fallback_long(&memories);
990        assert!(result.contains("Alpha"));
991        assert!(result.contains("Beta"));
992        assert!(result.contains("Gamma"));
993    }
994}