1use std::sync::Arc;
4use std::time::Instant;
5
6use chrono::Utc;
7use nexus_core::config::AgentConfig;
8use nexus_core::traits::EmbeddingService;
9use nexus_core::{
10 cognitive_level_from_metadata, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
11 MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey,
12};
13use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
14use nexus_storage::repository::{
15 MemoryRepository, StoreDigestParams, StoreMemoryParams, StoreMemoryWithLineageParams,
16};
17use tracing::{debug, info, warn};
18
19use crate::error::AgentError;
20use crate::prompts::{digest_user_prompt, DIGEST_SYSTEM_PROMPT};
21use crate::util::{
22 flush_metric_samples, maybe_embed, parse_json_response, stage_metric_sample,
23 token_usage_metric_samples,
24};
25
/// `max_tokens` value passed to the LLM when requesting a digest.
const DIGEST_MAX_TOKENS: u32 = 4096;
/// Row limit passed to `list_by_session_key` when gathering a session's memories.
const DIGEST_MAX_SOURCE_MEMORIES: i64 = 200;
/// Generator tag written into the cognitive metadata of every digest memory.
const DIGEST_GENERATED_BY: &str = "digest_service";
/// `digest_kind` value identifying the short digest of a session.
const DIGEST_KIND_SHORT: &str = "short";
/// `digest_kind` value identifying the long digest of a session.
const DIGEST_KIND_LONG: &str = "long";
/// Size limit handed to `truncate` for short digest content.
const SHORT_MAX_CHARS: usize = 4000;
/// Size limit handed to `truncate` for long digest content.
const LONG_MAX_CHARS: usize = 12000;
/// `evidence_role` recorded on the lineage entries linking a digest to its source memories.
const DIGESTED_FROM_ROLE: &str = "digested_from";
34
/// Outcome of [`DigestService::digest_session`].
#[derive(Debug, Clone)]
pub struct DigestResult {
    /// Id of the short digest: the stored memory's id when freshly created, or the
    /// `id` of the record returned by `latest_digest_for_session` when reused.
    pub short_id: i64,
    /// Id of the long digest, obtained the same way as `short_id`.
    pub long_id: i64,
    /// Number of source memories that were summarised; `0` when existing digests
    /// were reused instead of generating new ones.
    pub source_count: usize,
}
42
/// Pair of digest texts: the JSON shape parsed from the LLM response, and also
/// the value built by the local fallback path.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct DigestEnvelope {
    /// Short summary text.
    short: String,
    /// Long summary text.
    long: String,
}
49
/// Produces and stores short/long digests for a session's memories.
pub struct DigestService {
    /// Agent configuration; its `namespace` is used as both observer and subject
    /// of the digest perspective.
    config: AgentConfig,
    /// LLM client asked to generate the digest texts.
    llm: Arc<dyn LlmClient>,
    /// Optional embedding service; when absent, digests are stored without going
    /// through an embedding service.
    embeddings: Option<Arc<dyn EmbeddingService>>,
}
55
impl DigestService {
    /// Builds a service from its configuration, LLM client, and optional
    /// embedding service.
    pub fn new(
        config: AgentConfig,
        llm: Arc<dyn LlmClient>,
        embeddings: Option<Arc<dyn EmbeddingService>>,
    ) -> Self {
        Self {
            config,
            llm,
            embeddings,
        }
    }

    /// Creates (or reuses) the short and long digest for one session.
    ///
    /// If both a short and a long digest are already recorded for the session,
    /// they are returned unchanged. Otherwise the session's eligible memories
    /// are gathered, digest texts are produced, each text is stored as a memory
    /// with lineage to the sources, and one digest row per kind is recorded.
    ///
    /// `_force` is currently not read: existing digests are always reused.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::Digest` when the session has no eligible memories,
    /// and propagates the `AgentError` of any failing repository call.
    pub async fn digest_session(
        &self,
        namespace_id: i64,
        session_key: &str,
        repo: &MemoryRepository,
        _force: bool,
    ) -> Result<DigestResult, AgentError> {
        let total_started = Instant::now();
        let mut metrics = Vec::new();
        // Reuse path: nothing is generated and no metrics are flushed.
        if let Some(existing) = existing_digest_ids(repo, namespace_id, session_key).await? {
            debug!(
                namespace_id,
                session_key,
                short_id = existing.short_id,
                long_id = existing.long_id,
                "Reusing existing session digests"
            );
            return Ok(existing);
        }

        let gather_started = Instant::now();
        let memories = gather_session_memories(repo, namespace_id, session_key).await?;
        metrics.push(stage_metric_sample(
            namespace_id,
            "cognition.digest.gather_ms",
            gather_started.elapsed().as_secs_f64() * 1000.0,
            "gather",
        ));
        // NOTE(review): the gather sample collected above is dropped on this
        // error path because metrics are only flushed at the end.
        if memories.is_empty() {
            return Err(AgentError::Digest(format!(
                "No non-raw memories found for session \"{}\"",
                session_key
            )));
        }

        let source_count = memories.len();
        let source_ids: Vec<i64> = memories.iter().map(|m| m.id).collect();
        // Smallest and largest source id are recorded as the digest's
        // start/end memory ids.
        let min_id = source_ids.iter().copied().min().unwrap_or(0);
        let max_id = source_ids.iter().copied().max().unwrap_or(0);

        let produce_started = Instant::now();
        // `produce_digests` returns an envelope rather than a `Result`, so this
        // step cannot fail here.
        let (envelope, usage) = produce_digests(self.llm.as_ref(), session_key, &memories).await;
        metrics.push(stage_metric_sample(
            namespace_id,
            "cognition.digest.produce_ms",
            produce_started.elapsed().as_secs_f64() * 1000.0,
            "produce",
        ));
        metrics.extend(token_usage_metric_samples(
            namespace_id,
            "cognition.digest.produce",
            "produce",
            usage.as_ref(),
        ));
        let short_content = truncate(envelope.short.trim(), SHORT_MAX_CHARS);
        let long_content = truncate(envelope.long.trim(), LONG_MAX_CHARS);

        // The configured namespace acts as both observer and subject.
        let perspective = PerspectiveKey {
            observer: self.config.namespace.clone(),
            subject: self.config.namespace.clone(),
            session_key: Some(session_key.to_string()),
        };

        let embeddings = self.embeddings.as_deref();
        let short_memory = store_digest_memory(
            repo,
            namespace_id,
            &short_content,
            CognitiveLevel::SummaryShort,
            &perspective,
            &source_ids,
            embeddings,
        )
        .await?;

        let long_memory = store_digest_memory(
            repo,
            namespace_id,
            &long_content,
            CognitiveLevel::SummaryLong,
            &perspective,
            &source_ids,
            embeddings,
        )
        .await?;

        // Token counts are whitespace-separated word counts.
        let short_tokens = estimate_tokens(&short_content);
        let long_tokens = estimate_tokens(&long_content);

        // NOTE(review): this timer starts after both digest memories have been
        // written, so `store_ms` covers only the two `store_digest` calls below.
        let store_started = Instant::now();
        repo.store_digest(StoreDigestParams {
            namespace_id,
            session_key,
            digest_kind: DIGEST_KIND_SHORT,
            memory_id: short_memory.id,
            start_memory_id: Some(min_id),
            end_memory_id: Some(max_id),
            token_count: short_tokens,
        })
        .await
        .map_err(|e| AgentError::Storage(e.to_string()))?;

        repo.store_digest(StoreDigestParams {
            namespace_id,
            session_key,
            digest_kind: DIGEST_KIND_LONG,
            memory_id: long_memory.id,
            start_memory_id: Some(min_id),
            end_memory_id: Some(max_id),
            token_count: long_tokens,
        })
        .await
        .map_err(|e| AgentError::Storage(e.to_string()))?;
        metrics.push(stage_metric_sample(
            namespace_id,
            "cognition.digest.store_ms",
            store_started.elapsed().as_secs_f64() * 1000.0,
            "store",
        ));

        info!(
            namespace_id,
            session_key,
            short_id = short_memory.id,
            long_id = long_memory.id,
            source_count,
            "Created session digests"
        );
        metrics.push(stage_metric_sample(
            namespace_id,
            "cognition.digest.total_ms",
            total_started.elapsed().as_secs_f64() * 1000.0,
            "total",
        ));
        // Metrics are flushed only on the successful creation path.
        flush_metric_samples(repo, &metrics).await;

        Ok(DigestResult {
            short_id: short_memory.id,
            long_id: long_memory.id,
            source_count,
        })
    }
}
219
220async fn existing_digest_ids(
225 repo: &MemoryRepository,
226 namespace_id: i64,
227 session_key: &str,
228) -> Result<Option<DigestResult>, AgentError> {
229 let short = repo
230 .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_SHORT)
231 .await
232 .map_err(|e| AgentError::Storage(e.to_string()))?;
233
234 let long = repo
235 .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_LONG)
236 .await
237 .map_err(|e| AgentError::Storage(e.to_string()))?;
238
239 match (short, long) {
240 (Some(s), Some(l)) => Ok(Some(DigestResult {
241 short_id: s.id,
242 long_id: l.id,
243 source_count: 0,
244 })),
245 _ => Ok(None),
246 }
247}
248
249async fn gather_session_memories(
250 repo: &MemoryRepository,
251 namespace_id: i64,
252 session_key: &str,
253) -> Result<Vec<Memory>, AgentError> {
254 let matching = repo
255 .list_by_session_key(namespace_id, session_key, DIGEST_MAX_SOURCE_MEMORIES, false)
256 .await
257 .map_err(|e| AgentError::Storage(e.to_string()))?;
258
259 Ok(matching
260 .into_iter()
261 .filter(|m| {
262 let level = cognitive_level_from_metadata(&m.metadata);
263 if matches!(
264 level,
265 CognitiveLevel::Raw | CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong
266 ) {
267 return false;
268 }
269 true
270 })
271 .collect())
272}
273
274async fn produce_digests(
275 llm: &dyn LlmClient,
276 session_key: &str,
277 memories: &[Memory],
278) -> (DigestEnvelope, Option<TokenUsage>) {
279 let pairs: Vec<(i64, &str)> = memories
280 .iter()
281 .map(|m| (m.id, m.content.as_str()))
282 .collect();
283 let user_msg = digest_user_prompt(session_key, &pairs);
284
285 let params = GenerateParams {
286 messages: vec![
287 ChatMessage::system(DIGEST_SYSTEM_PROMPT),
288 ChatMessage::user(user_msg),
289 ],
290 max_tokens: DIGEST_MAX_TOKENS,
291 temperature: 0.2,
292 json_mode: true,
293 };
294
295 match llm.generate(params).await {
296 Ok(response) => {
297 let usage = response.usage.clone();
298 match parse_json_response::<DigestEnvelope>(&response) {
299 Ok(envelope)
300 if envelope.short.trim().is_empty() && envelope.long.trim().is_empty() =>
301 {
302 warn!("LLM returned empty digest, using fallback");
303 (fallback_digest(memories), usage)
304 }
305 Ok(envelope) => (envelope, usage),
306 Err(error) => {
307 warn!(%error, "LLM digest response was invalid JSON, using fallback");
308 (fallback_digest(memories), usage)
309 }
310 }
311 }
312 Err(error) => {
313 warn!(%error, "LLM digest call failed, using fallback");
314 (fallback_digest(memories), None)
315 }
316 }
317}
318
319fn fallback_digest(memories: &[Memory]) -> DigestEnvelope {
320 let short = fallback_short(memories);
321 let long = fallback_long(memories);
322 DigestEnvelope { short, long }
323}
324
325fn fallback_short(memories: &[Memory]) -> String {
326 let combined: String = memories
327 .iter()
328 .take(5)
329 .map(|m| m.content.trim())
330 .filter(|s| !s.is_empty())
331 .collect::<Vec<_>>()
332 .join(" ");
333
334 truncate(&combined, SHORT_MAX_CHARS)
335}
336
337fn fallback_long(memories: &[Memory]) -> String {
338 let combined: String = memories
339 .iter()
340 .map(|m| m.content.trim())
341 .filter(|s| !s.is_empty())
342 .collect::<Vec<_>>()
343 .join(" ");
344
345 truncate(&combined, LONG_MAX_CHARS)
346}
347
348async fn store_digest_memory(
349 repo: &MemoryRepository,
350 namespace_id: i64,
351 content: &str,
352 level: CognitiveLevel,
353 perspective: &PerspectiveKey,
354 source_ids: &[i64],
355 embeddings: Option<&dyn EmbeddingService>,
356) -> Result<Memory, AgentError> {
357 let mut cognitive = CognitiveMetadata::new(
358 level,
359 perspective.observer.clone(),
360 perspective.subject.clone(),
361 perspective.session_key.clone(),
362 DIGEST_GENERATED_BY,
363 );
364 cognitive.source_memory_ids = source_ids.to_vec();
365 cognitive.confidence = Some(0.80);
366 cognitive.times_reinforced = 0;
367 cognitive.times_contradicted = 0;
368 cognitive.derived_at = Some(Utc::now());
369 cognitive.generated_by = Some(DIGEST_GENERATED_BY.to_string());
370
371 let metadata = cognitive.merge_into(&serde_json::json!({}));
372 let (embedding, embedding_model) = maybe_embed(embeddings, content).await;
373
374 let memory = repo
375 .store_with_lineage(StoreMemoryWithLineageParams {
376 store: StoreMemoryParams {
377 namespace_id,
378 content,
379 category: &MemoryCategory::Session,
380 memory_lane_type: Some(&MemoryLaneType::Cognitive(
381 MemoryLaneCognitiveType::Explicit,
382 )),
383 labels: &["digest".to_string(), level.to_string().to_lowercase()],
384 metadata: &metadata,
385 embedding: embedding.as_deref(),
386 embedding_model: embedding_model.as_deref(),
387 },
388 source_memory_ids: source_ids,
389 evidence_role: DIGESTED_FROM_ROLE,
390 })
391 .await
392 .map_err(|e| AgentError::Storage(e.to_string()))?;
393
394 Ok(memory)
395}
396
/// Shortens `s` to at most `max_chars` characters, preferring a word boundary.
///
/// If `s` already fits it is returned unchanged. Otherwise the first
/// `max_chars` characters are taken and, when that prefix contains a space,
/// it is cut back to the last space so a word is not split. Trailing
/// whitespace left by the cut is removed. A prefix without any space is
/// returned as a hard cut.
///
/// The limit counts Unicode scalar values (`char`s), not bytes, so multi-byte
/// text is not truncated earlier than ASCII text of the same length.
fn truncate(s: &str, max_chars: usize) -> String {
    // Byte offset of the first character past the limit; `None` means `s` fits.
    let cut = match s.char_indices().nth(max_chars) {
        Some((offset, _)) => offset,
        None => return s.to_string(),
    };

    // `cut` comes from `char_indices`, so it is always a valid char boundary.
    let head = &s[..cut];
    let kept = match head.rfind(' ') {
        Some(last_space) => &head[..last_space],
        None => head,
    };
    // Consecutive spaces before the cut would otherwise leave a trailing space.
    kept.trim_end().to_string()
}
411
/// Rough token estimate: the number of whitespace-separated words in `s`.
fn estimate_tokens(s: &str) -> usize {
    let mut words = 0;
    for _word in s.split_whitespace() {
        words += 1;
    }
    words
}
415
/// Unit tests for the digest service, run against an in-memory SQLite
/// repository and a scripted LLM client.
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use chrono::Utc;
    use nexus_core::MemoryLanePriorityType;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    /// LLM client that replays a fixed queue of responses, one per call.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Creates a mock that returns `responses` in order.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next scripted response; panics when the queue is empty.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            self.responses
                .lock()
                .expect("mock responses poisoned")
                .pop_front()
                .expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            "mock".to_string()
        }

        fn model_name(&self) -> String {
            "mock-model".to_string()
        }
    }

    /// Opens an in-memory SQLite pool, runs migrations, and creates the
    /// `digest-test` namespace. Returns the pool, a repository, and the
    /// namespace id.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();
        let namespace_repo = NamespaceRepository::new(pool.clone());
        let namespace = namespace_repo
            .get_or_create("digest-test", "digest-test")
            .await
            .unwrap();
        let repo = MemoryRepository::new(pool.clone());
        (pool, repo, namespace.id)
    }

    /// Builds memory metadata carrying `session_key` and cognitive `level`.
    fn session_metadata(session_key: &str, level: CognitiveLevel) -> serde_json::Value {
        let cognitive = CognitiveMetadata::new(
            level,
            "claude-code",
            "claude-code",
            Some(session_key.to_string()),
            "derive_service",
        );
        cognitive.merge_into(&serde_json::json!({}))
    }

    /// Stores one session memory with the given content, session, and level.
    async fn store_session_memory(
        repo: &MemoryRepository,
        namespace_id: i64,
        content: &str,
        session_key: &str,
        level: CognitiveLevel,
    ) -> Memory {
        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Session,
            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
            labels: &["test".to_string()],
            metadata: &session_metadata(session_key, level),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap()
    }

    /// A well-formed LLM response containing both digest fields.
    fn good_digest_response() -> GenerateResponse {
        GenerateResponse {
            content: r#"{"short":"Fixed query pagination and added working-set retrieval.","long":"Session focused on fixing query pagination behavior in the memory system. The team tightened the ranking logic and added bounded working-set retrieval. New digest infrastructure was introduced to track session summaries."}"#
                .to_string(),
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// A session without any memories yields a digest error.
    #[tokio::test]
    async fn test_digest_session_errors_on_empty_session() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "empty-session", &repo, false)
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("No non-raw memories"));
    }

    /// A successful run stores one short and one long digest.
    #[tokio::test]
    async fn test_digest_session_creates_short_and_long() {
        let (pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-1",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-1",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-1", &repo, false)
            .await
            .unwrap();

        assert_eq!(result.source_count, 2);
        assert!(result.short_id > 0);
        assert!(result.long_id > 0);

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&short.metadata),
            CognitiveLevel::SummaryShort
        );
        assert!(short.content.contains("pagination"));

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&long.metadata),
            CognitiveLevel::SummaryLong
        );
        assert!(long.content.contains("digest"));

        let short_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
                .bind(DIGEST_KIND_SHORT)
                .fetch_one(&pool)
                .await
                .unwrap();
        let long_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
                .bind(DIGEST_KIND_LONG)
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(short_digests, 1);
        assert_eq!(long_digests, 1);
    }

    /// A non-JSON LLM response triggers the local fallback digest.
    #[tokio::test]
    async fn test_digest_session_falls_back_on_llm_failure() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Implemented digest service with fallback behavior.",
            "session-2",
            CognitiveLevel::Explicit,
        )
        .await;

        let bad_response = GenerateResponse {
            content: "not valid json at all".to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-2", &repo, false)
            .await
            .unwrap();

        assert_eq!(result.source_count, 1);

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&short.metadata),
            CognitiveLevel::SummaryShort
        );
        assert!(short.content.len() <= SHORT_MAX_CHARS);

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(long.content.contains("digest service"));
    }

    /// A second call for the same session returns the same digest ids. The
    /// mock holds a single response, so a second LLM call would panic.
    #[tokio::test]
    async fn test_digest_session_is_idempotent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Refactored the memory consolidation pipeline.",
            "session-3",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let first = service
            .digest_session(namespace_id, "session-3", &repo, false)
            .await
            .unwrap();
        let second = service
            .digest_session(namespace_id, "session-3", &repo, false)
            .await
            .unwrap();

        assert_eq!(first.short_id, second.short_id);
        assert_eq!(first.long_id, second.long_id);
    }

    /// Passing `force = true` still reuses the existing digests and adds no rows.
    #[tokio::test]
    async fn test_digest_session_force_respects_existing() {
        let (pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added token counting to digest service.",
            "session-4",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![
                Ok(good_digest_response()),
                Ok(good_digest_response()),
            ])),
            None,
        );

        let first = service
            .digest_session(namespace_id, "session-4", &repo, false)
            .await
            .unwrap();
        let forced = service
            .digest_session(namespace_id, "session-4", &repo, true)
            .await
            .unwrap();

        assert_eq!(first.short_id, forced.short_id);
        assert_eq!(first.long_id, forced.long_id);

        let total_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE session_key = ?")
                .bind("session-4")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(total_digests, 2);

        let latest_short = repo
            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_SHORT)
            .await
            .unwrap()
            .unwrap();
        let latest_long = repo
            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_LONG)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_short.id, forced.short_id);
        assert_eq!(latest_long.id, forced.long_id);
    }

    /// A session containing only raw memories has nothing to digest.
    #[tokio::test]
    async fn test_digest_session_ignores_raw_memories() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Raw noise from hook capture",
            "session-5",
            CognitiveLevel::Raw,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-5", &repo, false)
            .await;
        assert!(result.is_err());
    }

    /// Memories belonging to a different session key are not picked up.
    #[tokio::test]
    async fn test_digest_session_ignores_other_sessions() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Memory from a different session entirely.",
            "other-session",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-6", &repo, false)
            .await;
        assert!(result.is_err());
    }

    /// Both digests record lineage entries pointing at every source memory.
    #[tokio::test]
    async fn test_digest_session_creates_evidence_lineage() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let m1 = store_session_memory(
            &repo,
            namespace_id,
            "Source memory one.",
            "session-7",
            CognitiveLevel::Explicit,
        )
        .await;
        let m2 = store_session_memory(
            &repo,
            namespace_id,
            "Source memory two.",
            "session-7",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-7", &repo, false)
            .await
            .unwrap();

        let short_lineage = repo.load_lineage(result.short_id).await.unwrap();
        assert_eq!(short_lineage.len(), 2);
        let short_sources: Vec<i64> = short_lineage.iter().map(|e| e.source_memory_id).collect();
        assert!(short_sources.contains(&m1.id));
        assert!(short_sources.contains(&m2.id));
        assert_eq!(short_lineage[0].evidence_role, DIGESTED_FROM_ROLE);

        let long_lineage = repo.load_lineage(result.long_id).await.unwrap();
        assert_eq!(long_lineage.len(), 2);
    }

    /// Input shorter than the limit is returned unchanged.
    #[test]
    fn test_truncate_within_limit() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    /// Input exactly at the limit is kept; space-free input is hard-cut.
    #[test]
    fn test_truncate_at_boundary() {
        let input = "a".repeat(50);
        assert_eq!(truncate(&input, 50), input);
        assert_eq!(truncate(&input, 30).len(), 30);
    }

    /// Over-long input is cut back to the last complete word.
    #[test]
    fn test_truncate_splits_at_word() {
        let input = "the quick brown fox jumps over the lazy dog";
        let result = truncate(input, 20);
        assert!(result.len() <= 20);
        assert!(!result.ends_with(' ') || result.is_empty());
        assert_eq!(result, "the quick brown fox");
    }

    /// Token estimate is the whitespace-separated word count.
    #[test]
    fn test_estimate_tokens() {
        assert_eq!(estimate_tokens("hello world"), 2);
        assert_eq!(estimate_tokens(""), 0);
        assert_eq!(estimate_tokens("  spaced   out  "), 2);
    }

    /// Builds an in-memory `Memory` value with the given id and content.
    fn test_memory(id: i64, content: &str) -> Memory {
        Memory {
            id,
            namespace_id: 1,
            content: content.to_string(),
            category: MemoryCategory::Session,
            memory_lane_type: None,
            labels: vec![],
            metadata: serde_json::json!({}),
            similarity_score: None,
            relevance_score: None,
            content_embedding: None,
            embedding_model: None,
            created_at: Utc::now(),
            updated_at: None,
            last_accessed: None,
            is_active: true,
            is_archived: false,
            access_count: 0,
        }
    }

    /// With an embedding service configured, both digests carry embeddings.
    #[tokio::test]
    async fn test_digest_memories_get_embeddings_when_service_provided() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-embed",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-embed",
            CognitiveLevel::Explicit,
        )
        .await;

        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            Some(Arc::new(mock_embed)),
        );

        let result = service
            .digest_session(namespace_id, "session-embed", &repo, false)
            .await
            .unwrap();

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert!(
            short.content_embedding.is_some(),
            "short digest should have an embedding when service is provided"
        );
        assert_eq!(
            short.content_embedding.as_ref().unwrap().len(),
            384,
            "short digest embedding dimension should be 384"
        );

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(
            long.content_embedding.is_some(),
            "long digest should have an embedding when service is provided"
        );
        assert_eq!(
            long.content_embedding.as_ref().unwrap().len(),
            384,
            "long digest embedding dimension should be 384"
        );
    }

    /// Without an embedding service, digests are stored without embeddings.
    #[tokio::test]
    async fn test_digest_memories_stored_without_embedding_when_service_absent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-no-embed",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-no-embed",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-no-embed", &repo, false)
            .await
            .unwrap();

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert!(
            short.content_embedding.is_none(),
            "short digest should NOT have embedding when no service provided"
        );

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(
            long.content_embedding.is_none(),
            "long digest should NOT have embedding when no service provided"
        );
    }

    /// The short fallback includes the first five memories and excludes the
    /// sixth, so the cap itself is exercised.
    #[test]
    fn test_fallback_short_uses_first_five_memories() {
        let memories = vec![
            test_memory(1, "First memory content here."),
            test_memory(2, "Second memory."),
            test_memory(3, "Third memory."),
            test_memory(4, "Fourth memory."),
            test_memory(5, "Fifth memory."),
            test_memory(6, "Sixth memory."),
        ];
        let result = fallback_short(&memories);
        assert!(result.contains("First memory"));
        assert!(result.contains("Second memory"));
        assert!(result.contains("Fifth memory"));
        assert!(!result.contains("Sixth memory"));
    }

    /// The long fallback concatenates every memory's content.
    #[test]
    fn test_fallback_long_concatenates_all_memories() {
        let memories = vec![
            test_memory(1, "Alpha."),
            test_memory(2, "Beta."),
            test_memory(3, "Gamma."),
        ];
        let result = fallback_long(&memories);
        assert!(result.contains("Alpha"));
        assert!(result.contains("Beta"));
        assert!(result.contains("Gamma"));
    }
}