1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Instant;
6
7use nexus_core::{
8 config::Config, traits::EmbeddingService, CognitiveLevel, Memory, WorkingRepresentation,
9 WorkingRepresentationRequest,
10};
11use nexus_storage::repository::{MemoryRepository, SemanticCandidateParams};
12use nexus_vectors::{SearchOptions, SemanticSearch, VectorEntry};
13use tokio::sync::OnceCell;
14use tracing::{debug, warn};
15
16use crate::error::AgentError;
17use crate::ranking::flatten_ranked_representation;
18use crate::util::{flush_metric_samples, stage_metric_sample, CognitionSnapshot};
19
/// Factor by which the overfetch limit is multiplied to size the candidate
/// fetch that feeds vector search in `build_semantic_bucket`.
const SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER: usize = 8;
/// Threshold passed to `SearchOptions::with_threshold` for the semantic bucket's
/// vector search.
const SEMANTIC_VECTOR_THRESHOLD: f32 = 0.58;
22
/// Source of the embedding service used for semantic retrieval.
enum EmbedderProvider {
    /// No embedding service; `resolve` returns `None`.
    Disabled,
    /// A caller-supplied embedding service; `resolve` returns a clone of it.
    Static(Arc<dyn EmbeddingService>),
    /// An embedding service created on first `resolve` from `Config::from_env()`
    /// and cached in the cell. The cell holds `None` when loading the
    /// configuration or creating the service did not produce a service.
    Auto(Arc<OnceCell<Option<Arc<dyn EmbeddingService>>>>),
}
28
/// Builds bucketed working representations of stored memories for a request.
pub struct RepresentationService {
    /// Where the embedding service for the semantic bucket comes from.
    embedder: EmbedderProvider,
}
32
33impl EmbedderProvider {
34 async fn resolve(&self) -> Option<Arc<dyn EmbeddingService>> {
35 match self {
36 Self::Disabled => None,
37 Self::Static(embedder) => Some(embedder.clone()),
38 Self::Auto(cell) => {
39 let embedder = cell
40 .get_or_init(|| async {
41 let config = match Config::from_env() {
42 Ok(config) => config,
43 Err(error) => {
44 warn!(
45 error = %error,
46 "Failed to load Nexus config for semantic embeddings; semantic retrieval will fall back to text"
47 );
48 return None;
49 }
50 };
51 match nexus_embeddings::create_service(&config).await {
52 Ok(Some(service)) => Some(service),
53 Ok(None) => None,
54 Err(error) => {
55 warn!(
56 error = %error,
57 "Failed to initialize embedding service; semantic retrieval will fall back to text. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
58 );
59 None
60 }
61 }
62 })
63 .await;
64 embedder.clone()
65 }
66 }
67 }
68}
69
/// Per-bucket item caps for one working representation, as computed by
/// `bucket_limits`.
#[derive(Debug, Clone, Copy)]
struct BucketLimits {
    /// Maximum number of digests.
    digests: i64,
    /// Maximum number of contradiction memories.
    contradictions: i64,
    /// Maximum number of derived memories.
    derived: i64,
    /// Maximum number of semantic matches.
    semantic: i64,
    /// Maximum number of recent memories; always at least 1.
    recent: i64,
}

/// Splits a total budget of `max_items` across the representation buckets.
///
/// A budget of 0 is treated as 1. Digests get up to 2 slots and
/// contradictions get one slot per 10 items, capped at 2. Derived memories
/// then take about a quarter of the budget and semantic matches about 30%
/// (a third for budgets below 16), each bounded by what is still unallocated.
/// Whatever is left goes to recent memories. If nothing is left, one slot is
/// taken from the first non-empty bucket in the order semantic, derived,
/// digests, contradictions, so `recent` is never 0.
///
/// The arithmetic cannot overflow for any `max_items`, and values that do not
/// fit in `i64` are clamped to `i64::MAX`.
fn bucket_limits(max_items: usize) -> BucketLimits {
    // Clamp instead of `as i64`, which would wrap to a negative limit.
    fn clamp_to_i64(value: usize) -> i64 {
        i64::try_from(value).unwrap_or(i64::MAX)
    }

    let max_items = max_items.max(1);
    let large_request = max_items >= 16;

    let mut digests = max_items.min(2);
    let mut contradictions = (max_items / 10).min(2);
    let remaining_after_fixed = max_items.saturating_sub(digests + contradictions);

    let derived_floor = if large_request { 4 } else { 1 };
    let mut derived = remaining_after_fixed.min((max_items / 4).max(derived_floor));
    let remaining_after_derived = remaining_after_fixed.saturating_sub(derived);

    let semantic_share = if large_request {
        // floor(max_items * 3 / 10), split into quotient and remainder so the
        // multiplication cannot overflow for very large budgets.
        let three_tenths = (max_items / 10) * 3 + (max_items % 10) * 3 / 10;
        three_tenths.max(4)
    } else {
        (max_items / 3).max(1)
    };
    let mut semantic = remaining_after_derived.min(semantic_share);

    // Each bucket was bounded by the unallocated remainder, so this sum never
    // exceeds `max_items`.
    let mut recent = max_items.saturating_sub(digests + contradictions + derived + semantic);

    if recent == 0 {
        // Always keep one slot for recent memories by borrowing it from
        // another bucket.
        if semantic > 0 {
            semantic -= 1;
        } else if derived > 0 {
            derived -= 1;
        } else if digests > 0 {
            digests -= 1;
        } else if contradictions > 0 {
            contradictions -= 1;
        }
        recent = 1;
    }

    BucketLimits {
        digests: clamp_to_i64(digests),
        contradictions: clamp_to_i64(contradictions),
        derived: clamp_to_i64(derived),
        semantic: clamp_to_i64(semantic),
        recent: clamp_to_i64(recent),
    }
}
121
/// Computes the row count to request from the repository for one bucket.
///
/// The result is the larger of `max_items` and three times `bucket_limit`,
/// where a `bucket_limit` below 1 is treated as 1. The multiplication
/// saturates and a result that does not fit in `i64` is clamped to
/// `i64::MAX`, so extreme inputs neither panic nor wrap to a negative limit.
fn overfetch_limit(bucket_limit: i64, max_items: usize) -> i64 {
    let bucket_limit = usize::try_from(bucket_limit.max(1)).unwrap_or(usize::MAX);
    let fetch = max_items.max(bucket_limit.saturating_mul(3));
    i64::try_from(fetch).unwrap_or(i64::MAX)
}
126
127fn include_recent_memory(memory: &Memory, include_raw: bool) -> bool {
128 let snapshot = CognitionSnapshot::from_memory(memory);
129 match snapshot.level {
130 CognitiveLevel::Raw => include_raw,
131 CognitiveLevel::Explicit => snapshot.confidence_meets_threshold(),
132 CognitiveLevel::Derived
133 | CognitiveLevel::Contradiction
134 | CognitiveLevel::SummaryShort
135 | CognitiveLevel::SummaryLong => false,
136 }
137}
138
139fn include_semantic_memory(memory: &Memory, include_raw: bool) -> bool {
140 let snapshot = CognitionSnapshot::from_memory(memory);
141 match snapshot.level {
142 CognitiveLevel::Raw => include_raw,
143 CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong => false,
144 CognitiveLevel::Explicit | CognitiveLevel::Derived | CognitiveLevel::Contradiction => {
145 snapshot.confidence_meets_threshold()
146 }
147 }
148}
149
150fn include_derived_memory(memory: &Memory) -> bool {
151 let snapshot = CognitionSnapshot::from_memory(memory);
152 snapshot.level == CognitiveLevel::Derived && snapshot.confidence_meets_threshold()
153}
154
155fn include_contradiction_memory(memory: &Memory) -> bool {
156 let snapshot = CognitionSnapshot::from_memory(memory);
157 snapshot.level == CognitiveLevel::Contradiction && snapshot.confidence_meets_threshold()
158}
159
160impl RepresentationService {
161 pub fn new() -> Self {
162 let auto_embedder = Config::from_env()
163 .map(|config| config.embedding.enabled)
164 .unwrap_or(false);
165
166 let embedder = if auto_embedder {
167 EmbedderProvider::Auto(Arc::new(OnceCell::new()))
168 } else {
169 EmbedderProvider::Disabled
170 };
171
172 Self { embedder }
173 }
174
175 pub fn without_embedder() -> Self {
176 Self {
177 embedder: EmbedderProvider::Disabled,
178 }
179 }
180
181 pub fn with_embedder(embedder: Arc<dyn EmbeddingService>) -> Self {
182 Self {
183 embedder: EmbedderProvider::Static(embedder),
184 }
185 }
186
187 async fn build_semantic_bucket(
188 &self,
189 request: &WorkingRepresentationRequest,
190 repo: &MemoryRepository,
191 limit: i64,
192 ) -> Result<Vec<Memory>, AgentError> {
193 let Some(query) = request.query.as_deref() else {
194 return Ok(Vec::new());
195 };
196
197 let semantic_limit = limit.max(1) as usize;
198 let vector_fetch_limit = (overfetch_limit(limit, request.max_items) as usize
199 * SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER) as i64;
200
201 let mut ranked = Vec::new();
202 let mut seen = HashSet::new();
203
204 if let Some(embedder) = self.embedder.resolve().await {
205 match embedder.embed(query).await {
206 Ok(query_embedding) => {
207 let candidates = repo
208 .get_semantic_candidates(SemanticCandidateParams {
209 namespace_id: request.namespace_id,
210 perspective: request.perspective.as_ref(),
211 limit: vector_fetch_limit,
212 include_raw: request.include_raw,
213 })
214 .await
215 .map_err(storage_err)?;
216
217 let mut by_id = HashMap::new();
218 let vectors: Vec<VectorEntry> = candidates
219 .into_iter()
220 .filter(|memory| include_semantic_memory(memory, request.include_raw))
221 .filter_map(|memory| {
222 let embedding = memory.content_embedding.clone()?;
223 by_id.insert(memory.id, memory.clone());
224 Some(
225 VectorEntry::new(
226 memory.id,
227 embedding,
228 memory.category.to_string(),
229 memory.namespace_id,
230 )
231 .with_memory_lane_type(
232 memory
233 .memory_lane_type
234 .as_ref()
235 .map(|lane| lane.to_string())
236 .unwrap_or_else(|| "none".to_string()),
237 ),
238 )
239 })
240 .collect();
241
242 if !vectors.is_empty() {
243 let search = SemanticSearch::new();
244 let options = SearchOptions::with_limit(semantic_limit)
245 .with_namespace(request.namespace_id)
246 .with_threshold(SEMANTIC_VECTOR_THRESHOLD);
247
248 match search.search(&query_embedding, &vectors, &options) {
249 Ok((results, _latency)) => {
250 for result in results {
251 if let Some(memory) = by_id.remove(&result.id) {
252 seen.insert(memory.id);
253 ranked.push(memory);
254 }
255 }
256 }
257 Err(error) => {
258 warn!(error = %error, "Vector semantic search failed; falling back to text search");
259 }
260 }
261 }
262 }
263 Err(error) => {
264 warn!(error = %error, "Query embedding failed; falling back to text search");
265 }
266 }
267 }
268
269 if ranked.len() < semantic_limit {
270 let top_up_limit = overfetch_limit(limit, request.max_items) as i32;
271 let fallback = repo
272 .search_by_text_memories(
273 request.namespace_id,
274 query,
275 top_up_limit,
276 request.include_raw,
277 )
278 .await
279 .map_err(storage_err)?;
280
281 for memory in fallback {
282 if include_semantic_memory(&memory, request.include_raw) && seen.insert(memory.id) {
283 ranked.push(memory);
284 if ranked.len() >= semantic_limit {
285 break;
286 }
287 }
288 }
289 }
290
291 ranked.truncate(semantic_limit);
292 Ok(ranked)
293 }
294
295 pub async fn build(
296 &self,
297 request: &WorkingRepresentationRequest,
298 repo: &MemoryRepository,
299 ) -> Result<WorkingRepresentation, AgentError> {
300 let total_started = Instant::now();
301 let limits = bucket_limits(request.max_items);
302 let mut representation = WorkingRepresentation::default();
303 let mut metrics = Vec::new();
304
305 if request.include_digests && limits.digests > 0 {
306 let started = Instant::now();
307 if let Some(perspective) = request.perspective.as_ref() {
308 if let Some(session_key) = perspective.session_key.as_deref() {
309 if let Some(short) = repo
310 .latest_digest_for_session(request.namespace_id, session_key, "short")
311 .await
312 .map_err(storage_err)?
313 {
314 representation.digests.push(short);
315 }
316 if (representation.digests.len() as i64) < limits.digests {
317 if let Some(long) = repo
318 .latest_digest_for_session(request.namespace_id, session_key, "long")
319 .await
320 .map_err(storage_err)?
321 {
322 representation.digests.push(long);
323 }
324 }
325 }
326 }
327 metrics.push(stage_metric_sample(
328 request.namespace_id,
329 "cognition.representation.digests_ms",
330 started.elapsed().as_secs_f64() * 1000.0,
331 "digests",
332 ));
333 }
334
335 if request.include_digests
338 && !request.cross_namespace_ids.is_empty()
339 && (representation.digests.len() as i64) < limits.digests
340 {
341 let started = Instant::now();
342 for &cross_ns_id in &request.cross_namespace_ids {
343 if cross_ns_id == request.namespace_id {
344 continue;
345 }
346 if representation.digests.len() as i64 >= limits.digests {
347 break;
348 }
349 let cross_digest = if let Some(session_key) = request
350 .perspective
351 .as_ref()
352 .and_then(|p| p.session_key.as_deref())
353 {
354 repo.latest_digest_for_session(cross_ns_id, session_key, "short")
355 .await
356 .ok()
357 .flatten()
358 } else {
359 repo.latest_digest_for_namespace(cross_ns_id, "short")
360 .await
361 .ok()
362 .flatten()
363 };
364
365 if let Some(digest) = cross_digest {
366 representation.digests.push(digest);
367 }
368 }
370 debug!(
371 cross_namespaces = request.cross_namespace_ids.len(),
372 cross_digests_added = representation.digests.len(),
373 "Cross-namespace digest fetch complete"
374 );
375 metrics.push(stage_metric_sample(
376 request.namespace_id,
377 "cognition.representation.cross_namespace_ms",
378 started.elapsed().as_secs_f64() * 1000.0,
379 "cross_namespace",
380 ));
381 }
382
383 if request.include_recent {
384 let started = Instant::now();
385 let fetch_limit = overfetch_limit(limits.recent, request.max_items);
386 representation.recent = if let Some(perspective) = request.perspective.as_ref() {
387 repo.get_recent_by_perspective_opts(
388 request.namespace_id,
389 perspective,
390 fetch_limit,
391 request.include_raw,
392 )
393 .await
394 .map_err(storage_err)?
395 } else {
396 repo.list_filtered(
397 request.namespace_id,
398 nexus_storage::repository::ListMemoryFilters {
399 category: None,
400 since: None,
401 until: None,
402 content_like: None,
403 include_raw: request.include_raw,
404 limit: fetch_limit,
405 offset: 0,
406 },
407 )
408 .await
409 .map_err(storage_err)?
410 };
411 representation
412 .recent
413 .retain(|memory| include_recent_memory(memory, request.include_raw));
414 representation
415 .recent
416 .truncate(limits.recent.max(0) as usize);
417 metrics.push(stage_metric_sample(
418 request.namespace_id,
419 "cognition.representation.recent_ms",
420 started.elapsed().as_secs_f64() * 1000.0,
421 "recent",
422 ));
423 }
424
425 if request.include_semantic && limits.semantic > 0 {
426 let started = Instant::now();
427 representation.semantic = self
428 .build_semantic_bucket(request, repo, limits.semantic)
429 .await?;
430 metrics.push(stage_metric_sample(
431 request.namespace_id,
432 "cognition.representation.semantic_ms",
433 started.elapsed().as_secs_f64() * 1000.0,
434 "semantic",
435 ));
436 }
437
438 if let Some(perspective) = request.perspective.as_ref() {
439 if request.include_derived && limits.derived > 0 {
440 let started = Instant::now();
441 let fetch_limit = overfetch_limit(limits.derived, request.max_items);
442 representation.derived = repo
443 .get_most_reinforced_by_perspective_opts(
444 request.namespace_id,
445 perspective,
446 fetch_limit,
447 request.include_raw,
448 )
449 .await
450 .map_err(storage_err)?
451 .into_iter()
452 .filter(include_derived_memory)
453 .collect();
454 representation.derived.truncate(limits.derived as usize);
455 metrics.push(stage_metric_sample(
456 request.namespace_id,
457 "cognition.representation.derived_ms",
458 started.elapsed().as_secs_f64() * 1000.0,
459 "derived",
460 ));
461 }
462
463 if request.include_contradictions && limits.contradictions > 0 {
464 let started = Instant::now();
465 let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
466 representation.contradictions = repo
467 .get_contradictions_by_perspective_opts(
468 request.namespace_id,
469 perspective,
470 fetch_limit,
471 request.include_raw,
472 )
473 .await
474 .map_err(storage_err)?
475 .into_iter()
476 .filter(include_contradiction_memory)
477 .collect();
478 representation
479 .contradictions
480 .truncate(limits.contradictions as usize);
481 metrics.push(stage_metric_sample(
482 request.namespace_id,
483 "cognition.representation.contradictions_ms",
484 started.elapsed().as_secs_f64() * 1000.0,
485 "contradictions",
486 ));
487 }
488 } else {
489 if request.include_derived && limits.derived > 0 {
490 let started = Instant::now();
491 let fetch_limit = overfetch_limit(limits.derived, request.max_items);
492 representation.derived = repo
493 .get_most_reinforced_by_namespace(
494 request.namespace_id,
495 fetch_limit,
496 request.include_raw,
497 )
498 .await
499 .map_err(storage_err)?
500 .into_iter()
501 .filter(include_derived_memory)
502 .collect();
503 representation.derived.truncate(limits.derived as usize);
504 metrics.push(stage_metric_sample(
505 request.namespace_id,
506 "cognition.representation.derived_ms",
507 started.elapsed().as_secs_f64() * 1000.0,
508 "derived",
509 ));
510 }
511
512 if request.include_contradictions && limits.contradictions > 0 {
513 let started = Instant::now();
514 let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
515 representation.contradictions = repo
516 .get_contradictions_by_namespace(
517 request.namespace_id,
518 fetch_limit,
519 request.include_raw,
520 )
521 .await
522 .map_err(storage_err)?
523 .into_iter()
524 .filter(include_contradiction_memory)
525 .collect();
526 representation
527 .contradictions
528 .truncate(limits.contradictions as usize);
529 metrics.push(stage_metric_sample(
530 request.namespace_id,
531 "cognition.representation.contradictions_ms",
532 started.elapsed().as_secs_f64() * 1000.0,
533 "contradictions",
534 ));
535 }
536 }
537
538 metrics.push(stage_metric_sample(
539 request.namespace_id,
540 "cognition.representation.total_ms",
541 total_started.elapsed().as_secs_f64() * 1000.0,
542 "total",
543 ));
544 flush_metric_samples(repo, &metrics).await;
545
546 Ok(representation)
547 }
548
549 pub async fn flat_working_set(
550 &self,
551 request: &WorkingRepresentationRequest,
552 repo: &MemoryRepository,
553 ) -> Result<Vec<Memory>, AgentError> {
554 let representation = self.build(request, repo).await?;
555 let flat: Vec<Memory> = flatten_ranked_representation(representation, request)
556 .into_iter()
557 .map(|bucketed| bucketed.memory)
558 .collect();
559
560 if flat.is_empty()
564 && !request.include_raw
565 && !request.include_digests
566 && !request.include_recent
567 && !request.include_semantic
568 && !request.include_derived
569 && !request.include_contradictions
570 {
571 let limit = request.max_items.max(1) as i64;
572 return repo
573 .list_filtered(
574 request.namespace_id,
575 nexus_storage::repository::ListMemoryFilters {
576 category: None,
577 since: None,
578 until: None,
579 content_like: None,
580 include_raw: request.include_raw,
581 limit,
582 offset: 0,
583 },
584 )
585 .await
586 .map_err(|e| AgentError::Storage(e.to_string()));
587 }
588
589 Ok(flat)
590 }
591}
592
593fn storage_err(error: nexus_core::NexusError) -> AgentError {
594 AgentError::Storage(error.to_string())
595}
596
597impl Default for RepresentationService {
598 fn default() -> Self {
599 Self::new()
600 }
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606
607 use nexus_core::traits::EmbeddingService;
608 use nexus_core::{
609 cognitive_level_from_metadata, CognitiveMetadata, MemoryCategory, PerspectiveKey,
610 };
611 use nexus_embeddings::MockEmbeddingService;
612 use nexus_storage::repository::{
613 MemoryRepository, NamespaceRepository, StoreDigestParams, StoreMemoryParams,
614 };
615 use sqlx::sqlite::SqlitePoolOptions;
616
617 async fn setup_repo() -> (MemoryRepository, i64, PerspectiveKey) {
618 let pool = SqlitePoolOptions::new()
619 .max_connections(1)
620 .connect("sqlite::memory:")
621 .await
622 .unwrap();
623 nexus_storage::migrations::run_migrations(&pool)
624 .await
625 .unwrap();
626 let namespace_repo = NamespaceRepository::new(pool.clone());
627 let namespace = namespace_repo
628 .get_or_create("representation-test", "representation-test")
629 .await
630 .unwrap();
631 let perspective =
632 PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
633 (MemoryRepository::new(pool), namespace.id, perspective)
634 }
635
636 fn metadata(level: CognitiveLevel, perspective: &PerspectiveKey) -> serde_json::Value {
637 let mut cognitive = CognitiveMetadata::new(
638 level,
639 perspective.observer.clone(),
640 perspective.subject.clone(),
641 perspective.session_key.clone(),
642 "test",
643 );
644 cognitive.confidence = Some(0.9);
645 cognitive.merge_into(&serde_json::json!({}))
646 }
647
648 async fn store_memory(
649 repo: &MemoryRepository,
650 namespace_id: i64,
651 content: &str,
652 level: CognitiveLevel,
653 perspective: &PerspectiveKey,
654 ) -> Memory {
655 repo.store(StoreMemoryParams {
656 namespace_id,
657 content,
658 category: &MemoryCategory::Facts,
659 memory_lane_type: None,
660 labels: &[],
661 metadata: &metadata(level, perspective),
662 embedding: None,
663 embedding_model: None,
664 })
665 .await
666 .unwrap()
667 }
668
669 async fn store_memory_with_embedding(
670 repo: &MemoryRepository,
671 namespace_id: i64,
672 content: &str,
673 level: CognitiveLevel,
674 perspective: &PerspectiveKey,
675 embedding: &[f32],
676 ) -> Memory {
677 repo.store(StoreMemoryParams {
678 namespace_id,
679 content,
680 category: &MemoryCategory::Facts,
681 memory_lane_type: None,
682 labels: &[],
683 metadata: &metadata(level, perspective),
684 embedding: Some(embedding),
685 embedding_model: Some("mock-embedding"),
686 })
687 .await
688 .unwrap()
689 }
690
691 #[tokio::test]
692 async fn test_build_representation_groups_buckets() {
693 let (repo, namespace_id, perspective) = setup_repo().await;
694 let digest = store_memory(
695 &repo,
696 namespace_id,
697 "short digest",
698 CognitiveLevel::SummaryShort,
699 &perspective,
700 )
701 .await;
702 repo.store_digest(StoreDigestParams {
703 namespace_id,
704 session_key: "session-1",
705 digest_kind: "short",
706 memory_id: digest.id,
707 start_memory_id: Some(1),
708 end_memory_id: Some(2),
709 token_count: 32,
710 })
711 .await
712 .unwrap();
713
714 store_memory(
715 &repo,
716 namespace_id,
717 "recent explicit observation",
718 CognitiveLevel::Explicit,
719 &perspective,
720 )
721 .await;
722 store_memory(
723 &repo,
724 namespace_id,
725 "derived insight",
726 CognitiveLevel::Derived,
727 &perspective,
728 )
729 .await;
730 store_memory(
731 &repo,
732 namespace_id,
733 "contradiction note",
734 CognitiveLevel::Contradiction,
735 &perspective,
736 )
737 .await;
738
739 let service = RepresentationService::new();
740 let representation = service
741 .build(
742 &WorkingRepresentationRequest {
743 namespace_id,
744 perspective: Some(perspective),
745 query: Some("recent".to_string()),
746 max_items: 12,
747 include_raw: false,
748 include_recent: true,
749 include_semantic: true,
750 include_derived: true,
751 include_digests: true,
752 include_contradictions: true,
753 ..WorkingRepresentationRequest::default()
754 },
755 &repo,
756 )
757 .await
758 .unwrap();
759
760 assert_eq!(representation.digests.len(), 1);
761 assert_eq!(representation.derived.len(), 1);
762 assert_eq!(representation.contradictions.len(), 1);
763 assert!(!representation.recent.is_empty());
764 assert!(!representation.semantic.is_empty());
765 }
766
767 #[tokio::test]
768 async fn test_flat_working_set_uses_storage_primitive() {
769 let (repo, namespace_id, perspective) = setup_repo().await;
770 store_memory(
771 &repo,
772 namespace_id,
773 "derived insight",
774 CognitiveLevel::Derived,
775 &perspective,
776 )
777 .await;
778 store_memory(
779 &repo,
780 namespace_id,
781 "recent explicit observation",
782 CognitiveLevel::Explicit,
783 &perspective,
784 )
785 .await;
786
787 let service = RepresentationService::new();
788 let flat = service
789 .flat_working_set(
790 &WorkingRepresentationRequest {
791 namespace_id,
792 perspective: Some(perspective),
793 max_items: 4,
794 ..WorkingRepresentationRequest::default()
795 },
796 &repo,
797 )
798 .await
799 .unwrap();
800
801 assert!(!flat.is_empty());
802 assert!(flat.len() <= 4);
803 }
804
805 #[tokio::test]
806 async fn test_build_representation_without_perspective_excludes_raw_noise() {
807 let (repo, namespace_id, perspective) = setup_repo().await;
808 store_memory(
809 &repo,
810 namespace_id,
811 "recent explicit observation",
812 CognitiveLevel::Explicit,
813 &perspective,
814 )
815 .await;
816
817 repo.store(StoreMemoryParams {
818 namespace_id,
819 content: "raw hook payload",
820 category: &MemoryCategory::Session,
821 memory_lane_type: None,
822 labels: &["raw-activity".to_string()],
823 metadata: &serde_json::json!({
824 "raw_activity": true,
825 "cognitive": {
826 "level": "raw",
827 "observer": "claude-code",
828 "subject": "claude-code",
829 "session_key": "session-1",
830 "generated_by": "test"
831 }
832 }),
833 embedding: None,
834 embedding_model: None,
835 })
836 .await
837 .unwrap();
838
839 let representation = RepresentationService::new()
840 .build(
841 &WorkingRepresentationRequest {
842 namespace_id,
843 perspective: None,
844 query: None,
845 max_items: 10,
846 include_raw: false,
847 include_recent: true,
848 include_semantic: false,
849 include_derived: false,
850 include_digests: false,
851 include_contradictions: false,
852 ..WorkingRepresentationRequest::default()
853 },
854 &repo,
855 )
856 .await
857 .unwrap();
858
859 assert_eq!(representation.recent.len(), 1);
860 assert_eq!(
861 representation.recent[0].content,
862 "recent explicit observation"
863 );
864 }
865
866 #[tokio::test]
867 async fn test_build_representation_without_perspective_includes_cognition_outputs() {
868 let (repo, namespace_id, perspective) = setup_repo().await;
869 store_memory(
870 &repo,
871 namespace_id,
872 "derived insight",
873 CognitiveLevel::Derived,
874 &perspective,
875 )
876 .await;
877 store_memory(
878 &repo,
879 namespace_id,
880 "contradiction note",
881 CognitiveLevel::Contradiction,
882 &perspective,
883 )
884 .await;
885
886 let representation = RepresentationService::new()
887 .build(
888 &WorkingRepresentationRequest {
889 namespace_id,
890 perspective: None,
891 query: None,
892 max_items: 10,
893 include_raw: false,
894 include_recent: false,
895 include_semantic: false,
896 include_derived: true,
897 include_digests: false,
898 include_contradictions: true,
899 ..WorkingRepresentationRequest::default()
900 },
901 &repo,
902 )
903 .await
904 .unwrap();
905
906 assert_eq!(representation.derived.len(), 1);
907 assert_eq!(representation.derived[0].content, "derived insight");
908 assert_eq!(representation.contradictions.len(), 1);
909 assert_eq!(
910 representation.contradictions[0].content,
911 "contradiction note"
912 );
913 }
914
915 #[tokio::test]
916 async fn test_build_representation_without_perspective_can_include_raw_noise() {
917 let (repo, namespace_id, perspective) = setup_repo().await;
918 store_memory(
919 &repo,
920 namespace_id,
921 "recent explicit observation",
922 CognitiveLevel::Explicit,
923 &perspective,
924 )
925 .await;
926
927 repo.store(StoreMemoryParams {
928 namespace_id,
929 content: "raw hook payload",
930 category: &MemoryCategory::Session,
931 memory_lane_type: None,
932 labels: &["raw-activity".to_string()],
933 metadata: &serde_json::json!({
934 "raw_activity": true,
935 "cognitive": {
936 "level": "raw",
937 "observer": "claude-code",
938 "subject": "claude-code",
939 "session_key": "session-1",
940 "generated_by": "test"
941 }
942 }),
943 embedding: None,
944 embedding_model: None,
945 })
946 .await
947 .unwrap();
948
949 let representation = RepresentationService::new()
950 .build(
951 &WorkingRepresentationRequest {
952 namespace_id,
953 perspective: None,
954 query: None,
955 max_items: 10,
956 include_raw: true,
957 include_recent: true,
958 include_semantic: false,
959 include_derived: false,
960 include_digests: false,
961 include_contradictions: false,
962 ..WorkingRepresentationRequest::default()
963 },
964 &repo,
965 )
966 .await
967 .unwrap();
968
969 assert_eq!(representation.recent.len(), 2);
970 assert!(representation
971 .recent
972 .iter()
973 .any(|memory| memory.content == "raw hook payload"));
974 }
975
976 #[test]
977 fn test_bucket_limits_match_locked_default_allocation() {
978 let limits = bucket_limits(24);
979 assert_eq!(limits.digests, 2);
980 assert_eq!(limits.contradictions, 2);
981 assert_eq!(limits.derived, 6);
982 assert_eq!(limits.semantic, 7);
983 assert_eq!(limits.recent, 7);
984 }
985
986 #[test]
987 fn test_bucket_limits_preserve_recent_slot_for_tiny_requests() {
988 let four = bucket_limits(4);
989 assert!(four.recent >= 1);
990
991 let single = bucket_limits(1);
992 assert_eq!(single.recent, 1);
993 }
994
995 #[tokio::test]
996 async fn test_build_representation_filters_below_confidence_thresholds() {
997 let (repo, namespace_id, perspective) = setup_repo().await;
998
999 let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
1000 low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
1001 repo.store(StoreMemoryParams {
1002 namespace_id,
1003 content: "low confidence derived",
1004 category: &MemoryCategory::Facts,
1005 memory_lane_type: None,
1006 labels: &[],
1007 metadata: &low_derived,
1008 embedding: None,
1009 embedding_model: None,
1010 })
1011 .await
1012 .unwrap();
1013
1014 let mut high_derived = metadata(CognitiveLevel::Derived, &perspective);
1015 high_derived["cognitive"]["confidence"] = serde_json::json!(0.90);
1016 repo.store(StoreMemoryParams {
1017 namespace_id,
1018 content: "high confidence derived",
1019 category: &MemoryCategory::Facts,
1020 memory_lane_type: None,
1021 labels: &[],
1022 metadata: &high_derived,
1023 embedding: None,
1024 embedding_model: None,
1025 })
1026 .await
1027 .unwrap();
1028
1029 let representation = RepresentationService::new()
1030 .build(
1031 &WorkingRepresentationRequest {
1032 namespace_id,
1033 perspective: Some(perspective),
1034 query: None,
1035 max_items: 10,
1036 include_raw: false,
1037 include_recent: false,
1038 include_semantic: false,
1039 include_derived: true,
1040 include_digests: false,
1041 include_contradictions: false,
1042 ..WorkingRepresentationRequest::default()
1043 },
1044 &repo,
1045 )
1046 .await
1047 .unwrap();
1048
1049 assert_eq!(representation.derived.len(), 1);
1050 assert_eq!(representation.derived[0].content, "high confidence derived");
1051 }
1052
1053 #[tokio::test]
1054 async fn test_build_representation_prefers_vector_semantic_matches() {
1055 let (repo, namespace_id, perspective) = setup_repo().await;
1056 let embedder = Arc::new(MockEmbeddingService::new());
1057 let semantic_query = "provider switch rollout";
1058 let vector = embedder.embed(semantic_query).await.unwrap();
1059
1060 store_memory_with_embedding(
1061 &repo,
1062 namespace_id,
1063 "migration timeline digest summary",
1064 CognitiveLevel::Derived,
1065 &perspective,
1066 &vector,
1067 )
1068 .await;
1069 store_memory(
1070 &repo,
1071 namespace_id,
1072 "provider switch rollout plain text fallback",
1073 CognitiveLevel::Explicit,
1074 &perspective,
1075 )
1076 .await;
1077
1078 let representation = RepresentationService::with_embedder(embedder)
1079 .build(
1080 &WorkingRepresentationRequest {
1081 namespace_id,
1082 perspective: Some(perspective),
1083 query: Some(semantic_query.to_string()),
1084 max_items: 12,
1085 include_raw: false,
1086 include_recent: false,
1087 include_semantic: true,
1088 include_derived: false,
1089 include_digests: false,
1090 include_contradictions: false,
1091 ..WorkingRepresentationRequest::default()
1092 },
1093 &repo,
1094 )
1095 .await
1096 .unwrap();
1097
1098 assert_eq!(representation.semantic.len(), 2);
1099 assert_eq!(
1100 representation.semantic[0].content,
1101 "migration timeline digest summary"
1102 );
1103 assert_eq!(
1104 representation.semantic[1].content,
1105 "provider switch rollout plain text fallback"
1106 );
1107 }
1108
1109 #[tokio::test]
1110 async fn test_build_representation_text_fallback_overfetches_after_filtering() {
1111 let (repo, namespace_id, perspective) = setup_repo().await;
1112 let embedder = Arc::new(MockEmbeddingService::new());
1113 let query = "provider switch rollout";
1114 let vector = embedder.embed(query).await.unwrap();
1115
1116 store_memory_with_embedding(
1117 &repo,
1118 namespace_id,
1119 "vector-only semantic memory",
1120 CognitiveLevel::Derived,
1121 &perspective,
1122 &vector,
1123 )
1124 .await;
1125
1126 let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
1127 low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
1128 repo.store(StoreMemoryParams {
1129 namespace_id,
1130 content: "provider switch rollout low confidence derived",
1131 category: &MemoryCategory::Facts,
1132 memory_lane_type: None,
1133 labels: &[],
1134 metadata: &low_derived,
1135 embedding: None,
1136 embedding_model: None,
1137 })
1138 .await
1139 .unwrap();
1140
1141 let mut low_explicit = metadata(CognitiveLevel::Explicit, &perspective);
1142 low_explicit["cognitive"]["confidence"] = serde_json::json!(0.60);
1143 repo.store(StoreMemoryParams {
1144 namespace_id,
1145 content: "provider switch rollout low confidence explicit",
1146 category: &MemoryCategory::Facts,
1147 memory_lane_type: None,
1148 labels: &[],
1149 metadata: &low_explicit,
1150 embedding: None,
1151 embedding_model: None,
1152 })
1153 .await
1154 .unwrap();
1155
1156 store_memory(
1157 &repo,
1158 namespace_id,
1159 "provider switch rollout strong explicit",
1160 CognitiveLevel::Explicit,
1161 &perspective,
1162 )
1163 .await;
1164
1165 let text_hits = repo
1166 .search_by_text_memories(namespace_id, query, 4, false)
1167 .await
1168 .unwrap();
1169 assert_eq!(text_hits.len(), 3);
1170
1171 let representation = RepresentationService::with_embedder(embedder)
1172 .build(
1173 &WorkingRepresentationRequest {
1174 namespace_id,
1175 perspective: Some(perspective),
1176 query: Some(query.to_string()),
1177 max_items: 6,
1178 include_raw: false,
1179 include_recent: false,
1180 include_semantic: true,
1181 include_derived: false,
1182 include_digests: false,
1183 include_contradictions: false,
1184 ..WorkingRepresentationRequest::default()
1185 },
1186 &repo,
1187 )
1188 .await
1189 .unwrap();
1190
1191 assert_eq!(representation.semantic.len(), 2);
1192 assert_eq!(
1193 representation.semantic[0].content,
1194 "vector-only semantic memory"
1195 );
1196 assert_eq!(
1197 representation.semantic[1].content,
1198 "provider switch rollout strong explicit"
1199 );
1200 assert_eq!(
1201 cognitive_level_from_metadata(&representation.semantic[1].metadata),
1202 CognitiveLevel::Explicit
1203 );
1204 }
1205}