1use std::sync::Arc;
4use std::time::Instant;
5
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::{
9 cognitive_level_from_metadata, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
10 MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey,
11};
12use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
13use nexus_storage::repository::{
14 MemoryRepository, StoreDigestParams, StoreMemoryParams, StoreMemoryWithLineageParams,
15};
16use tracing::{debug, info, warn};
17
18use crate::error::AgentError;
19use crate::prompts::{digest_user_prompt, DIGEST_SYSTEM_PROMPT};
20use crate::util::{
21 flush_metric_samples, maybe_embed, parse_json_response, stage_metric_sample,
22 token_usage_metric_samples,
23};
24
/// `max_tokens` value sent with the digest generation request.
const DIGEST_MAX_TOKENS: u32 = 4096;
/// Third argument to `list_by_session_key` when gathering sources
/// (presumably a row limit; confirm against the repository API).
const DIGEST_MAX_SOURCE_MEMORIES: i64 = 200;
/// Last argument to `CognitiveMetadata::new` for digest memories
/// (presumably the "generated by" marker; confirm against `nexus_core`).
const DIGEST_GENERATED_BY: &str = "digest_service";
/// `digest_kind` used for the short digest row and when looking it up again.
const DIGEST_KIND_SHORT: &str = "short";
/// `digest_kind` used for the long digest row and when looking it up again.
const DIGEST_KIND_LONG: &str = "long";
/// Size limit handed to `truncate` for short digest text.
const SHORT_MAX_CHARS: usize = 300;
/// Size limit handed to `truncate` for long digest text.
const LONG_MAX_CHARS: usize = 1500;
/// Evidence role recorded on the lineage from a digest memory to its sources.
const DIGESTED_FROM_ROLE: &str = "digested_from";
33
/// Outcome of `DigestService::digest_session`.
#[derive(Debug, Clone)]
pub struct DigestResult {
    /// Id of the short digest memory (newly stored, or the latest existing one when reused).
    pub short_id: i64,
    /// Id of the long digest memory (newly stored, or the latest existing one when reused).
    pub long_id: i64,
    /// Number of source memories that were digested; `0` when existing digests were reused.
    pub source_count: usize,
}
41
/// Shape the LLM's JSON reply is parsed into; also produced by the local fallback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct DigestEnvelope {
    /// Short digest text (trimmed and truncated to `SHORT_MAX_CHARS` before storage).
    short: String,
    /// Long digest text (trimmed and truncated to `LONG_MAX_CHARS` before storage).
    long: String,
}
48
/// Produces and stores short/long digests for the memories of one session.
pub struct DigestService {
    /// Agent configuration; its `namespace` is used as both observer and subject of digests.
    config: AgentConfig,
    /// LLM client asked to generate the digest texts.
    llm: Arc<dyn LlmClient>,
    /// Optional embedding service; when `None`, `maybe_embed` is called without a service.
    embeddings: Option<Arc<dyn EmbeddingService>>,
}
54
55impl DigestService {
56 pub fn new(
57 config: AgentConfig,
58 llm: Arc<dyn LlmClient>,
59 embeddings: Option<Arc<dyn EmbeddingService>>,
60 ) -> Self {
61 Self {
62 config,
63 llm,
64 embeddings,
65 }
66 }
67
68 pub async fn digest_session(
73 &self,
74 namespace_id: i64,
75 session_key: &str,
76 repo: &MemoryRepository,
77 force: bool,
78 ) -> Result<DigestResult, AgentError> {
79 let total_started = Instant::now();
80 let mut metrics = Vec::new();
81 if !force {
83 if let Some(existing) = existing_digest_ids(repo, namespace_id, session_key).await? {
84 debug!(
85 namespace_id,
86 session_key,
87 short_id = existing.short_id,
88 long_id = existing.long_id,
89 "Reusing existing session digests"
90 );
91 return Ok(existing);
92 }
93 }
94
95 let gather_started = Instant::now();
96 let memories = gather_session_memories(repo, namespace_id, session_key).await?;
97 metrics.push(stage_metric_sample(
98 namespace_id,
99 "cognition.digest.gather_ms",
100 gather_started.elapsed().as_secs_f64() * 1000.0,
101 "gather",
102 ));
103 if memories.is_empty() {
104 return Err(AgentError::Digest(format!(
105 "No non-raw memories found for session \"{}\"",
106 session_key
107 )));
108 }
109
110 let source_count = memories.len();
111 let source_ids: Vec<i64> = memories.iter().map(|m| m.id).collect();
112 let min_id = source_ids.iter().copied().min().unwrap_or(0);
113 let max_id = source_ids.iter().copied().max().unwrap_or(0);
114
115 let produce_started = Instant::now();
116 let (envelope, usage) = produce_digests(self.llm.as_ref(), session_key, &memories).await;
117 metrics.push(stage_metric_sample(
118 namespace_id,
119 "cognition.digest.produce_ms",
120 produce_started.elapsed().as_secs_f64() * 1000.0,
121 "produce",
122 ));
123 metrics.extend(token_usage_metric_samples(
124 namespace_id,
125 "cognition.digest.produce",
126 "produce",
127 usage.as_ref(),
128 ));
129 let short_content = truncate(envelope.short.trim(), SHORT_MAX_CHARS);
130 let long_content = truncate(envelope.long.trim(), LONG_MAX_CHARS);
131
132 let perspective = PerspectiveKey {
133 observer: self.config.namespace.clone(),
134 subject: self.config.namespace.clone(),
135 session_key: Some(session_key.to_string()),
136 };
137
138 let embeddings = self.embeddings.as_deref();
139 let short_memory = store_digest_memory(
140 repo,
141 namespace_id,
142 &short_content,
143 CognitiveLevel::SummaryShort,
144 &perspective,
145 &source_ids,
146 embeddings,
147 )
148 .await?;
149
150 let long_memory = store_digest_memory(
151 repo,
152 namespace_id,
153 &long_content,
154 CognitiveLevel::SummaryLong,
155 &perspective,
156 &source_ids,
157 embeddings,
158 )
159 .await?;
160
161 let short_tokens = estimate_tokens(&short_content);
162 let long_tokens = estimate_tokens(&long_content);
163
164 let store_started = Instant::now();
165 repo.store_digest(StoreDigestParams {
166 namespace_id,
167 session_key,
168 digest_kind: DIGEST_KIND_SHORT,
169 memory_id: short_memory.id,
170 start_memory_id: Some(min_id),
171 end_memory_id: Some(max_id),
172 token_count: short_tokens,
173 })
174 .await
175 .map_err(|e| AgentError::Storage(e.to_string()))?;
176
177 repo.store_digest(StoreDigestParams {
178 namespace_id,
179 session_key,
180 digest_kind: DIGEST_KIND_LONG,
181 memory_id: long_memory.id,
182 start_memory_id: Some(min_id),
183 end_memory_id: Some(max_id),
184 token_count: long_tokens,
185 })
186 .await
187 .map_err(|e| AgentError::Storage(e.to_string()))?;
188 metrics.push(stage_metric_sample(
189 namespace_id,
190 "cognition.digest.store_ms",
191 store_started.elapsed().as_secs_f64() * 1000.0,
192 "store",
193 ));
194
195 info!(
196 namespace_id,
197 session_key,
198 short_id = short_memory.id,
199 long_id = long_memory.id,
200 source_count,
201 "Created session digests"
202 );
203 metrics.push(stage_metric_sample(
204 namespace_id,
205 "cognition.digest.total_ms",
206 total_started.elapsed().as_secs_f64() * 1000.0,
207 "total",
208 ));
209 flush_metric_samples(repo, &metrics).await;
210
211 Ok(DigestResult {
212 short_id: short_memory.id,
213 long_id: long_memory.id,
214 source_count,
215 })
216 }
217}
218
219async fn existing_digest_ids(
224 repo: &MemoryRepository,
225 namespace_id: i64,
226 session_key: &str,
227) -> Result<Option<DigestResult>, AgentError> {
228 let short = repo
229 .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_SHORT)
230 .await
231 .map_err(|e| AgentError::Storage(e.to_string()))?;
232
233 let long = repo
234 .latest_digest_for_session(namespace_id, session_key, DIGEST_KIND_LONG)
235 .await
236 .map_err(|e| AgentError::Storage(e.to_string()))?;
237
238 match (short, long) {
239 (Some(s), Some(l)) => Ok(Some(DigestResult {
240 short_id: s.id,
241 long_id: l.id,
242 source_count: 0,
243 })),
244 _ => Ok(None),
245 }
246}
247
248async fn gather_session_memories(
249 repo: &MemoryRepository,
250 namespace_id: i64,
251 session_key: &str,
252) -> Result<Vec<Memory>, AgentError> {
253 let matching = repo
254 .list_by_session_key(namespace_id, session_key, DIGEST_MAX_SOURCE_MEMORIES, false)
255 .await
256 .map_err(|e| AgentError::Storage(e.to_string()))?;
257
258 Ok(matching
259 .into_iter()
260 .filter(|m| {
261 let level = cognitive_level_from_metadata(&m.metadata);
262 if matches!(
263 level,
264 CognitiveLevel::Raw | CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong
265 ) {
266 return false;
267 }
268 true
269 })
270 .collect())
271}
272
273async fn produce_digests(
274 llm: &dyn LlmClient,
275 session_key: &str,
276 memories: &[Memory],
277) -> (DigestEnvelope, Option<TokenUsage>) {
278 let pairs: Vec<(i64, &str)> = memories
279 .iter()
280 .map(|m| (m.id, m.content.as_str()))
281 .collect();
282 let user_msg = digest_user_prompt(session_key, &pairs);
283
284 let params = GenerateParams {
285 messages: vec![
286 ChatMessage::system(DIGEST_SYSTEM_PROMPT),
287 ChatMessage::user(user_msg),
288 ],
289 max_tokens: DIGEST_MAX_TOKENS,
290 temperature: 0.2,
291 json_mode: true,
292 };
293
294 match llm.generate(params).await {
295 Ok(response) => {
296 let usage = response.usage.clone();
297 match parse_json_response::<DigestEnvelope>(&response) {
298 Ok(envelope)
299 if envelope.short.trim().is_empty() && envelope.long.trim().is_empty() =>
300 {
301 warn!("LLM returned empty digest, using fallback");
302 (fallback_digest(memories), usage)
303 }
304 Ok(envelope) => (envelope, usage),
305 Err(error) => {
306 warn!(%error, "LLM digest response was invalid JSON, using fallback");
307 (fallback_digest(memories), usage)
308 }
309 }
310 }
311 Err(error) => {
312 warn!(%error, "LLM digest call failed, using fallback");
313 (fallback_digest(memories), None)
314 }
315 }
316}
317
318fn fallback_digest(memories: &[Memory]) -> DigestEnvelope {
319 let short = fallback_short(memories);
320 let long = fallback_long(memories);
321 DigestEnvelope { short, long }
322}
323
324fn fallback_short(memories: &[Memory]) -> String {
325 let combined: String = memories
326 .iter()
327 .take(5)
328 .map(|m| m.content.trim())
329 .filter(|s| !s.is_empty())
330 .collect::<Vec<_>>()
331 .join(" ");
332
333 truncate(&combined, SHORT_MAX_CHARS)
334}
335
336fn fallback_long(memories: &[Memory]) -> String {
337 let combined: String = memories
338 .iter()
339 .map(|m| m.content.trim())
340 .filter(|s| !s.is_empty())
341 .collect::<Vec<_>>()
342 .join(" ");
343
344 truncate(&combined, LONG_MAX_CHARS)
345}
346
347async fn store_digest_memory(
348 repo: &MemoryRepository,
349 namespace_id: i64,
350 content: &str,
351 level: CognitiveLevel,
352 perspective: &PerspectiveKey,
353 source_ids: &[i64],
354 embeddings: Option<&dyn EmbeddingService>,
355) -> Result<Memory, AgentError> {
356 let mut cognitive = CognitiveMetadata::new(
357 level,
358 perspective.observer.clone(),
359 perspective.subject.clone(),
360 perspective.session_key.clone(),
361 DIGEST_GENERATED_BY,
362 );
363 cognitive.source_memory_ids = source_ids.to_vec();
364 cognitive.confidence = Some(0.80);
365
366 let metadata = cognitive.merge_into(&serde_json::json!({}));
367 let (embedding, embedding_model) = maybe_embed(embeddings, content).await;
368
369 let memory = repo
370 .store_with_lineage(StoreMemoryWithLineageParams {
371 store: StoreMemoryParams {
372 namespace_id,
373 content,
374 category: &MemoryCategory::Session,
375 memory_lane_type: Some(&MemoryLaneType::Cognitive(
376 MemoryLaneCognitiveType::Explicit,
377 )),
378 labels: &["digest".to_string(), level.to_string().to_lowercase()],
379 metadata: &metadata,
380 embedding: embedding.as_deref(),
381 embedding_model: embedding_model.as_deref(),
382 },
383 source_memory_ids: source_ids,
384 evidence_role: DIGESTED_FROM_ROLE,
385 })
386 .await
387 .map_err(|e| AgentError::Storage(e.to_string()))?;
388
389 Ok(memory)
390}
391
/// Shortens `s` to at most `max_chars` characters, preferring to end on a word.
///
/// * If `s` has `max_chars` characters or fewer, it is returned unchanged.
/// * Otherwise the text is cut after `max_chars` characters. If the cut lands
///   exactly at the end of a word (the next character is whitespace), the prefix
///   is kept, minus any trailing whitespace. If it lands inside a word, the
///   partial word is dropped by cutting back to the last space.
/// * If the prefix contains no space at all, the hard cut is returned.
///
/// The limit counts Unicode scalar values, not bytes, so multi-byte text is not
/// cut short of the limit and a code point is never split.
fn truncate(s: &str, max_chars: usize) -> String {
    // Byte offset of the first character beyond the limit; `None` means `s` already fits.
    let (cut, next_char) = match s.char_indices().nth(max_chars) {
        Some(boundary) => boundary,
        None => return s.to_string(),
    };
    let head = &s[..cut];

    // The cut fell between words: nothing partial to drop.
    if next_char.is_whitespace() {
        return head.trim_end().to_string();
    }

    match head.rfind(' ') {
        Some(last_space) => head[..last_space].to_string(),
        None => head.to_string(),
    }
}
406
/// Rough token estimate: the number of whitespace-separated words in `s`.
fn estimate_tokens(s: &str) -> usize {
    s.split_whitespace().fold(0, |words, _| words + 1)
}
410
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use chrono::Utc;
    use nexus_core::MemoryLanePriorityType;
    use nexus_llm::GenerateResponse;
    use nexus_storage::repository::NamespaceRepository;
    use sqlx::sqlite::SqlitePoolOptions;

    /// LLM stub that replays a fixed queue of responses, one per `generate` call.
    struct MockLlmClient {
        responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
    }

    impl MockLlmClient {
        /// Queues `responses` in the order they should be returned.
        fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
            }
        }
    }

    #[async_trait]
    impl LlmClient for MockLlmClient {
        /// Pops the next queued response; panics if the queue is empty, so a test
        /// that queues nothing also asserts the LLM is never called.
        async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
            self.responses
                .lock()
                .expect("mock responses poisoned")
                .pop_front()
                .expect("mock response missing")
        }

        fn provider_name(&self) -> String {
            "mock".to_string()
        }

        fn model_name(&self) -> String {
            "mock-model".to_string()
        }
    }

    /// Creates an in-memory SQLite database, runs the migrations and returns the
    /// pool, a memory repository and the id of a fresh test namespace.
    async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();
        let namespace_repo = NamespaceRepository::new(pool.clone());
        let namespace = namespace_repo
            .get_or_create("digest-test", "digest-test")
            .await
            .unwrap();
        let repo = MemoryRepository::new(pool.clone());
        (pool, repo, namespace.id)
    }

    /// Metadata JSON tagging a memory with `session_key` and cognitive `level`.
    fn session_metadata(session_key: &str, level: CognitiveLevel) -> serde_json::Value {
        let cognitive = CognitiveMetadata::new(
            level,
            "claude-code",
            "claude-code",
            Some(session_key.to_string()),
            "derive_service",
        );
        cognitive.merge_into(&serde_json::json!({}))
    }

    /// Stores one session memory with the given content, session key and level.
    async fn store_session_memory(
        repo: &MemoryRepository,
        namespace_id: i64,
        content: &str,
        session_key: &str,
        level: CognitiveLevel,
    ) -> Memory {
        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Session,
            memory_lane_type: Some(&MemoryLaneType::Priority(MemoryLanePriorityType::Decision)),
            labels: &["test".to_string()],
            metadata: &session_metadata(session_key, level),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap()
    }

    /// A well-formed LLM reply with non-empty `short` and `long` fields.
    fn good_digest_response() -> GenerateResponse {
        GenerateResponse {
            content: r#"{"short":"Fixed query pagination and added working-set retrieval.","long":"Session focused on fixing query pagination behavior in the memory system. The team tightened the ranking logic and added bounded working-set retrieval. New digest infrastructure was introduced to track session summaries."}"#
                .to_string(),
            model: "mock-model".to_string(),
            usage: None,
        }
    }

    /// A session with no memories at all yields the "No non-raw memories" error.
    #[tokio::test]
    async fn test_digest_session_errors_on_empty_session() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "empty-session", &repo, false)
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("No non-raw memories"));
    }

    /// Happy path: two source memories produce one short and one long digest
    /// memory with the right cognitive levels, plus one digest row per kind.
    #[tokio::test]
    async fn test_digest_session_creates_short_and_long() {
        let (pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-1",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-1",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-1", &repo, false)
            .await
            .unwrap();

        assert_eq!(result.source_count, 2);
        assert!(result.short_id > 0);
        assert!(result.long_id > 0);

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&short.metadata),
            CognitiveLevel::SummaryShort
        );
        assert!(short.content.contains("pagination"));

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&long.metadata),
            CognitiveLevel::SummaryLong
        );
        assert!(long.content.contains("digest"));

        // Exactly one digest row of each kind was written.
        let short_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
                .bind(DIGEST_KIND_SHORT)
                .fetch_one(&pool)
                .await
                .unwrap();
        let long_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE digest_kind = ?")
                .bind(DIGEST_KIND_LONG)
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(short_digests, 1);
        assert_eq!(long_digests, 1);
    }

    /// An unparseable LLM reply still yields digests, built from the memory contents.
    #[tokio::test]
    async fn test_digest_session_falls_back_on_llm_failure() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Implemented digest service with fallback behavior.",
            "session-2",
            CognitiveLevel::Explicit,
        )
        .await;

        let bad_response = GenerateResponse {
            content: "not valid json at all".to_string(),
            model: "mock-model".to_string(),
            usage: None,
        };
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(bad_response)])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-2", &repo, false)
            .await
            .unwrap();

        assert_eq!(result.source_count, 1);

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert_eq!(
            cognitive_level_from_metadata(&short.metadata),
            CognitiveLevel::SummaryShort
        );
        assert!(short.content.len() <= SHORT_MAX_CHARS);

        // The fallback long digest is assembled from the source memory text.
        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(long.content.contains("digest service"));
    }

    /// A second non-forced call returns the same ids. Only one LLM response is
    /// queued, so a second LLM call would panic in the mock.
    #[tokio::test]
    async fn test_digest_session_is_idempotent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Refactored the memory consolidation pipeline.",
            "session-3",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let first = service
            .digest_session(namespace_id, "session-3", &repo, false)
            .await
            .unwrap();
        let second = service
            .digest_session(namespace_id, "session-3", &repo, false)
            .await
            .unwrap();

        assert_eq!(first.short_id, second.short_id);
        assert_eq!(first.long_id, second.long_id);
    }

    /// `force = true` creates new digest memories even though digests already exist.
    #[tokio::test]
    async fn test_digest_session_force_regenerates() {
        let (pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added token counting to digest service.",
            "session-4",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![
                Ok(good_digest_response()),
                Ok(good_digest_response()),
            ])),
            None,
        );

        let first = service
            .digest_session(namespace_id, "session-4", &repo, false)
            .await
            .unwrap();
        let forced = service
            .digest_session(namespace_id, "session-4", &repo, true)
            .await
            .unwrap();

        // The forced run must have produced different digest memories.
        assert_ne!(first.short_id, forced.short_id);
        assert_ne!(first.long_id, forced.long_id);

        // The session still has exactly two digest rows after the forced run
        // (presumably `store_digest` replaces the row per session and kind; confirm
        // against the repository implementation).
        let total_digests: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE session_key = ?")
                .bind("session-4")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(total_digests, 2);

        // The latest digests now point at the forced run's memories.
        let latest_short = repo
            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_SHORT)
            .await
            .unwrap()
            .unwrap();
        let latest_long = repo
            .latest_digest_for_session(namespace_id, "session-4", DIGEST_KIND_LONG)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_short.id, forced.short_id);
        assert_eq!(latest_long.id, forced.long_id);
    }

    /// A session holding only `Raw` memories has nothing to digest and errors.
    #[tokio::test]
    async fn test_digest_session_ignores_raw_memories() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Raw noise from hook capture",
            "session-5",
            CognitiveLevel::Raw,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-5", &repo, false)
            .await;
        assert!(result.is_err());
    }

    /// Memories stored under a different session key are not picked up.
    #[tokio::test]
    async fn test_digest_session_ignores_other_sessions() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Memory from a different session entirely.",
            "other-session",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(Vec::new())),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-6", &repo, false)
            .await;
        assert!(result.is_err());
    }

    /// Both digest memories carry lineage to every source memory, recorded under
    /// the `DIGESTED_FROM_ROLE` evidence role.
    #[tokio::test]
    async fn test_digest_session_creates_evidence_lineage() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        let m1 = store_session_memory(
            &repo,
            namespace_id,
            "Source memory one.",
            "session-7",
            CognitiveLevel::Explicit,
        )
        .await;
        let m2 = store_session_memory(
            &repo,
            namespace_id,
            "Source memory two.",
            "session-7",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-7", &repo, false)
            .await
            .unwrap();

        let short_lineage = repo.load_lineage(result.short_id).await.unwrap();
        assert_eq!(short_lineage.len(), 2);
        let short_sources: Vec<i64> = short_lineage.iter().map(|e| e.source_memory_id).collect();
        assert!(short_sources.contains(&m1.id));
        assert!(short_sources.contains(&m2.id));
        assert_eq!(short_lineage[0].evidence_role, DIGESTED_FROM_ROLE);

        let long_lineage = repo.load_lineage(result.long_id).await.unwrap();
        assert_eq!(long_lineage.len(), 2);
    }

    /// Text shorter than the limit is returned unchanged.
    #[test]
    fn test_truncate_within_limit() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    /// Text exactly at the limit is kept; text without spaces is hard-cut to the limit.
    #[test]
    fn test_truncate_at_boundary() {
        let input = "a".repeat(50);
        assert_eq!(truncate(&input, 50), input);
        assert_eq!(truncate(&input, 30).len(), 30);
    }

    /// Truncated text stays within the limit and does not end on a space.
    #[test]
    fn test_truncate_splits_at_word() {
        let input = "the quick brown fox jumps over the lazy dog";
        let result = truncate(input, 20);
        assert!(result.len() <= 20);
        assert!(!result.ends_with(' ') || result.is_empty());
    }

    /// Token estimation counts whitespace-separated words.
    #[test]
    fn test_estimate_tokens() {
        assert_eq!(estimate_tokens("hello world"), 2);
        assert_eq!(estimate_tokens(""), 0);
        assert_eq!(estimate_tokens(" spaced out "), 2);
    }

    /// Builds a `Memory` value directly, without touching the database, for the
    /// pure fallback tests below.
    fn test_memory(id: i64, content: &str) -> Memory {
        Memory {
            id,
            namespace_id: 1,
            content: content.to_string(),
            category: MemoryCategory::Session,
            memory_lane_type: None,
            labels: vec![],
            metadata: serde_json::json!({}),
            similarity_score: None,
            relevance_score: None,
            content_embedding: None,
            embedding_model: None,
            created_at: Utc::now(),
            updated_at: None,
            last_accessed: None,
            is_active: true,
            is_archived: false,
            access_count: 0,
        }
    }

    /// With an embedding service configured, both digest memories are stored with
    /// a 384-dimensional embedding from the mock service.
    #[tokio::test]
    async fn test_digest_memories_get_embeddings_when_service_provided() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-embed",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-embed",
            CognitiveLevel::Explicit,
        )
        .await;

        let mock_embed = nexus_embeddings::MockEmbeddingService::new();
        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            Some(Arc::new(mock_embed)),
        );

        let result = service
            .digest_session(namespace_id, "session-embed", &repo, false)
            .await
            .unwrap();

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert!(
            short.content_embedding.is_some(),
            "short digest should have an embedding when service is provided"
        );
        assert_eq!(
            short.content_embedding.as_ref().unwrap().len(),
            384,
            "short digest embedding dimension should be 384"
        );

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(
            long.content_embedding.is_some(),
            "long digest should have an embedding when service is provided"
        );
        assert_eq!(
            long.content_embedding.as_ref().unwrap().len(),
            384,
            "long digest embedding dimension should be 384"
        );
    }

    /// Without an embedding service, digest memories are stored with no embedding.
    #[tokio::test]
    async fn test_digest_memories_stored_without_embedding_when_service_absent() {
        let (_pool, repo, namespace_id) = setup_repo().await;
        store_session_memory(
            &repo,
            namespace_id,
            "Fixed query pagination behavior.",
            "session-no-embed",
            CognitiveLevel::Explicit,
        )
        .await;
        store_session_memory(
            &repo,
            namespace_id,
            "Added working-set retrieval primitives.",
            "session-no-embed",
            CognitiveLevel::Explicit,
        )
        .await;

        let service = DigestService::new(
            AgentConfig::default(),
            Arc::new(MockLlmClient::new(vec![Ok(good_digest_response())])),
            None,
        );

        let result = service
            .digest_session(namespace_id, "session-no-embed", &repo, false)
            .await
            .unwrap();

        let short = repo.get_by_id(result.short_id).await.unwrap().unwrap();
        assert!(
            short.content_embedding.is_none(),
            "short digest should NOT have embedding when no service provided"
        );

        let long = repo.get_by_id(result.long_id).await.unwrap().unwrap();
        assert!(
            long.content_embedding.is_none(),
            "long digest should NOT have embedding when no service provided"
        );
    }

    /// The short fallback contains text from each of the (here: two) leading memories.
    #[test]
    fn test_fallback_short_uses_first_five_memories() {
        let memories = vec![
            test_memory(1, "First memory content here."),
            test_memory(2, "Second memory."),
        ];
        let result = fallback_short(&memories);
        assert!(result.contains("First memory"));
        assert!(result.contains("Second memory"));
    }

    /// The long fallback contains text from every memory.
    #[test]
    fn test_fallback_long_concatenates_all_memories() {
        let memories = vec![
            test_memory(1, "Alpha."),
            test_memory(2, "Beta."),
            test_memory(3, "Gamma."),
        ];
        let result = fallback_long(&memories);
        assert!(result.contains("Alpha"));
        assert!(result.contains("Beta"));
        assert!(result.contains("Gamma"));
    }
}