Skip to main content

nexus_memory_agent/
representation.rs

1//! Representation service - assembles bounded working-memory context.
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Instant;
6
7use nexus_core::{
8    config::Config, traits::EmbeddingService, CognitiveLevel, Memory, WorkingRepresentation,
9    WorkingRepresentationRequest,
10};
11use nexus_storage::repository::{MemoryRepository, SemanticCandidateParams};
12use nexus_vectors::{SearchOptions, SemanticSearch, VectorEntry};
13use tokio::sync::OnceCell;
14use tracing::{debug, warn};
15
16use crate::error::AgentError;
17use crate::ranking::flatten_ranked_representation;
18use crate::util::{flush_metric_samples, stage_metric_sample, CognitionSnapshot};
19
/// Minimum similarity score handed to `SearchOptions::with_threshold` when ranking
/// embedding candidates for the semantic bucket.
const SEMANTIC_VECTOR_THRESHOLD: f32 = 0.58;
/// Extra factor applied on top of `overfetch_limit` when loading embedding candidates,
/// leaving headroom for candidates dropped by cognition filtering, missing embeddings,
/// or the similarity threshold.
const SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER: usize = 8;
22
23enum EmbedderProvider {
24    Disabled,
25    Static(Arc<dyn EmbeddingService>),
26    Auto(Arc<OnceCell<Option<Arc<dyn EmbeddingService>>>>),
27}
28
29pub struct RepresentationService {
30    embedder: EmbedderProvider,
31}
32
33impl EmbedderProvider {
34    async fn resolve(&self) -> Option<Arc<dyn EmbeddingService>> {
35        match self {
36            Self::Disabled => None,
37            Self::Static(embedder) => Some(embedder.clone()),
38            Self::Auto(cell) => {
39                let embedder = cell
40                    .get_or_init(|| async {
41                        let config = match Config::from_env() {
42                            Ok(config) => config,
43                            Err(error) => {
44                                warn!(
45                                    error = %error,
46                                    "Failed to load Nexus config for semantic embeddings; semantic retrieval will fall back to text"
47                                );
48                                return None;
49                            }
50                        };
51                        match nexus_embeddings::create_service(&config).await {
52                            Ok(Some(service)) => Some(service),
53                            Ok(None) => None,
54                            Err(error) => {
55                                warn!(
56                                    error = %error,
57                                    "Failed to initialize embedding service; semantic retrieval will fall back to text. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
58                                );
59                                None
60                            }
61                        }
62                    })
63                    .await;
64                embedder.clone()
65            }
66        }
67    }
68}
69
/// Per-bucket item budgets derived from a request's `max_items` by [`bucket_limits`].
///
/// Before conversion to `i64`, the budgets sum to `max(max_items, 1)`, and `recent` is always
/// at least 1. Each field saturates at `i64::MAX` instead of wrapping negative.
#[derive(Debug, Clone, Copy)]
struct BucketLimits {
    digests: i64,
    contradictions: i64,
    derived: i64,
    semantic: i64,
    recent: i64,
}

/// Splits `max_items` (treated as at least 1) into per-bucket budgets.
///
/// Shares are assigned in order, each capped by what is still unassigned:
/// - up to 2 digests;
/// - `max_items / 10` contradictions, capped at 2;
/// - derived gets `max_items / 4`, with a floor of 4 from 16 items upward (1 below);
/// - semantic gets 30% of `max_items` with a floor of 4 from 16 items upward, otherwise
///   `max_items / 3` with a floor of 1;
/// - recent gets whatever is left.
///
/// If nothing is left for `recent`, one slot is taken back from the first non-empty of
/// semantic, derived, digests, contradictions, so `recent` always gets at least 1.
fn bucket_limits(max_items: usize) -> BucketLimits {
    // Saturating conversion: a wrapped negative budget would be meaningless downstream.
    let to_i64 = |value: usize| value.min(i64::MAX as usize) as i64;

    let max_items = max_items.max(1);
    let large = max_items >= 16;

    let mut digests = max_items.min(2);
    let mut contradictions = (max_items / 10).min(2);
    let remaining_after_fixed = max_items.saturating_sub(digests + contradictions);

    let derived_floor = if large { 4 } else { 1 };
    let mut derived_target = remaining_after_fixed.min((max_items / 4).max(derived_floor));
    let remaining_after_derived = remaining_after_fixed.saturating_sub(derived_target);

    let semantic_share = if large {
        // saturating_mul: `max_items * 3` would overflow for absurdly large requests.
        (max_items.saturating_mul(3) / 10).max(4)
    } else {
        (max_items / 3).max(1)
    };
    let mut semantic_target = remaining_after_derived.min(semantic_share);

    // Each share above is capped by what remained, so this cannot underflow.
    let mut recent =
        max_items.saturating_sub(digests + contradictions + derived_target + semantic_target);

    // `max_items >= 1` here, so at least one other bucket is non-empty whenever recent is 0.
    if recent == 0 {
        if semantic_target > 0 {
            semantic_target -= 1;
        } else if derived_target > 0 {
            derived_target -= 1;
        } else if digests > 0 {
            digests -= 1;
        } else if contradictions > 0 {
            contradictions -= 1;
        }
        recent = 1;
    }

    BucketLimits {
        digests: to_i64(digests),
        contradictions: to_i64(contradictions),
        derived: to_i64(derived_target),
        semantic: to_i64(semantic_target),
        recent: to_i64(recent),
    }
}
121
/// Returns how many rows to fetch for a bucket before filtering and truncation.
///
/// The result is the larger of `max_items` and three times `bucket_limit` (treated as at
/// least 1). Arithmetic saturates, and the result is capped at `i64::MAX`, so an oversized
/// request can never wrap into a negative limit handed to the repository.
fn overfetch_limit(bucket_limit: i64, max_items: usize) -> i64 {
    let bucket_limit = bucket_limit.max(1) as usize;
    let target = max_items.max(bucket_limit.saturating_mul(3));
    target.min(i64::MAX as usize) as i64
}
126
127fn include_recent_memory(memory: &Memory, include_raw: bool) -> bool {
128    let snapshot = CognitionSnapshot::from_memory(memory);
129    match snapshot.level {
130        CognitiveLevel::Raw => include_raw,
131        CognitiveLevel::Explicit => snapshot.confidence_meets_threshold(),
132        CognitiveLevel::Derived
133        | CognitiveLevel::Contradiction
134        | CognitiveLevel::SummaryShort
135        | CognitiveLevel::SummaryLong => false,
136    }
137}
138
139fn include_semantic_memory(memory: &Memory, include_raw: bool) -> bool {
140    let snapshot = CognitionSnapshot::from_memory(memory);
141    match snapshot.level {
142        CognitiveLevel::Raw => include_raw,
143        CognitiveLevel::SummaryShort | CognitiveLevel::SummaryLong => false,
144        CognitiveLevel::Explicit | CognitiveLevel::Derived | CognitiveLevel::Contradiction => {
145            snapshot.confidence_meets_threshold()
146        }
147    }
148}
149
150fn include_derived_memory(memory: &Memory) -> bool {
151    let snapshot = CognitionSnapshot::from_memory(memory);
152    snapshot.level == CognitiveLevel::Derived && snapshot.confidence_meets_threshold()
153}
154
155fn include_contradiction_memory(memory: &Memory) -> bool {
156    let snapshot = CognitionSnapshot::from_memory(memory);
157    snapshot.level == CognitiveLevel::Contradiction && snapshot.confidence_meets_threshold()
158}
159
160impl RepresentationService {
161    pub fn new() -> Self {
162        let auto_embedder = Config::from_env()
163            .map(|config| config.embedding.enabled)
164            .unwrap_or(false);
165
166        let embedder = if auto_embedder {
167            EmbedderProvider::Auto(Arc::new(OnceCell::new()))
168        } else {
169            EmbedderProvider::Disabled
170        };
171
172        Self { embedder }
173    }
174
175    pub fn without_embedder() -> Self {
176        Self {
177            embedder: EmbedderProvider::Disabled,
178        }
179    }
180
181    pub fn with_embedder(embedder: Arc<dyn EmbeddingService>) -> Self {
182        Self {
183            embedder: EmbedderProvider::Static(embedder),
184        }
185    }
186
187    async fn build_semantic_bucket(
188        &self,
189        request: &WorkingRepresentationRequest,
190        repo: &MemoryRepository,
191        limit: i64,
192    ) -> Result<Vec<Memory>, AgentError> {
193        let Some(query) = request.query.as_deref() else {
194            return Ok(Vec::new());
195        };
196
197        let semantic_limit = limit.max(1) as usize;
198        let vector_fetch_limit = (overfetch_limit(limit, request.max_items) as usize
199            * SEMANTIC_VECTOR_OVERFETCH_MULTIPLIER) as i64;
200
201        let mut ranked = Vec::new();
202        let mut seen = HashSet::new();
203
204        if let Some(embedder) = self.embedder.resolve().await {
205            match embedder.embed(query).await {
206                Ok(query_embedding) => {
207                    let candidates = repo
208                        .get_semantic_candidates(SemanticCandidateParams {
209                            namespace_id: request.namespace_id,
210                            perspective: request.perspective.as_ref(),
211                            limit: vector_fetch_limit,
212                            include_raw: request.include_raw,
213                        })
214                        .await
215                        .map_err(storage_err)?;
216
217                    let mut by_id = HashMap::new();
218                    let vectors: Vec<VectorEntry> = candidates
219                        .into_iter()
220                        .filter(|memory| include_semantic_memory(memory, request.include_raw))
221                        .filter_map(|memory| {
222                            let embedding = memory.content_embedding.clone()?;
223                            by_id.insert(memory.id, memory.clone());
224                            Some(
225                                VectorEntry::new(
226                                    memory.id,
227                                    embedding,
228                                    memory.category.to_string(),
229                                    memory.namespace_id,
230                                )
231                                .with_memory_lane_type(
232                                    memory
233                                        .memory_lane_type
234                                        .as_ref()
235                                        .map(|lane| lane.to_string())
236                                        .unwrap_or_else(|| "none".to_string()),
237                                ),
238                            )
239                        })
240                        .collect();
241
242                    if !vectors.is_empty() {
243                        let search = SemanticSearch::new();
244                        let options = SearchOptions::with_limit(semantic_limit)
245                            .with_namespace(request.namespace_id)
246                            .with_threshold(SEMANTIC_VECTOR_THRESHOLD);
247
248                        match search.search(&query_embedding, &vectors, &options) {
249                            Ok((results, _latency)) => {
250                                for result in results {
251                                    if let Some(memory) = by_id.remove(&result.id) {
252                                        seen.insert(memory.id);
253                                        ranked.push(memory);
254                                    }
255                                }
256                            }
257                            Err(error) => {
258                                warn!(error = %error, "Vector semantic search failed; falling back to text search");
259                            }
260                        }
261                    }
262                }
263                Err(error) => {
264                    warn!(error = %error, "Query embedding failed; falling back to text search");
265                }
266            }
267        }
268
269        if ranked.len() < semantic_limit {
270            let top_up_limit = overfetch_limit(limit, request.max_items) as i32;
271            let fallback = repo
272                .search_by_text_memories(
273                    request.namespace_id,
274                    query,
275                    top_up_limit,
276                    request.include_raw,
277                )
278                .await
279                .map_err(storage_err)?;
280
281            for memory in fallback {
282                if include_semantic_memory(&memory, request.include_raw) && seen.insert(memory.id) {
283                    ranked.push(memory);
284                    if ranked.len() >= semantic_limit {
285                        break;
286                    }
287                }
288            }
289        }
290
291        ranked.truncate(semantic_limit);
292        Ok(ranked)
293    }
294
295    pub async fn build(
296        &self,
297        request: &WorkingRepresentationRequest,
298        repo: &MemoryRepository,
299    ) -> Result<WorkingRepresentation, AgentError> {
300        let total_started = Instant::now();
301        let limits = bucket_limits(request.max_items);
302        let mut representation = WorkingRepresentation::default();
303        let mut metrics = Vec::new();
304
305        if request.include_digests && limits.digests > 0 {
306            let started = Instant::now();
307            if let Some(perspective) = request.perspective.as_ref() {
308                if let Some(session_key) = perspective.session_key.as_deref() {
309                    if let Some(short) = repo
310                        .latest_digest_for_session(request.namespace_id, session_key, "short")
311                        .await
312                        .map_err(storage_err)?
313                    {
314                        representation.digests.push(short);
315                    }
316                    if (representation.digests.len() as i64) < limits.digests {
317                        if let Some(long) = repo
318                            .latest_digest_for_session(request.namespace_id, session_key, "long")
319                            .await
320                            .map_err(storage_err)?
321                        {
322                            representation.digests.push(long);
323                        }
324                    }
325                }
326            }
327            metrics.push(stage_metric_sample(
328                request.namespace_id,
329                "cognition.representation.digests_ms",
330                started.elapsed().as_secs_f64() * 1000.0,
331                "digests",
332            ));
333        }
334
335        // Cross-namespace digests: fetch bounded digests from related namespaces
336        // (e.g. alias namespaces for the same canonical agent).
337        if request.include_digests
338            && !request.cross_namespace_ids.is_empty()
339            && (representation.digests.len() as i64) < limits.digests
340        {
341            let started = Instant::now();
342            for &cross_ns_id in &request.cross_namespace_ids {
343                if cross_ns_id == request.namespace_id {
344                    continue;
345                }
346                if representation.digests.len() as i64 >= limits.digests {
347                    break;
348                }
349                let cross_digest = if let Some(session_key) = request
350                    .perspective
351                    .as_ref()
352                    .and_then(|p| p.session_key.as_deref())
353                {
354                    repo.latest_digest_for_session(cross_ns_id, session_key, "short")
355                        .await
356                        .ok()
357                        .flatten()
358                } else {
359                    repo.latest_digest_for_namespace(cross_ns_id, "short")
360                        .await
361                        .ok()
362                        .flatten()
363                };
364
365                if let Some(digest) = cross_digest {
366                    representation.digests.push(digest);
367                }
368                // Only take one cross-namespace digest per namespace to stay bounded.
369            }
370            debug!(
371                cross_namespaces = request.cross_namespace_ids.len(),
372                cross_digests_added = representation.digests.len(),
373                "Cross-namespace digest fetch complete"
374            );
375            metrics.push(stage_metric_sample(
376                request.namespace_id,
377                "cognition.representation.cross_namespace_ms",
378                started.elapsed().as_secs_f64() * 1000.0,
379                "cross_namespace",
380            ));
381        }
382
383        if request.include_recent {
384            let started = Instant::now();
385            let fetch_limit = overfetch_limit(limits.recent, request.max_items);
386            representation.recent = if let Some(perspective) = request.perspective.as_ref() {
387                repo.get_recent_by_perspective_opts(
388                    request.namespace_id,
389                    perspective,
390                    fetch_limit,
391                    request.include_raw,
392                )
393                .await
394                .map_err(storage_err)?
395            } else {
396                repo.list_filtered(
397                    request.namespace_id,
398                    nexus_storage::repository::ListMemoryFilters {
399                        category: None,
400                        since: None,
401                        until: None,
402                        content_like: None,
403                        include_raw: request.include_raw,
404                        limit: fetch_limit,
405                        offset: 0,
406                    },
407                )
408                .await
409                .map_err(storage_err)?
410            };
411            representation
412                .recent
413                .retain(|memory| include_recent_memory(memory, request.include_raw));
414            representation
415                .recent
416                .truncate(limits.recent.max(0) as usize);
417            metrics.push(stage_metric_sample(
418                request.namespace_id,
419                "cognition.representation.recent_ms",
420                started.elapsed().as_secs_f64() * 1000.0,
421                "recent",
422            ));
423        }
424
425        if request.include_semantic && limits.semantic > 0 {
426            let started = Instant::now();
427            representation.semantic = self
428                .build_semantic_bucket(request, repo, limits.semantic)
429                .await?;
430            metrics.push(stage_metric_sample(
431                request.namespace_id,
432                "cognition.representation.semantic_ms",
433                started.elapsed().as_secs_f64() * 1000.0,
434                "semantic",
435            ));
436        }
437
438        if let Some(perspective) = request.perspective.as_ref() {
439            if request.include_derived && limits.derived > 0 {
440                let started = Instant::now();
441                let fetch_limit = overfetch_limit(limits.derived, request.max_items);
442                representation.derived = repo
443                    .get_most_reinforced_by_perspective_opts(
444                        request.namespace_id,
445                        perspective,
446                        fetch_limit,
447                        request.include_raw,
448                    )
449                    .await
450                    .map_err(storage_err)?
451                    .into_iter()
452                    .filter(include_derived_memory)
453                    .collect();
454                representation.derived.truncate(limits.derived as usize);
455                metrics.push(stage_metric_sample(
456                    request.namespace_id,
457                    "cognition.representation.derived_ms",
458                    started.elapsed().as_secs_f64() * 1000.0,
459                    "derived",
460                ));
461            }
462
463            if request.include_contradictions && limits.contradictions > 0 {
464                let started = Instant::now();
465                let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
466                representation.contradictions = repo
467                    .get_contradictions_by_perspective_opts(
468                        request.namespace_id,
469                        perspective,
470                        fetch_limit,
471                        request.include_raw,
472                    )
473                    .await
474                    .map_err(storage_err)?
475                    .into_iter()
476                    .filter(include_contradiction_memory)
477                    .collect();
478                representation
479                    .contradictions
480                    .truncate(limits.contradictions as usize);
481                metrics.push(stage_metric_sample(
482                    request.namespace_id,
483                    "cognition.representation.contradictions_ms",
484                    started.elapsed().as_secs_f64() * 1000.0,
485                    "contradictions",
486                ));
487            }
488        } else {
489            if request.include_derived && limits.derived > 0 {
490                let started = Instant::now();
491                let fetch_limit = overfetch_limit(limits.derived, request.max_items);
492                representation.derived = repo
493                    .get_most_reinforced_by_namespace(
494                        request.namespace_id,
495                        fetch_limit,
496                        request.include_raw,
497                    )
498                    .await
499                    .map_err(storage_err)?
500                    .into_iter()
501                    .filter(include_derived_memory)
502                    .collect();
503                representation.derived.truncate(limits.derived as usize);
504                metrics.push(stage_metric_sample(
505                    request.namespace_id,
506                    "cognition.representation.derived_ms",
507                    started.elapsed().as_secs_f64() * 1000.0,
508                    "derived",
509                ));
510            }
511
512            if request.include_contradictions && limits.contradictions > 0 {
513                let started = Instant::now();
514                let fetch_limit = overfetch_limit(limits.contradictions, request.max_items);
515                representation.contradictions = repo
516                    .get_contradictions_by_namespace(
517                        request.namespace_id,
518                        fetch_limit,
519                        request.include_raw,
520                    )
521                    .await
522                    .map_err(storage_err)?
523                    .into_iter()
524                    .filter(include_contradiction_memory)
525                    .collect();
526                representation
527                    .contradictions
528                    .truncate(limits.contradictions as usize);
529                metrics.push(stage_metric_sample(
530                    request.namespace_id,
531                    "cognition.representation.contradictions_ms",
532                    started.elapsed().as_secs_f64() * 1000.0,
533                    "contradictions",
534                ));
535            }
536        }
537
538        metrics.push(stage_metric_sample(
539            request.namespace_id,
540            "cognition.representation.total_ms",
541            total_started.elapsed().as_secs_f64() * 1000.0,
542            "total",
543        ));
544        flush_metric_samples(repo, &metrics).await;
545
546        Ok(representation)
547    }
548
549    pub async fn flat_working_set(
550        &self,
551        request: &WorkingRepresentationRequest,
552        repo: &MemoryRepository,
553    ) -> Result<Vec<Memory>, AgentError> {
554        let representation = self.build(request, repo).await?;
555        let flat: Vec<Memory> = flatten_ranked_representation(representation, request)
556            .into_iter()
557            .map(|bucketed| bucketed.memory)
558            .collect();
559
560        // Fall back to storage primitive when the representation yields nothing.
561        // This covers cases where include_* flags are all false or perspective
562        // queries return empty but data exists in the namespace.
563        if flat.is_empty()
564            && !request.include_raw
565            && !request.include_digests
566            && !request.include_recent
567            && !request.include_semantic
568            && !request.include_derived
569            && !request.include_contradictions
570        {
571            let limit = request.max_items.max(1) as i64;
572            return repo
573                .list_filtered(
574                    request.namespace_id,
575                    nexus_storage::repository::ListMemoryFilters {
576                        category: None,
577                        since: None,
578                        until: None,
579                        content_like: None,
580                        include_raw: request.include_raw,
581                        limit,
582                        offset: 0,
583                    },
584                )
585                .await
586                .map_err(|e| AgentError::Storage(e.to_string()));
587        }
588
589        Ok(flat)
590    }
591}
592
593fn storage_err(error: nexus_core::NexusError) -> AgentError {
594    AgentError::Storage(error.to_string())
595}
596
597impl Default for RepresentationService {
598    fn default() -> Self {
599        Self::new()
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606
607    use nexus_core::traits::EmbeddingService;
608    use nexus_core::{
609        cognitive_level_from_metadata, CognitiveMetadata, MemoryCategory, PerspectiveKey,
610    };
611    use nexus_embeddings::MockEmbeddingService;
612    use nexus_storage::repository::{
613        MemoryRepository, NamespaceRepository, StoreDigestParams, StoreMemoryParams,
614    };
615    use sqlx::sqlite::SqlitePoolOptions;
616
617    async fn setup_repo() -> (MemoryRepository, i64, PerspectiveKey) {
618        let pool = SqlitePoolOptions::new()
619            .max_connections(1)
620            .connect("sqlite::memory:")
621            .await
622            .unwrap();
623        nexus_storage::migrations::run_migrations(&pool)
624            .await
625            .unwrap();
626        let namespace_repo = NamespaceRepository::new(pool.clone());
627        let namespace = namespace_repo
628            .get_or_create("representation-test", "representation-test")
629            .await
630            .unwrap();
631        let perspective =
632            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
633        (MemoryRepository::new(pool), namespace.id, perspective)
634    }
635
636    fn metadata(level: CognitiveLevel, perspective: &PerspectiveKey) -> serde_json::Value {
637        let mut cognitive = CognitiveMetadata::new(
638            level,
639            perspective.observer.clone(),
640            perspective.subject.clone(),
641            perspective.session_key.clone(),
642            "test",
643        );
644        cognitive.confidence = Some(0.9);
645        cognitive.merge_into(&serde_json::json!({}))
646    }
647
648    async fn store_memory(
649        repo: &MemoryRepository,
650        namespace_id: i64,
651        content: &str,
652        level: CognitiveLevel,
653        perspective: &PerspectiveKey,
654    ) -> Memory {
655        repo.store(StoreMemoryParams {
656            namespace_id,
657            content,
658            category: &MemoryCategory::Facts,
659            memory_lane_type: None,
660            labels: &[],
661            metadata: &metadata(level, perspective),
662            embedding: None,
663            embedding_model: None,
664        })
665        .await
666        .unwrap()
667    }
668
669    async fn store_memory_with_embedding(
670        repo: &MemoryRepository,
671        namespace_id: i64,
672        content: &str,
673        level: CognitiveLevel,
674        perspective: &PerspectiveKey,
675        embedding: &[f32],
676    ) -> Memory {
677        repo.store(StoreMemoryParams {
678            namespace_id,
679            content,
680            category: &MemoryCategory::Facts,
681            memory_lane_type: None,
682            labels: &[],
683            metadata: &metadata(level, perspective),
684            embedding: Some(embedding),
685            embedding_model: Some("mock-embedding"),
686        })
687        .await
688        .unwrap()
689    }
690
691    #[tokio::test]
692    async fn test_build_representation_groups_buckets() {
693        let (repo, namespace_id, perspective) = setup_repo().await;
694        let digest = store_memory(
695            &repo,
696            namespace_id,
697            "short digest",
698            CognitiveLevel::SummaryShort,
699            &perspective,
700        )
701        .await;
702        repo.store_digest(StoreDigestParams {
703            namespace_id,
704            session_key: "session-1",
705            digest_kind: "short",
706            memory_id: digest.id,
707            start_memory_id: Some(1),
708            end_memory_id: Some(2),
709            token_count: 32,
710        })
711        .await
712        .unwrap();
713
714        store_memory(
715            &repo,
716            namespace_id,
717            "recent explicit observation",
718            CognitiveLevel::Explicit,
719            &perspective,
720        )
721        .await;
722        store_memory(
723            &repo,
724            namespace_id,
725            "derived insight",
726            CognitiveLevel::Derived,
727            &perspective,
728        )
729        .await;
730        store_memory(
731            &repo,
732            namespace_id,
733            "contradiction note",
734            CognitiveLevel::Contradiction,
735            &perspective,
736        )
737        .await;
738
739        let service = RepresentationService::new();
740        let representation = service
741            .build(
742                &WorkingRepresentationRequest {
743                    namespace_id,
744                    perspective: Some(perspective),
745                    query: Some("recent".to_string()),
746                    max_items: 12,
747                    include_raw: false,
748                    include_recent: true,
749                    include_semantic: true,
750                    include_derived: true,
751                    include_digests: true,
752                    include_contradictions: true,
753                    ..WorkingRepresentationRequest::default()
754                },
755                &repo,
756            )
757            .await
758            .unwrap();
759
760        assert_eq!(representation.digests.len(), 1);
761        assert_eq!(representation.derived.len(), 1);
762        assert_eq!(representation.contradictions.len(), 1);
763        assert!(!representation.recent.is_empty());
764        assert!(!representation.semantic.is_empty());
765    }
766
767    #[tokio::test]
768    async fn test_flat_working_set_uses_storage_primitive() {
769        let (repo, namespace_id, perspective) = setup_repo().await;
770        store_memory(
771            &repo,
772            namespace_id,
773            "derived insight",
774            CognitiveLevel::Derived,
775            &perspective,
776        )
777        .await;
778        store_memory(
779            &repo,
780            namespace_id,
781            "recent explicit observation",
782            CognitiveLevel::Explicit,
783            &perspective,
784        )
785        .await;
786
787        let service = RepresentationService::new();
788        let flat = service
789            .flat_working_set(
790                &WorkingRepresentationRequest {
791                    namespace_id,
792                    perspective: Some(perspective),
793                    max_items: 4,
794                    ..WorkingRepresentationRequest::default()
795                },
796                &repo,
797            )
798            .await
799            .unwrap();
800
801        assert!(!flat.is_empty());
802        assert!(flat.len() <= 4);
803    }
804
805    #[tokio::test]
806    async fn test_build_representation_without_perspective_excludes_raw_noise() {
807        let (repo, namespace_id, perspective) = setup_repo().await;
808        store_memory(
809            &repo,
810            namespace_id,
811            "recent explicit observation",
812            CognitiveLevel::Explicit,
813            &perspective,
814        )
815        .await;
816
817        repo.store(StoreMemoryParams {
818            namespace_id,
819            content: "raw hook payload",
820            category: &MemoryCategory::Session,
821            memory_lane_type: None,
822            labels: &["raw-activity".to_string()],
823            metadata: &serde_json::json!({
824                "raw_activity": true,
825                "cognitive": {
826                    "level": "raw",
827                    "observer": "claude-code",
828                    "subject": "claude-code",
829                    "session_key": "session-1",
830                    "generated_by": "test"
831                }
832            }),
833            embedding: None,
834            embedding_model: None,
835        })
836        .await
837        .unwrap();
838
839        let representation = RepresentationService::new()
840            .build(
841                &WorkingRepresentationRequest {
842                    namespace_id,
843                    perspective: None,
844                    query: None,
845                    max_items: 10,
846                    include_raw: false,
847                    include_recent: true,
848                    include_semantic: false,
849                    include_derived: false,
850                    include_digests: false,
851                    include_contradictions: false,
852                    ..WorkingRepresentationRequest::default()
853                },
854                &repo,
855            )
856            .await
857            .unwrap();
858
859        assert_eq!(representation.recent.len(), 1);
860        assert_eq!(
861            representation.recent[0].content,
862            "recent explicit observation"
863        );
864    }
865
866    #[tokio::test]
867    async fn test_build_representation_without_perspective_includes_cognition_outputs() {
868        let (repo, namespace_id, perspective) = setup_repo().await;
869        store_memory(
870            &repo,
871            namespace_id,
872            "derived insight",
873            CognitiveLevel::Derived,
874            &perspective,
875        )
876        .await;
877        store_memory(
878            &repo,
879            namespace_id,
880            "contradiction note",
881            CognitiveLevel::Contradiction,
882            &perspective,
883        )
884        .await;
885
886        let representation = RepresentationService::new()
887            .build(
888                &WorkingRepresentationRequest {
889                    namespace_id,
890                    perspective: None,
891                    query: None,
892                    max_items: 10,
893                    include_raw: false,
894                    include_recent: false,
895                    include_semantic: false,
896                    include_derived: true,
897                    include_digests: false,
898                    include_contradictions: true,
899                    ..WorkingRepresentationRequest::default()
900                },
901                &repo,
902            )
903            .await
904            .unwrap();
905
906        assert_eq!(representation.derived.len(), 1);
907        assert_eq!(representation.derived[0].content, "derived insight");
908        assert_eq!(representation.contradictions.len(), 1);
909        assert_eq!(
910            representation.contradictions[0].content,
911            "contradiction note"
912        );
913    }
914
915    #[tokio::test]
916    async fn test_build_representation_without_perspective_can_include_raw_noise() {
917        let (repo, namespace_id, perspective) = setup_repo().await;
918        store_memory(
919            &repo,
920            namespace_id,
921            "recent explicit observation",
922            CognitiveLevel::Explicit,
923            &perspective,
924        )
925        .await;
926
927        repo.store(StoreMemoryParams {
928            namespace_id,
929            content: "raw hook payload",
930            category: &MemoryCategory::Session,
931            memory_lane_type: None,
932            labels: &["raw-activity".to_string()],
933            metadata: &serde_json::json!({
934                "raw_activity": true,
935                "cognitive": {
936                    "level": "raw",
937                    "observer": "claude-code",
938                    "subject": "claude-code",
939                    "session_key": "session-1",
940                    "generated_by": "test"
941                }
942            }),
943            embedding: None,
944            embedding_model: None,
945        })
946        .await
947        .unwrap();
948
949        let representation = RepresentationService::new()
950            .build(
951                &WorkingRepresentationRequest {
952                    namespace_id,
953                    perspective: None,
954                    query: None,
955                    max_items: 10,
956                    include_raw: true,
957                    include_recent: true,
958                    include_semantic: false,
959                    include_derived: false,
960                    include_digests: false,
961                    include_contradictions: false,
962                    ..WorkingRepresentationRequest::default()
963                },
964                &repo,
965            )
966            .await
967            .unwrap();
968
969        assert_eq!(representation.recent.len(), 2);
970        assert!(representation
971            .recent
972            .iter()
973            .any(|memory| memory.content == "raw hook payload"));
974    }
975
976    #[test]
977    fn test_bucket_limits_match_locked_default_allocation() {
978        let limits = bucket_limits(24);
979        assert_eq!(limits.digests, 2);
980        assert_eq!(limits.contradictions, 2);
981        assert_eq!(limits.derived, 6);
982        assert_eq!(limits.semantic, 7);
983        assert_eq!(limits.recent, 7);
984    }
985
986    #[test]
987    fn test_bucket_limits_preserve_recent_slot_for_tiny_requests() {
988        let four = bucket_limits(4);
989        assert!(four.recent >= 1);
990
991        let single = bucket_limits(1);
992        assert_eq!(single.recent, 1);
993    }
994
995    #[tokio::test]
996    async fn test_build_representation_filters_below_confidence_thresholds() {
997        let (repo, namespace_id, perspective) = setup_repo().await;
998
999        let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
1000        low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
1001        repo.store(StoreMemoryParams {
1002            namespace_id,
1003            content: "low confidence derived",
1004            category: &MemoryCategory::Facts,
1005            memory_lane_type: None,
1006            labels: &[],
1007            metadata: &low_derived,
1008            embedding: None,
1009            embedding_model: None,
1010        })
1011        .await
1012        .unwrap();
1013
1014        let mut high_derived = metadata(CognitiveLevel::Derived, &perspective);
1015        high_derived["cognitive"]["confidence"] = serde_json::json!(0.90);
1016        repo.store(StoreMemoryParams {
1017            namespace_id,
1018            content: "high confidence derived",
1019            category: &MemoryCategory::Facts,
1020            memory_lane_type: None,
1021            labels: &[],
1022            metadata: &high_derived,
1023            embedding: None,
1024            embedding_model: None,
1025        })
1026        .await
1027        .unwrap();
1028
1029        let representation = RepresentationService::new()
1030            .build(
1031                &WorkingRepresentationRequest {
1032                    namespace_id,
1033                    perspective: Some(perspective),
1034                    query: None,
1035                    max_items: 10,
1036                    include_raw: false,
1037                    include_recent: false,
1038                    include_semantic: false,
1039                    include_derived: true,
1040                    include_digests: false,
1041                    include_contradictions: false,
1042                    ..WorkingRepresentationRequest::default()
1043                },
1044                &repo,
1045            )
1046            .await
1047            .unwrap();
1048
1049        assert_eq!(representation.derived.len(), 1);
1050        assert_eq!(representation.derived[0].content, "high confidence derived");
1051    }
1052
1053    #[tokio::test]
1054    async fn test_build_representation_prefers_vector_semantic_matches() {
1055        let (repo, namespace_id, perspective) = setup_repo().await;
1056        let embedder = Arc::new(MockEmbeddingService::new());
1057        let semantic_query = "provider switch rollout";
1058        let vector = embedder.embed(semantic_query).await.unwrap();
1059
1060        store_memory_with_embedding(
1061            &repo,
1062            namespace_id,
1063            "migration timeline digest summary",
1064            CognitiveLevel::Derived,
1065            &perspective,
1066            &vector,
1067        )
1068        .await;
1069        store_memory(
1070            &repo,
1071            namespace_id,
1072            "provider switch rollout plain text fallback",
1073            CognitiveLevel::Explicit,
1074            &perspective,
1075        )
1076        .await;
1077
1078        let representation = RepresentationService::with_embedder(embedder)
1079            .build(
1080                &WorkingRepresentationRequest {
1081                    namespace_id,
1082                    perspective: Some(perspective),
1083                    query: Some(semantic_query.to_string()),
1084                    max_items: 12,
1085                    include_raw: false,
1086                    include_recent: false,
1087                    include_semantic: true,
1088                    include_derived: false,
1089                    include_digests: false,
1090                    include_contradictions: false,
1091                    ..WorkingRepresentationRequest::default()
1092                },
1093                &repo,
1094            )
1095            .await
1096            .unwrap();
1097
1098        assert_eq!(representation.semantic.len(), 2);
1099        assert_eq!(
1100            representation.semantic[0].content,
1101            "migration timeline digest summary"
1102        );
1103        assert_eq!(
1104            representation.semantic[1].content,
1105            "provider switch rollout plain text fallback"
1106        );
1107    }
1108
1109    #[tokio::test]
1110    async fn test_build_representation_text_fallback_overfetches_after_filtering() {
1111        let (repo, namespace_id, perspective) = setup_repo().await;
1112        let embedder = Arc::new(MockEmbeddingService::new());
1113        let query = "provider switch rollout";
1114        let vector = embedder.embed(query).await.unwrap();
1115
1116        store_memory_with_embedding(
1117            &repo,
1118            namespace_id,
1119            "vector-only semantic memory",
1120            CognitiveLevel::Derived,
1121            &perspective,
1122            &vector,
1123        )
1124        .await;
1125
1126        let mut low_derived = metadata(CognitiveLevel::Derived, &perspective);
1127        low_derived["cognitive"]["confidence"] = serde_json::json!(0.60);
1128        repo.store(StoreMemoryParams {
1129            namespace_id,
1130            content: "provider switch rollout low confidence derived",
1131            category: &MemoryCategory::Facts,
1132            memory_lane_type: None,
1133            labels: &[],
1134            metadata: &low_derived,
1135            embedding: None,
1136            embedding_model: None,
1137        })
1138        .await
1139        .unwrap();
1140
1141        let mut low_explicit = metadata(CognitiveLevel::Explicit, &perspective);
1142        low_explicit["cognitive"]["confidence"] = serde_json::json!(0.60);
1143        repo.store(StoreMemoryParams {
1144            namespace_id,
1145            content: "provider switch rollout low confidence explicit",
1146            category: &MemoryCategory::Facts,
1147            memory_lane_type: None,
1148            labels: &[],
1149            metadata: &low_explicit,
1150            embedding: None,
1151            embedding_model: None,
1152        })
1153        .await
1154        .unwrap();
1155
1156        store_memory(
1157            &repo,
1158            namespace_id,
1159            "provider switch rollout strong explicit",
1160            CognitiveLevel::Explicit,
1161            &perspective,
1162        )
1163        .await;
1164
1165        let text_hits = repo
1166            .search_by_text_memories(namespace_id, query, 4, false)
1167            .await
1168            .unwrap();
1169        assert_eq!(text_hits.len(), 3);
1170
1171        let representation = RepresentationService::with_embedder(embedder)
1172            .build(
1173                &WorkingRepresentationRequest {
1174                    namespace_id,
1175                    perspective: Some(perspective),
1176                    query: Some(query.to_string()),
1177                    max_items: 6,
1178                    include_raw: false,
1179                    include_recent: false,
1180                    include_semantic: true,
1181                    include_derived: false,
1182                    include_digests: false,
1183                    include_contradictions: false,
1184                    ..WorkingRepresentationRequest::default()
1185                },
1186                &repo,
1187            )
1188            .await
1189            .unwrap();
1190
1191        assert_eq!(representation.semantic.len(), 2);
1192        assert_eq!(
1193            representation.semantic[0].content,
1194            "vector-only semantic memory"
1195        );
1196        assert_eq!(
1197            representation.semantic[1].content,
1198            "provider switch rollout strong explicit"
1199        );
1200        assert_eq!(
1201            cognitive_level_from_metadata(&representation.semantic[1].metadata),
1202            CognitiveLevel::Explicit
1203        );
1204    }
1205}