1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Instant;
6
7use nexus_core::{
8 config::Config, traits::EmbeddingService, CognitiveLevel, Memory, WorkingRepresentation,
9 WorkingRepresentationRequest,
10};
11use nexus_storage::repository::{MemoryRepository, SemanticCandidateParams};
12use nexus_vectors::{SearchOptions, SemanticSearch, VectorEntry};
13use tokio::sync::OnceCell;
14use tracing::{debug, warn};
15
16use crate::error::AgentError;
17use crate::ranking::flatten_ranked_representation;
18use crate::util::{flush_metric_samples, stage_metric_sample, CognitionSnapshot};
19
/// Factor applied to the semantic overfetch limit when requesting vector
/// candidates from storage, before the similarity search narrows them down.
const SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER: usize = 8;
/// Threshold passed to the vector search options (`with_threshold`) when
/// ranking candidates for the semantic bucket.
const SEMANTIC_VECTOR_THRESHOLD: f32 = 0.58;
22
/// Where the embedding service used for semantic retrieval comes from.
enum EmbedderProvider {
    /// No embedder is available; resolving yields `None`.
    Disabled,
    /// A caller-supplied embedder that is handed out as-is.
    Static(Arc<dyn EmbeddingService>),
    /// An embedder created on first use from environment configuration. The
    /// cell stores the outcome of that attempt, including `None` when the
    /// configuration or the service could not be set up.
    Auto(Arc<OnceCell<Option<Arc<dyn EmbeddingService>>>>),
}
28
/// Builds working representations (digests, recent, semantic, derived and
/// contradiction buckets) from a memory repository.
pub struct RepresentationService {
    /// Source of the embedding service used by the semantic bucket.
    embedder: EmbedderProvider,
}
32
33impl EmbedderProvider {
34 async fn resolve(&self) -> Option<Arc<dyn EmbeddingService>> {
35 match self {
36 Self::Disabled => None,
37 Self::Static(embedder) => Some(embedder.clone()),
38 Self::Auto(cell) => {
39 let embedder = cell
40 .get_or_init(|| async {
41 let config = match Config::from_env() {
42 Ok(config) => config,
43 Err(error) => {
44 warn!(
45 error = %error,
46 "Failed to load Nexus config for semantic embeddings; semantic retrieval will fall back to text"
47 );
48 return None;
49 }
50 };
51 match nexus_embeddings::create_service(&config).await {
52 Ok(Some(service)) => Some(service),
53 Ok(None) => None,
54 Err(error) => {
55 warn!(
56 error = %error,
57 "Failed to initialize embedding service; semantic retrieval will fall back to text. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
58 );
59 None
60 }
61 }
62 })
63 .await;
64 embedder.clone()
65 }
66 }
67 }
68}
69
/// Maximum number of items each bucket of a working representation may hold.
#[derive(Debug, Clone, Copy)]
struct BucketLimits {
    /// Cap for session/namespace digests.
    digests: i64,
    /// Cap for contradiction memories.
    contradictions: i64,
    /// Cap for derived memories.
    derived: i64,
    /// Cap for semantic (query-matched) memories.
    semantic: i64,
    /// Cap for recent memories; always at least 1.
    recent: i64,
}

/// Splits a total budget of `max_items` across the representation buckets.
///
/// A budget of 0 is treated as 1. Digests and contradictions get small fixed
/// shares (at most 2 each), derived gets roughly a quarter, semantic roughly
/// three tenths (a third for budgets below 16), and whatever is left goes to
/// recent. The bucket caps always sum to the effective budget, and recent is
/// guaranteed at least one slot, taken from another bucket if necessary.
///
/// The arithmetic never overflows, even for `usize::MAX`.
fn bucket_limits(max_items: usize) -> BucketLimits {
    let max_items = max_items.max(1);
    let mut digests = max_items.min(2);
    let mut contradictions = (max_items / 10).min(2);
    let remaining_after_fixed = max_items.saturating_sub(digests + contradictions);

    // Larger budgets guarantee derived/semantic a floor of 4, small ones a floor of 1.
    let is_large_budget = max_items >= 16;

    let derived_floor = if is_large_budget { 4 } else { 1 };
    let mut derived_target = remaining_after_fixed.min((max_items / 4).max(derived_floor));
    let remaining_after_derived = remaining_after_fixed.saturating_sub(derived_target);

    let semantic_share = if is_large_budget {
        // floor(max_items * 3 / 10), computed without the overflow-prone
        // `max_items * 3` intermediate.
        let three_tenths = (max_items / 10) * 3 + (max_items % 10) * 3 / 10;
        three_tenths.max(4)
    } else {
        (max_items / 3).max(1)
    };
    let mut semantic_target = remaining_after_derived.min(semantic_share);

    let mut recent =
        max_items.saturating_sub(digests + contradictions + derived_target + semantic_target);

    // Recent must never be starved: steal one slot from the first non-empty
    // bucket, in order semantic -> derived -> digests -> contradictions.
    if recent == 0 {
        if semantic_target > 0 {
            semantic_target -= 1;
        } else if derived_target > 0 {
            derived_target -= 1;
        } else if digests > 0 {
            digests -= 1;
        } else if contradictions > 0 {
            contradictions -= 1;
        }
        recent = 1;
    }

    // Clamp instead of wrapping should a share ever exceed `i64::MAX`.
    let to_limit = |count: usize| i64::try_from(count).unwrap_or(i64::MAX);

    BucketLimits {
        digests: to_limit(digests),
        contradictions: to_limit(contradictions),
        derived: to_limit(derived_target),
        semantic: to_limit(semantic_target),
        recent: to_limit(recent),
    }
}
121
/// Returns how many rows to request from storage for a bucket so that enough
/// candidates survive post-fetch filtering.
///
/// The result is the larger of `max_items` and three times `bucket_limit`
/// (with `bucket_limit` treated as at least 1). Values that do not fit are
/// clamped to `i64::MAX` rather than wrapping to a negative limit.
fn overfetch_limit(bucket_limit: i64, max_items: usize) -> i64 {
    let tripled = bucket_limit.max(1).saturating_mul(3);
    let max_items = i64::try_from(max_items).unwrap_or(i64::MAX);
    max_items.max(tripled)
}
126
127fn include_recent_memory(memory: &Memory, include_raw: bool) -> bool {
128 let snapshot = CognitionSnapshot::from_memory(memory);
129 match snapshot.level {
130 CognitiveLevel::Raw => include_raw,
131 CognitiveLevel::Explicit => snapshot.confidence_meets_threshold(),
132 CognitiveLevel::Derived
133 | CognitiveLevel::Contradiction
134 | CognitiveLevel::SummaryShort
135 | CognitiveLevel::SummaryLong => false,
136 }
137}
138
139fn include_semantic_memory(memory: &Memory, include_raw: bool) -> bool {
140 let snapshot = CognitionSnapshot::from_memory(memory);
141 match snapshot.level {
142 CognitiveLevel::Raw => include_raw,
143 CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong => false,
144 CognitiveLevel::Explicit | CognitiveLevel::Derived | CognitiveLevel::Contradiction => {
145 snapshot.confidence_meets_threshold()
146 }
147 }
148}
149
150fn include_derived_memory(memory: &Memory) -> bool {
151 let snapshot = CognitionSnapshot::from_memory(memory);
152 snapshot.level == CognitiveLevel::Derived && snapshot.confidence_meets_threshold()
153}
154
155fn include_contradiction_memory(memory: &Memory) -> bool {
156 let snapshot = CognitionSnapshot::from_memory(memory);
157 snapshot.level == CognitiveLevel::Contradiction && snapshot.confidence_meets_threshold()
158}
159
160impl RepresentationService {
161 pub fn new() -> Self {
162 let auto_embedder = Config::from_env()
163 .map(|config| config.embedding.enabled)
164 .unwrap_or(false);
165
166 let embedder = if auto_embedder {
167 EmbedderProvider::Auto(Arc::new(OnceCell::new()))
168 } else {
169 EmbedderProvider::Disabled
170 };
171
172 Self { embedder }
173 }
174
175 pub fn without_embedder() -> Self {
176 Self {
177 embedder: EmbedderProvider::Disabled,
178 }
179 }
180
181 pub fn with_embedder(embedder: Arc<dyn EmbeddingService>) -> Self {
182 Self {
183 embedder: EmbedderProvider::Static(embedder),
184 }
185 }
186
187 async fn build_semantic_bucket(
188 &self,
189 request: &WorkingRepresentationRequest,
190 repo: &MemoryRepository,
191 limit: i64,
192 ) -> Result<Vec<Memory>, AgentError> {
193 let Some(query) = request.query.as_deref() else {
194 return Ok(Vec::new());
195 };
196
197 let semantic_limit = limit.max(1) as usize;
198 let vector_fetch_limit = (overfetch_limit(limit, request.max_items) as usize
199 * SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER) as i64;
200
201 let mut ranked = Vec::new();
202 let mut seen = HashSet::new();
203
204 if let Some(embedder) = self.embedder.resolve().await {
205 match embedder.embed(query).await {
206 Ok(query_embedding) => {
207 let candidates = repo
208 .get_semantic_candidates(SemanticCandidateParams {
209 namespace_id: request.namespace_id,
210 perspective: request.perspective.as_ref(),
211 limit: vector_fetch_limit,
212 include_raw: request.include_raw,
213 })
214 .await
215 .map_err(storage_err)?;
216
217 let mut by_id = HashMap::new();
218 let vectors: Vec<VectorEntry> = candidates
219 .into_iter()
220 .filter(|memory| include_semantic_memory(memory, request.include_raw))
221 .filter_map(|memory| {
222 let embedding = memory.content_embedding.clone()?;
223 by_id.insert(memory.id, memory.clone());
224 Some(
225 VectorEntry::new(
226 memory.id,
227 embedding,
228 memory.category.to_string(),
229 memory.namespace_id,
230 )
231 .with_memory_lane_type(
232 memory
233 .memory_lane_type
234 .as_ref()
235 .map(|lane| lane.to_string())
236 .unwrap_or_else(|| "none".to_string()),
237 ),
238 )
239 })
240 .collect();
241
242 if !vectors.is_empty() {
243 let search = SemanticSearch::new();
244 let options = SearchOptions::with_limit(semantic_limit)
245 .with_namespace(request.namespace_id)
246 .with_threshold(SEMANTIC_VECTOR_THRESHOLD);
247
248 match search.search(&query_embedding, &vectors, &options) {
249 Ok((results, _latency)) => {
250 for result in results {
251 if let Some(memory) = by_id.remove(&result.id) {
252 seen.insert(memory.id);
253 ranked.push(memory);
254 }
255 }
256 }
257 Err(error) => {
258 warn!(error = %error, "Vector semantic search failed; falling back to text search");
259 }
260 }
261 }
262 }
263 Err(error) => {
264 warn!(error = %error, "Query embedding failed; falling back to text search");
265 }
266 }
267 }
268
269 if ranked.len() < semantic_limit {
270 let top_up_limit = overfetch_limit(limit, request.max_items) as i32;
271 let fallback = repo
272 .search_by_text_memories(
273 request.namespace_id,
274 query,
275 top_up_limit,
276 request.include_raw,
277 )
278 .await
279 .map_err(storage_err)?;
280
281 for memory in fallback {
282 if include_semantic_memory(&memory, request.include_raw) && seen.insert(memory.id) {
283 ranked.push(memory);
284 if ranked.len() >= semantic_limit {
285 break;
286 }
287 }
288 }
289 }
290
291 ranked.truncate(semantic_limit);
292 Ok(ranked)
293 }
294
295 pub async fn build(
296 &self,
297 request: &WorkingRepresentationRequest,
298 repo: &MemoryRepository,
299 ) -> Result<WorkingRepresentation, AgentError> {
300 let total_started = Instant::now();
301 let limits = bucket_limits(request.max_items);
302 let mut representation = WorkingRepresentation::default();
303 let mut metrics = Vec::new();
304
305 if request.include_digests && limits.digests > 0 {
306 let started = Instant::now();
307 if let Some(perspective) = request.perspective.as_ref() {
308 if let Some(session_key) = perspective.session_key.as_deref() {
309 if let Some(short) = repo
310 .latest_digest_for_session(request.namespace_id, session_key, "short")
311 .await
312 .map_err(storage_err)?
313 {
314 representation.digests.push(short);
315 }
316 if (representation.digests.len() as i64) < limits.digests {
317 if let Some(long) = repo
318 .latest_digest_for_session(request.namespace_id, session_key, "long")
319 .await
320 .map_err(storage_err)?
321 {
322 representation.digests.push(long);
323 }
324 }
325 }
326 }
327 metrics.push(stage_metric_sample(
328 request.namespace_id,
329 "cognition.representation.digests_ms",
330 started.elapsed().as_secs_f64() * 1000.0,
331 "digests",
332 ));
333 }
334
335 if request.include_digests
338 && !request.cross_namespace_ids.is_empty()
339 && (representation.digests.len() as i64) < limits.digests
340 {
341 let started = Instant::now();
342 for &cross_ns_id in &request.cross_namespace_ids {
343 if cross_ns_id == request.namespace_id {
344 continue;
345 }
346 if representation.digests.len() as i64 >= limits.digests {
347 break;
348 }
349 let cross_digest = if let Some(session_key) = request
350 .perspective
351 .as_ref()
352 .and_then(|p| p.session_key.as_deref())
353 {
354 repo.latest_digest_for_session(cross_ns_id, session_key, "short")
355 .await
356 .ok()
357 .flatten()
358 } else {
359 repo.latest_digest_for_namespace(cross_ns_id, "short")
360 .await
361 .ok()
362 .flatten()
363 };
364
365 if let Some(digest) = cross_digest {
366 representation.digests.push(digest);
367 }
368 }
370 debug!(
371 cross_namespaces = request.cross_namespace_ids.len(),
372 cross_digests_added = representation.digests.len(),
373 "Cross-namespace digest fetch complete"
374 );
375 metrics.push(stage_metric_sample(
376 request.namespace_id,
377 "cognition.representation.cross_namespace_ms",
378 started.elapsed().as_secs_f64() * 1000.0,
379 "cross_namespace",
380 ));
381 }
382
383 if request.include_recent {
384 let started = Instant::now();
385 let fetch_limit = overfetch_limit(limits.recent, request.max_items);
386 representation.recent = if let Some(perspective) = request.perspective.as_ref() {
387 repo.get_recent_by_perspective_opts(
388 request.namespace_id,
389 perspective,
390 fetch_limit,
391 request.include_raw,
392 )
393 .await
394 .map_err(storage_err)?
395 } else {
396 repo.list_filtered(
397 request.namespace_id,
398 nexus_storage::repository::ListMemoryFilters {
399 category: None,
400 since: None,
401 until: None,
402 content_like: None,
403 include_raw: request.include_raw,
404 limit: fetch_limit,
405 offset: 0,
406 },
407 )
408 .await
409 .map_err(storage_err)?
410 };
411 representation
412 .recent
413 .retain(|memory| include_recent_memory(memory, request.include_raw));
414 representation
415 .recent
416 .truncate(limits.recent.max(0) as usize);
417 metrics.push(stage_metric_sample(
418 request.namespace_id,
419 "cognition.representation.recent_ms",
420 started.elapsed().as_secs_f64() * 1000.0,
421 "recent",
422 ));
423 }
424
425 if request.include_semantic && limits.semantic > 0 {
426 let started = Instant::now();
427 representation.semantic = self
428 .build_semantic_bucket(request, repo, limits.semantic)
429 .await?;
430 metrics.push(stage_metric_sample(
431 request.namespace_id,
432 "cognition.representation.semantic_ms",
433 started.elapsed().as_secs_f64() * 1000.0,
434 "semantic",
435 ));
436 }
437
438 if let Some(perspective) = request.perspective.as_ref() {
439 if request.include_derived && limits.derived > 0 {
440 let started = Instant::now();
441 let fetch_limit = overfetch_limit(limits.derived, request.max_items);
442 representation.derived = repo
443 .get_most_reinforced_by_perspective_opts(
444 request.namespace_id,
445 perspective,
446 fetch_limit,
447 request.include_raw,
448 )
449 .await
450 .map_err(storage_err)?
451 .into_iter()
452 .filter(include_derived_memory)
453 .collect();
454 representation.derived.truncate(limits.derived as usize);
455 metrics.push(stage_metric_sample(
456 request.namespace_id,
457 "cognition.representation.derived_ms",
458 started.elapsed().as_secs_f64() * 1000.0,
459 "derived",
460 ));
461 }
462
463 if request.include_contradictions && limits.contradictions > 0 {
464 let started = Instant::now();
465 let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
466 representation.contradictions = repo
467 .get_contradictions_by_perspective_opts(
468 request.namespace_id,
469 perspective,
470 fetch_limit,
471 request.include_raw,
472 )
473 .await
474 .map_err(storage_err)?
475 .into_iter()
476 .filter(include_contradiction_memory)
477 .collect();
478 representation
479 .contradictions
480 .truncate(limits.contradictions as usize);
481 metrics.push(stage_metric_sample(
482 request.namespace_id,
483 "cognition.representation.contradictions_ms",
484 started.elapsed().as_secs_f64() * 1000.0,
485 "contradictions",
486 ));
487 }
488 } else {
489 if request.include_derived && limits.derived > 0 {
490 let started = Instant::now();
491 let fetch_limit = overfetch_limit(limits.derived, request.max_items);
492 representation.derived = repo
493 .get_most_reinforced_by_namespace(
494 request.namespace_id,
495 fetch_limit,
496 request.include_raw,
497 )
498 .await
499 .map_err(storage_err)?
500 .into_iter()
501 .filter(include_derived_memory)
502 .collect();
503 representation.derived.truncate(limits.derived as usize);
504 metrics.push(stage_metric_sample(
505 request.namespace_id,
506 "cognition.representation.derived_ms",
507 started.elapsed().as_secs_f64() * 1000.0,
508 "derived",
509 ));
510 }
511
512 if request.include_contradictions && limits.contradictions > 0 {
513 let started = Instant::now();
514 let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
515 representation.contradictions = repo
516 .get_contradictions_by_namespace(
517 request.namespace_id,
518 fetch_limit,
519 request.include_raw,
520 )
521 .await
522 .map_err(storage_err)?
523 .into_iter()
524 .filter(include_contradiction_memory)
525 .collect();
526 representation
527 .contradictions
528 .truncate(limits.contradictions as usize);
529 metrics.push(stage_metric_sample(
530 request.namespace_id,
531 "cognition.representation.contradictions_ms",
532 started.elapsed().as_secs_f64() * 1000.0,
533 "contradictions",
534 ));
535 }
536 }
537
538 metrics.push(stage_metric_sample(
539 request.namespace_id,
540 "cognition.representation.total_ms",
541 total_started.elapsed().as_secs_f64() * 1000.0,
542 "total",
543 ));
544 flush_metric_samples(repo, &metrics).await;
545
546 Ok(representation)
547 }
548
549 pub async fn flat_working_set(
550 &self,
551 request: &WorkingRepresentationRequest,
552 repo: &MemoryRepository,
553 ) -> Result<Vec<Memory>, AgentError> {
554 let representation = self.build(request, repo).await?;
555 Ok(flatten_ranked_representation(representation, request)
556 .into_iter()
557 .map(|bucketed| bucketed.memory)
558 .collect())
559 }
560}
561
562fn storage_err(error: nexus_core::NexusError) -> AgentError {
563 AgentError::Storage(error.to_string())
564}
565
566impl Default for RepresentationService {
567 fn default() -> Self {
568 Self::new()
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;

    use nexus_core::traits::EmbeddingService;
    use nexus_core::{
        cognitive_level_from_metadata, CognitiveMetadata, MemoryCategory, PerspectiveKey,
    };
    use nexus_embeddings::MockEmbeddingService;
    use nexus_storage::repository::{
        MemoryRepository, NamespaceRepository, StoreDigestParams, StoreMemoryParams,
    };
    use sqlx::sqlite::SqlitePoolOptions;

    /// Creates an in-memory SQLite repository with migrations applied, plus a
    /// test namespace id and a perspective bound to session `session-1`.
    async fn setup_repo() -> (MemoryRepository, i64, PerspectiveKey) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();
        let namespace_repo = NamespaceRepository::new(pool.clone());
        let namespace = namespace_repo
            .get_or_create("representation-test", "representation-test")
            .await
            .unwrap();
        let perspective =
            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
        (MemoryRepository::new(pool), namespace.id, perspective)
    }

    /// Builds memory metadata carrying cognitive information for `level` and
    /// `perspective`, with confidence set to 0.9.
    fn metadata(level: CognitiveLevel, perspective: &PerspectiveKey) -> serde_json::Value {
        let mut cognitive = CognitiveMetadata::new(
            level,
            perspective.observer.clone(),
            perspective.subject.clone(),
            perspective.session_key.clone(),
            "test",
        );
        cognitive.confidence = Some(0.9);
        cognitive.merge_into(&serde_json::json!({}))
    }

    /// Stores a `Facts` memory without an embedding at the given cognitive level.
    async fn store_memory(
        repo: &MemoryRepository,
        namespace_id: i64,
        content: &str,
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
    ) -> Memory {
        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &metadata(level, perspective),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap()
    }

    /// Stores a `Facts` memory with the supplied embedding, tagged with the
    /// `mock-embedding` model name.
    async fn store_memory_with_embedding(
        repo: &MemoryRepository,
        namespace_id: i64,
        content: &str,
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
        embedding: &[f32],
    ) -> Memory {
        repo.store(StoreMemoryParams {
            namespace_id,
            content,
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &metadata(level, perspective),
            embedding: Some(embedding),
            embedding_model: Some("mock-embedding"),
        })
        .await
        .unwrap()
    }

    /// With every bucket enabled, each stored memory is expected to land in
    /// the bucket matching its cognitive level.
    #[tokio::test]
    async fn test_build_representation_groups_buckets() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        // Register a short digest for the session so the digest bucket has a hit.
        let digest = store_memory(
            &repo,
            namespace_id,
            "short digest",
            CognitiveLevel::SummaryShort,
            &perspective,
        )
        .await;
        repo.store_digest(StoreDigestParams {
            namespace_id,
            session_key: "session-1",
            digest_kind: "short",
            memory_id: digest.id,
            start_memory_id: Some(1),
            end_memory_id: Some(2),
            token_count: 32,
        })
        .await
        .unwrap();

        store_memory(
            &repo,
            namespace_id,
            "recent explicit observation",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;
        store_memory(
            &repo,
            namespace_id,
            "derived insight",
            CognitiveLevel::Derived,
            &perspective,
        )
        .await;
        store_memory(
            &repo,
            namespace_id,
            "contradiction note",
            CognitiveLevel::Contradiction,
            &perspective,
        )
        .await;

        let service = RepresentationService::new();
        let representation = service
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective),
                    query: Some("recent".to_string()),
                    max_items: 12,
                    include_raw: false,
                    include_recent: true,
                    include_semantic: true,
                    include_derived: true,
                    include_digests: true,
                    include_contradictions: true,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.digests.len(), 1);
        assert_eq!(representation.derived.len(), 1);
        assert_eq!(representation.contradictions.len(), 1);
        assert!(!representation.recent.is_empty());
        assert!(!representation.semantic.is_empty());
    }

    /// The flattened working set is non-empty and respects `max_items`.
    #[tokio::test]
    async fn test_flat_working_set_uses_storage_primitive() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        store_memory(
            &repo,
            namespace_id,
            "derived insight",
            CognitiveLevel::Derived,
            &perspective,
        )
        .await;
        store_memory(
            &repo,
            namespace_id,
            "recent explicit observation",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;

        let service = RepresentationService::new();
        let flat = service
            .flat_working_set(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective),
                    max_items: 4,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert!(!flat.is_empty());
        assert!(flat.len() <= 4);
    }

    /// Without a perspective and with `include_raw: false`, raw-level memories
    /// must not appear in the recent bucket.
    #[tokio::test]
    async fn test_build_representation_without_perspective_excludes_raw_noise() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        store_memory(
            &repo,
            namespace_id,
            "recent explicit observation",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;

        // A raw-level memory that the recent bucket is expected to drop.
        repo.store(StoreMemoryParams {
            namespace_id,
            content: "raw hook payload",
            category: &MemoryCategory::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-1",
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let representation = RepresentationService::new()
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: None,
                    query: None,
                    max_items: 10,
                    include_raw: false,
                    include_recent: true,
                    include_semantic: false,
                    include_derived: false,
                    include_digests: false,
                    include_contradictions: false,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.recent.len(), 1);
        assert_eq!(
            representation.recent[0].content,
            "recent explicit observation"
        );
    }

    /// Without a perspective, derived and contradiction buckets are still
    /// filled from namespace-wide queries.
    #[tokio::test]
    async fn test_build_representation_without_perspective_includes_cognition_outputs() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        store_memory(
            &repo,
            namespace_id,
            "derived insight",
            CognitiveLevel::Derived,
            &perspective,
        )
        .await;
        store_memory(
            &repo,
            namespace_id,
            "contradiction note",
            CognitiveLevel::Contradiction,
            &perspective,
        )
        .await;

        let representation = RepresentationService::new()
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: None,
                    query: None,
                    max_items: 10,
                    include_raw: false,
                    include_recent: false,
                    include_semantic: false,
                    include_derived: true,
                    include_digests: false,
                    include_contradictions: true,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.derived.len(), 1);
        assert_eq!(representation.derived[0].content, "derived insight");
        assert_eq!(representation.contradictions.len(), 1);
        assert_eq!(
            representation.contradictions[0].content,
            "contradiction note"
        );
    }

    /// Without a perspective and with `include_raw: true`, the raw-level
    /// memory is expected alongside the explicit one in the recent bucket.
    #[tokio::test]
    async fn test_build_representation_without_perspective_can_include_raw_noise() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        store_memory(
            &repo,
            namespace_id,
            "recent explicit observation",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;

        repo.store(StoreMemoryParams {
            namespace_id,
            content: "raw hook payload",
            category: &MemoryCategory::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-1",
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let representation = RepresentationService::new()
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: None,
                    query: None,
                    max_items: 10,
                    include_raw: true,
                    include_recent: true,
                    include_semantic: false,
                    include_derived: false,
                    include_digests: false,
                    include_contradictions: false,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.recent.len(), 2);
        assert!(representation
            .recent
            .iter()
            .any(|memory| memory.content == "raw hook payload"));
    }

    /// Pins the bucket allocation for a budget of 24 items.
    #[test]
    fn test_bucket_limits_match_locked_default_allocation() {
        let limits = bucket_limits(24);
        assert_eq!(limits.digests, 2);
        assert_eq!(limits.contradictions, 2);
        assert_eq!(limits.derived, 6);
        assert_eq!(limits.semantic, 7);
        assert_eq!(limits.recent, 7);
    }

    /// Very small budgets must still leave at least one slot for recent.
    #[test]
    fn test_bucket_limits_preserve_recent_slot_for_tiny_requests() {
        let four = bucket_limits(4);
        assert!(four.recent >= 1);

        let single = bucket_limits(1);
        assert_eq!(single.recent, 1);
    }

    /// Of two derived memories with confidence 0.60 and 0.90, only the 0.90
    /// one is expected in the derived bucket.
    #[tokio::test]
    async fn test_build_representation_filters_below_confidence_thresholds() {
        let (repo, namespace_id, perspective) = setup_repo().await;

        let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
        low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
        repo.store(StoreMemoryParams {
            namespace_id,
            content: "low confidence derived",
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &low_derived,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let mut high_derived = metadata(CognitiveLevel::Derived, &perspective);
        high_derived["cognitive"]["confidence"] = serde_json::json!(0.90);
        repo.store(StoreMemoryParams {
            namespace_id,
            content: "high confidence derived",
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &high_derived,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let representation = RepresentationService::new()
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective),
                    query: None,
                    max_items: 10,
                    include_raw: false,
                    include_recent: false,
                    include_semantic: false,
                    include_derived: true,
                    include_digests: false,
                    include_contradictions: false,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.derived.len(), 1);
        assert_eq!(representation.derived[0].content, "high confidence derived");
    }

    /// A memory stored with the query's own embedding is expected to rank
    /// ahead of a memory that only matches the query text.
    #[tokio::test]
    async fn test_build_representation_prefers_vector_semantic_matches() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        let embedder = Arc::new(MockEmbeddingService::new());
        let semantic_query = "provider switch rollout";
        let vector = embedder.embed(semantic_query).await.unwrap();

        // Content shares no words with the query; only the embedding matches.
        store_memory_with_embedding(
            &repo,
            namespace_id,
            "migration timeline digest summary",
            CognitiveLevel::Derived,
            &perspective,
            &vector,
        )
        .await;
        store_memory(
            &repo,
            namespace_id,
            "provider switch rollout plain text fallback",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;

        let representation = RepresentationService::with_embedder(embedder)
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective),
                    query: Some(semantic_query.to_string()),
                    max_items: 12,
                    include_raw: false,
                    include_recent: false,
                    include_semantic: true,
                    include_derived: false,
                    include_digests: false,
                    include_contradictions: false,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.semantic.len(), 2);
        assert_eq!(
            representation.semantic[0].content,
            "migration timeline digest summary"
        );
        assert_eq!(
            representation.semantic[1].content,
            "provider switch rollout plain text fallback"
        );
    }

    /// The text fallback must fetch enough rows that, after low-confidence
    /// hits are filtered out, a confident match can still top up the bucket.
    #[tokio::test]
    async fn test_build_representation_text_fallback_overfetches_after_filtering() {
        let (repo, namespace_id, perspective) = setup_repo().await;
        let embedder = Arc::new(MockEmbeddingService::new());
        let query = "provider switch rollout";
        let vector = embedder.embed(query).await.unwrap();

        store_memory_with_embedding(
            &repo,
            namespace_id,
            "vector-only semantic memory",
            CognitiveLevel::Derived,
            &perspective,
            &vector,
        )
        .await;

        // Two text hits with confidence 0.60, which the test expects to be filtered out.
        let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
        low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
        repo.store(StoreMemoryParams {
            namespace_id,
            content: "provider switch rollout low confidence derived",
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &low_derived,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let mut low_explicit = metadata(CognitiveLevel::Explicit, &perspective);
        low_explicit["cognitive"]["confidence"] = serde_json::json!(0.60);
        repo.store(StoreMemoryParams {
            namespace_id,
            content: "provider switch rollout low confidence explicit",
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &low_explicit,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        store_memory(
            &repo,
            namespace_id,
            "provider switch rollout strong explicit",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;

        // Sanity check: the raw text search sees all three text-matching memories.
        let text_hits = repo
            .search_by_text_memories(namespace_id, query, 4, false)
            .await
            .unwrap();
        assert_eq!(text_hits.len(), 3);

        let representation = RepresentationService::with_embedder(embedder)
            .build(
                &WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective),
                    query: Some(query.to_string()),
                    max_items: 6,
                    include_raw: false,
                    include_recent: false,
                    include_semantic: true,
                    include_derived: false,
                    include_digests: false,
                    include_contradictions: false,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
            )
            .await
            .unwrap();

        assert_eq!(representation.semantic.len(), 2);
        assert_eq!(
            representation.semantic[0].content,
            "vector-only semantic memory"
        );
        assert_eq!(
            representation.semantic[1].content,
            "provider switch rollout strong explicit"
        );
        assert_eq!(
            cognitive_level_from_metadata(&representation.semantic[1].metadata),
            CognitiveLevel::Explicit
        );
    }
}