1use std::collections::HashMap;
11use std::time::Instant;
12
13use nexus_core::config::AgentConfig;
14use nexus_core::traits::EmbeddingService;
15use nexus_core::{Memory, WorkingRepresentationRequest};
16use nexus_lephase::{CompressionMode, LePhaseIntegration};
17use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
18use nexus_storage::repository::{MemoryRelationRepository, MemoryRepository, NamespaceRepository};
19use tracing::{debug, info, warn};
20
21use crate::error::AgentError;
22use crate::identity::IdentityResolver;
23use crate::prompts::{
24 query_refinement_user_prompt, query_user_prompt_with_lineage, QUERY_SYSTEM_PROMPT,
25};
26use crate::ranking::{
27 flatten_ranked_representation_with_excluded, BucketedMemory, RankedExcludedMemory, RankedResult,
28};
29use crate::representation::RepresentationService;
30use crate::types::{
31 BucketIntrospectionStats, ExcludedCandidate, InclusionReason, InclusionSignal, MemoryBucket,
32 MemoryLineage, QueryAnswer, QueryIntrospection, RelevantReflection,
33 RepresentationConfigSnapshot,
34};
35use crate::util::{
36 extract_agent_summary, flush_metric_samples, parse_json_response, stage_metric_sample,
37 token_usage_metric_samples, CognitionSnapshot,
38};
39
/// Answers natural-language questions against stored memories.
///
/// It builds a working representation of relevant memories, formats it as prompt context, and asks an LLM.
pub struct QueryService {
    /// Client used both for first-pass answers and for the optional refinement pass.
    llm: std::sync::Arc<dyn LlmClient>,
    /// Agent configuration; `query_context_limit` caps the items fetched per query.
    config: AgentConfig,
    /// Builds the ranked working representation that answers are grounded on.
    representation: RepresentationService,
}
45
/// Minimum number of included memories at which query context switches to phase-aware LePhase formatting.
///
/// Below this count, the lightweight per-memory summary format is used.
const PHASE_GROUPING_THRESHOLD: usize = 3;
48
49impl QueryService {
50 pub fn new(llm: std::sync::Arc<dyn LlmClient>, config: AgentConfig) -> Self {
51 Self {
52 llm,
53 config,
54 representation: RepresentationService::new(),
55 }
56 }
57
58 pub fn with_embedder(
59 llm: std::sync::Arc<dyn LlmClient>,
60 config: AgentConfig,
61 embedder: std::sync::Arc<dyn EmbeddingService>,
62 ) -> Self {
63 Self {
64 llm,
65 config,
66 representation: RepresentationService::with_embedder(embedder),
67 }
68 }
69
70 pub async fn query(
71 &self,
72 question: &str,
73 namespace_id: i64,
74 memory_repo: &MemoryRepository,
75 relation_repo: &MemoryRelationRepository<'_>,
76 ) -> Result<QueryAnswer, AgentError> {
77 let request = WorkingRepresentationRequest {
78 namespace_id,
79 perspective: None,
80 query: Some(question.to_string()),
81 max_items: self.config.query_context_limit,
82 include_raw: false,
83 ..WorkingRepresentationRequest::default()
84 };
85 let request = self.with_cross_namespace_ids(request, memory_repo).await?;
86
87 self.query_with_representation(question, request, memory_repo, relation_repo)
88 .await
89 }
90
91 pub async fn query_with_representation(
92 &self,
93 question: &str,
94 request: WorkingRepresentationRequest,
95 memory_repo: &MemoryRepository,
96 _relation_repo: &MemoryRelationRepository<'_>,
97 ) -> Result<QueryAnswer, AgentError> {
98 info!(question = %question, "Processing query");
99 let total_started = Instant::now();
100 let mut metrics = Vec::new();
101
102 let representation_started = Instant::now();
103 let representation = self
104 .representation
105 .build(&request, memory_repo)
106 .await
107 .map_err(|e| {
108 warn!(error = %e, "Failed to build working representation");
109 AgentError::Storage(e.to_string())
110 })?;
111 metrics.push(stage_metric_sample(
112 request.namespace_id,
113 "cognition.query.representation_ms",
114 representation_started.elapsed().as_secs_f64() * 1000.0,
115 "representation",
116 ));
117
118 let flatten_started = Instant::now();
119 let ranked = flatten_ranked_representation_with_excluded(representation, &request);
120 let bucketed = &ranked.included;
121 metrics.push(stage_metric_sample(
122 request.namespace_id,
123 "cognition.query.flatten_ms",
124 flatten_started.elapsed().as_secs_f64() * 1000.0,
125 "flatten",
126 ));
127 debug!(count = bucketed.len(), "Found relevant memories");
128
129 if bucketed.is_empty() {
130 let answer_started = Instant::now();
131 let (answer, usage) = self.generate_answer(question, "").await?;
132 metrics.push(stage_metric_sample(
133 request.namespace_id,
134 "cognition.query.answer_ms",
135 answer_started.elapsed().as_secs_f64() * 1000.0,
136 "answer",
137 ));
138 metrics.extend(token_usage_metric_samples(
139 request.namespace_id,
140 "cognition.query.answer",
141 "answer",
142 usage.as_ref(),
143 ));
144 metrics.push(stage_metric_sample(
145 request.namespace_id,
146 "cognition.query.total_ms",
147 total_started.elapsed().as_secs_f64() * 1000.0,
148 "total",
149 ));
150 flush_metric_samples(memory_repo, &metrics).await;
151 return Ok(answer);
152 }
153
154 let lineages = build_lineages(bucketed);
156
157 let context_started = Instant::now();
159 let context = if bucketed.len() >= PHASE_GROUPING_THRESHOLD {
160 self.build_phase_aware_context(bucketed, &lineages)?
161 } else {
162 self.build_lightweight_context(bucketed, &lineages)?
163 };
164 metrics.push(stage_metric_sample(
165 request.namespace_id,
166 "cognition.query.context_ms",
167 context_started.elapsed().as_secs_f64() * 1000.0,
168 "context",
169 ));
170
171 debug!(context_len = context.len(), "Built query context");
172
173 let answer_started = Instant::now();
174 let (answer, usage) = self.generate_answer(question, &context).await?;
175 metrics.push(stage_metric_sample(
176 request.namespace_id,
177 "cognition.query.answer_ms",
178 answer_started.elapsed().as_secs_f64() * 1000.0,
179 "answer",
180 ));
181 metrics.extend(token_usage_metric_samples(
182 request.namespace_id,
183 "cognition.query.answer",
184 "answer",
185 usage.as_ref(),
186 ));
187 let mut answer = if should_refine_answer(question, &answer, bucketed) {
188 let refine_started = Instant::now();
189 let (refined, usage) = self
190 .generate_refined_answer(question, &context, &answer)
191 .await?;
192 metrics.push(stage_metric_sample(
193 request.namespace_id,
194 "cognition.query.refine_ms",
195 refine_started.elapsed().as_secs_f64() * 1000.0,
196 "refine",
197 ));
198 metrics.extend(token_usage_metric_samples(
199 request.namespace_id,
200 "cognition.query.refine",
201 "refine",
202 usage.as_ref(),
203 ));
204 select_better_answer(answer, refined)
205 } else {
206 answer
207 };
208 answer.lineages = lineages;
209
210 metrics.push(stage_metric_sample(
211 request.namespace_id,
212 "cognition.query.total_ms",
213 total_started.elapsed().as_secs_f64() * 1000.0,
214 "total",
215 ));
216 flush_metric_samples(memory_repo, &metrics).await;
217
218 info!("Query answered successfully");
219 Ok(answer)
220 }
221
222 pub async fn query_introspection(
228 &self,
229 question: &str,
230 namespace_id: i64,
231 memory_repo: &MemoryRepository,
232 ) -> Result<QueryIntrospection, AgentError> {
233 let request = WorkingRepresentationRequest {
234 namespace_id,
235 perspective: None,
236 query: Some(question.to_string()),
237 max_items: self.config.query_context_limit,
238 include_raw: false,
239 ..WorkingRepresentationRequest::default()
240 };
241 let request = self.with_cross_namespace_ids(request, memory_repo).await?;
242
243 self.introspection_with_representation(&request, question, memory_repo)
244 .await
245 }
246
247 pub async fn introspection_with_representation(
249 &self,
250 request: &WorkingRepresentationRequest,
251 question: &str,
252 memory_repo: &MemoryRepository,
253 ) -> Result<QueryIntrospection, AgentError> {
254 introspect_query_with_representation_service(
255 &self.representation,
256 request,
257 question,
258 memory_repo,
259 )
260 .await
261 }
262
263 async fn with_cross_namespace_ids(
264 &self,
265 mut request: WorkingRepresentationRequest,
266 memory_repo: &MemoryRepository,
267 ) -> Result<WorkingRepresentationRequest, AgentError> {
268 if !request.cross_namespace_ids.is_empty() {
269 return Ok(request);
270 }
271
272 let namespace_repo = NamespaceRepository::new(memory_repo.pool().clone());
273 let namespaces = namespace_repo
274 .list_all()
275 .await
276 .map_err(|e| AgentError::Storage(e.to_string()))?;
277 let Some(current) = namespaces
278 .iter()
279 .find(|namespace| namespace.id == request.namespace_id)
280 else {
281 return Ok(request);
282 };
283
284 request.cross_namespace_ids =
285 IdentityResolver::related_namespace_ids(&namespace_repo, ¤t.name)
286 .await
287 .into_iter()
288 .filter(|id| *id != request.namespace_id)
289 .collect();
290
291 Ok(request)
292 }
293
294 fn build_lightweight_context(
299 &self,
300 bucketed: &[BucketedMemory],
301 lineages: &[MemoryLineage],
302 ) -> Result<String, AgentError> {
303 let mut parts = Vec::with_capacity(bucketed.len());
304
305 for (bm, lineage) in bucketed.iter().zip(lineages.iter()) {
306 let summary = extract_agent_summary(
307 &serde_json::to_string(&bm.memory.metadata).unwrap_or_else(|_| "{}".to_string()),
308 &bm.memory.content,
309 300,
310 );
311
312 let relevance = lineage
313 .relevance_score
314 .map_or(String::new(), |r| format!(", relevance: {:.2}", r));
315
316 parts.push(format!(
317 "[Memory #{}] {} (bucket: {}, phase: {}{})\nSummary: {}",
318 bm.memory.id,
319 bm.memory.content.chars().take(100).collect::<String>(),
320 lineage.bucket,
321 lineage.phase,
322 relevance,
323 summary,
324 ));
325 }
326
327 Ok(parts.join("\n\n"))
328 }
329
330 fn build_phase_aware_context(
336 &self,
337 bucketed: &[BucketedMemory],
338 lineages: &[MemoryLineage],
339 ) -> Result<String, AgentError> {
340 let mut lephase = LePhaseIntegration::with_mode(CompressionMode::Balanced);
341
342 for bm in bucketed {
344 lephase.register_memory(&bm.memory);
345 }
346
347 let memories: Vec<Memory> = bucketed.iter().map(|bm| bm.memory.clone()).collect();
348 let formatted = lephase.format_for_model(&memories, None);
349
350 let lineage_map: HashMap<i64, &MemoryLineage> =
352 lineages.iter().map(|l| (l.memory_id, l)).collect();
353
354 let annotated = annotate_with_buckets(&formatted, &lineage_map);
356
357 Ok(annotated)
358 }
359
360 async fn generate_answer(
361 &self,
362 question: &str,
363 context: &str,
364 ) -> Result<(QueryAnswer, Option<TokenUsage>), AgentError> {
365 let user_msg = if context.is_empty() {
366 query_user_prompt_with_lineage(question, "No relevant memories found.")
367 } else {
368 query_user_prompt_with_lineage(question, context)
369 };
370
371 let params = GenerateParams {
372 messages: vec![
373 ChatMessage::system(QUERY_SYSTEM_PROMPT),
374 ChatMessage::user(user_msg),
375 ],
376 max_tokens: 4096,
377 temperature: 0.3,
378 json_mode: true,
379 };
380
381 let response = self
382 .llm
383 .generate(params)
384 .await
385 .map_err(|e| AgentError::Llm(e.to_string()))?;
386 let usage = response.usage.clone();
387 let answer: QueryAnswer =
388 parse_json_response(&response).map_err(|e| AgentError::Llm(e.to_string()))?;
389
390 Ok((answer, usage))
391 }
392
393 async fn generate_refined_answer(
394 &self,
395 question: &str,
396 context: &str,
397 draft: &QueryAnswer,
398 ) -> Result<(QueryAnswer, Option<TokenUsage>), AgentError> {
399 let params = GenerateParams {
400 messages: vec![
401 ChatMessage::system(QUERY_SYSTEM_PROMPT),
402 ChatMessage::user(query_refinement_user_prompt(
403 question,
404 context,
405 &draft.answer,
406 )),
407 ],
408 max_tokens: 4096,
409 temperature: 0.2,
410 json_mode: true,
411 };
412
413 let response = self
414 .llm
415 .generate(params)
416 .await
417 .map_err(|e| AgentError::Llm(e.to_string()))?;
418 let usage = response.usage.clone();
419 let answer: QueryAnswer =
420 parse_json_response(&response).map_err(|e| AgentError::Llm(e.to_string()))?;
421 Ok((answer, usage))
422 }
423}
424
425fn introspection_request(request: &WorkingRepresentationRequest) -> WorkingRepresentationRequest {
426 let mut overfetch = request.clone();
427 overfetch.max_items = request
428 .max_items
429 .saturating_mul(3)
430 .max(request.max_items + 8);
431 overfetch
432}
433
434fn build_lineages(bucketed: &[BucketedMemory]) -> Vec<MemoryLineage> {
440 let analyzer = nexus_lephase::PhaseAnalyzer::new();
441
442 bucketed
443 .iter()
444 .map(|bm| {
445 let analysis = analyzer.analyze(&bm.memory);
446 MemoryLineage {
447 memory_id: bm.memory.id,
448 bucket: bm.bucket,
449 phase: analysis.phase.phase_type.to_string(),
450 relevance_score: bm
451 .memory
452 .relevance_score
453 .or(bm.memory.similarity_score)
454 .or(Some((bm.blended_score / 100.0).clamp(0.0, 1.0))),
455 }
456 })
457 .collect()
458}
459
460fn should_refine_answer(question: &str, answer: &QueryAnswer, bucketed: &[BucketedMemory]) -> bool {
461 if bucketed.is_empty() {
462 return false;
463 }
464
465 let lower_question = question.to_ascii_lowercase();
466 let question_word_count = question.split_whitespace().count();
467 let question_complex = question.len() > 120
468 || question_word_count > 18
469 || [
470 "why",
471 "how",
472 "compare",
473 "contrast",
474 "tradeoff",
475 "timeline",
476 "relationship",
477 "explain",
478 "summarize",
479 ]
480 .iter()
481 .any(|needle| lower_question.contains(needle));
482 let weak_answer =
483 answer.confidence < 0.72 || answer.citations.is_empty() || answer.answer.trim().len() < 40;
484 let broad_context = bucketed.len() >= 6;
485 let contradiction_present = bucketed
486 .iter()
487 .any(|memory| memory.bucket == crate::MemoryBucket::Contradictions);
488
489 (weak_answer && (question_complex || broad_context || contradiction_present))
490 || (question_complex && answer.confidence < 0.82 && broad_context)
491}
492
493fn select_better_answer(initial: QueryAnswer, refined: QueryAnswer) -> QueryAnswer {
494 if answer_quality(&refined) >= answer_quality(&initial) {
495 refined
496 } else {
497 initial
498 }
499}
500
501fn answer_quality(answer: &QueryAnswer) -> f32 {
502 let citation_bonus = (answer.citations.len().min(4) as f32) * 0.05;
503 let answer_length_bonus = if answer.answer.trim().len() >= 40 {
504 0.02
505 } else {
506 0.0
507 };
508 answer.confidence + citation_bonus + answer_length_bonus
509}
510
511fn annotate_with_buckets(formatted: &str, lineage_map: &HashMap<i64, &MemoryLineage>) -> String {
514 let mut out = String::with_capacity(formatted.len() + 256);
515 for line in formatted.lines() {
516 out.push_str(line);
517 if let Some(id_str) = line.strip_prefix("[Memory #") {
518 if let Some(end) = id_str.find(']') {
519 if let Ok(id) = id_str[..end].parse::<i64>() {
520 if let Some(lineage) = lineage_map.get(&id) {
521 out.push_str(&format!(" (bucket: {})", lineage.bucket));
522 }
523 }
524 }
525 }
526 out.push('\n');
527 }
528 out
529}
530
/// Maximum byte length of content previews in introspection output.
///
/// `truncate_str` may cut shorter to land on a UTF-8 character boundary.
const CONTENT_PREVIEW_LEN: usize = 80;
/// Maximum number of excluded candidates reported by introspection.
const MAX_EXCLUDED_CANDIDATES: usize = 20;
/// Maximum number of derived/contradiction reflections reported by introspection.
const MAX_RELEVANT_REFLECTIONS: usize = 10;
538
539pub(crate) async fn build_introspection(
543 ranked: &RankedResult,
544 question: &str,
545 request: &WorkingRepresentationRequest,
546 memory_repo: &MemoryRepository,
547 started: Instant,
548) -> Result<QueryIntrospection, AgentError> {
549 let included = build_inclusion_reasons(&ranked.included, request);
550 let excluded = build_excluded_candidates(&ranked.excluded);
551 let bucket_stats = build_bucket_stats(&ranked.included, &ranked.excluded);
552 let relevant_reflections = fetch_relevant_reflections(question, request, memory_repo).await?;
553
554 let pipeline_latency_ms = Some(started.elapsed().as_millis() as u64);
555 let representation_config = Some(RepresentationConfigSnapshot {
556 max_items: request.max_items,
557 include_raw: request.include_raw,
558 include_digests: request.include_digests,
559 include_semantic: request.include_semantic,
560 include_derived: request.include_derived,
561 include_contradictions: request.include_contradictions,
562 });
563
564 Ok(QueryIntrospection {
565 included,
566 excluded_candidates: excluded,
567 relevant_reflections,
568 bucket_stats,
569 pipeline_latency_ms,
570 representation_config,
571 })
572}
573
574fn build_inclusion_reasons(
575 bucketed: &[BucketedMemory],
576 request: &WorkingRepresentationRequest,
577) -> Vec<InclusionReason> {
578 let analyzer = nexus_lephase::PhaseAnalyzer::new();
579
580 bucketed
581 .iter()
582 .map(|bm| {
583 let analysis = analyzer.analyze(&bm.memory);
584 let relevance = bm
585 .memory
586 .relevance_score
587 .or(bm.memory.similarity_score)
588 .or(Some((bm.blended_score / 100.0).clamp(0.0, 1.0)));
589 let signals = extract_inclusion_signals(bm, request);
590
591 InclusionReason {
592 memory_id: bm.memory.id,
593 bucket: bm.bucket,
594 phase: analysis.phase.phase_type.to_string(),
595 relevance_score: relevance,
596 blended_score: bm.blended_score,
597 reason: classify_inclusion_reason(bm, relevance),
598 signals,
599 }
600 })
601 .collect()
602}
603
604fn extract_inclusion_signals(
608 bm: &BucketedMemory,
609 request: &WorkingRepresentationRequest,
610) -> Vec<InclusionSignal> {
611 use crate::util::CognitionSnapshot;
612 use nexus_core::CognitiveLevel;
613
614 let memory = &bm.memory;
615 let snapshot = CognitionSnapshot::from_memory(memory);
616 let mut signals = Vec::new();
617
618 let age_hours = (chrono::Utc::now() - memory.created_at).num_hours();
620 let recency_desc = match age_hours {
621 h if h <= 1 => "Created within the last hour".to_string(),
622 h if h <= 6 => format!("Created {h}h ago"),
623 h if h <= 24 => format!("Created {h}h ago"),
624 h if h <= 72 => format!("Created {h}h ago"),
625 h if h <= 168 => format!("Created {}d ago", h / 24),
626 _ => format!("Created {}d ago", age_hours / 24),
627 };
628 let recency_weight = match age_hours {
629 h if h <= 1 => 1.0,
630 h if h <= 6 => 0.8,
631 h if h <= 24 => 0.6,
632 h if h <= 72 => 0.35,
633 h if h <= 168 => 0.15,
634 _ => 0.0,
635 };
636 signals.push(InclusionSignal {
637 signal_type: "recency".to_string(),
638 description: recency_desc,
639 weight_contribution: recency_weight,
640 });
641
642 let level_desc = match snapshot.level {
644 CognitiveLevel::Explicit => "Explicit factual memory",
645 CognitiveLevel::Derived => "System-derived insight",
646 CognitiveLevel::Contradiction => "Detected contradiction",
647 CognitiveLevel::SummaryShort => "Short-form session digest",
648 CognitiveLevel::SummaryLong => "Long-form session digest",
649 CognitiveLevel::Raw => "Raw activity record",
650 };
651 let confidence = snapshot.confidence.unwrap_or(0.75).clamp(0.0, 1.0);
652 signals.push(InclusionSignal {
653 signal_type: "cognitive_level".to_string(),
654 description: format!("{level_desc} (confidence: {:.2})", confidence),
655 weight_contribution: confidence,
656 });
657
658 if matches!(bm.bucket, MemoryBucket::Semantic) {
660 let similarity = memory
661 .relevance_score
662 .or(memory.similarity_score)
663 .unwrap_or_default();
664 if similarity > 0.0 {
665 signals.push(InclusionSignal {
666 signal_type: "semantic_similarity".to_string(),
667 description: format!("Embedding similarity score: {:.3}", similarity),
668 weight_contribution: similarity,
669 });
670 }
671 }
672
673 if let Some(ref req_perspective) = request.perspective {
675 if let Some(ref mem_perspective) = snapshot.perspective {
676 let observer_match = mem_perspective.observer == req_perspective.observer;
677 let subject_match = mem_perspective.subject == req_perspective.subject;
678 if observer_match || subject_match {
679 let match_kind = if observer_match && subject_match {
680 "full perspective match"
681 } else if observer_match {
682 "observer match"
683 } else {
684 "subject match"
685 };
686 let weight = if observer_match && subject_match {
687 1.0
688 } else {
689 0.5
690 };
691 signals.push(InclusionSignal {
692 signal_type: "perspective_match".to_string(),
693 description: match_kind.to_string(),
694 weight_contribution: weight,
695 });
696 }
697 }
698 }
699
700 if snapshot.times_reinforced > 0 {
702 let reinforced = ((snapshot.times_reinforced as f32) / 5.0).min(1.0);
703 signals.push(InclusionSignal {
704 signal_type: "reinforcement".to_string(),
705 description: format!("Reinforced {} time(s)", snapshot.times_reinforced),
706 weight_contribution: reinforced,
707 });
708 }
709
710 let bucket_desc = match bm.bucket {
712 MemoryBucket::Digests => Some("Digest bucket — prioritized for context grounding"),
713 MemoryBucket::Contradictions => {
714 Some("Contradiction bucket — surfaced for conflict awareness")
715 }
716 MemoryBucket::Derived => Some("Derived bucket — system insight priority"),
717 MemoryBucket::Semantic => None,
718 MemoryBucket::Recent => None,
719 };
720 if let Some(desc) = bucket_desc {
721 signals.push(InclusionSignal {
722 signal_type: "bucket_boost".to_string(),
723 description: desc.to_string(),
724 weight_contribution: 0.0,
725 });
726 }
727
728 signals
729}
730
731fn classify_inclusion_reason(bm: &BucketedMemory, relevance: Option<f32>) -> String {
732 match bm.bucket {
733 MemoryBucket::Semantic => {
734 let score_str = relevance
735 .map(|s| format!("{:.2}", s))
736 .unwrap_or_else(|| "N/A".to_string());
737 format!("Semantic match (score: {}) in semantic bucket", score_str)
738 }
739 MemoryBucket::Digests => "Session digest selected for context grounding".to_string(),
740 MemoryBucket::Derived => {
741 format!(
742 "Reinforced derived insight (blended: {:.2}) in derived bucket",
743 bm.blended_score
744 )
745 }
746 MemoryBucket::Recent => {
747 format!(
748 "Recent memory (blended: {:.2}) in recent bucket",
749 bm.blended_score
750 )
751 }
752 MemoryBucket::Contradictions => {
753 "Contradiction detected — surfaced for conflict awareness".to_string()
754 }
755 }
756}
757
758fn build_excluded_candidates(excluded: &[RankedExcludedMemory]) -> Vec<ExcludedCandidate> {
759 excluded
760 .iter()
761 .take(MAX_EXCLUDED_CANDIDATES)
762 .map(|excluded| ExcludedCandidate {
763 memory_id: excluded.memory.id,
764 bucket: excluded.bucket,
765 blended_score: excluded.blended_score,
766 reason: excluded.reason.clone(),
767 content_preview: truncate_str(&excluded.memory.content, CONTENT_PREVIEW_LEN),
768 })
769 .collect()
770}
771
772fn build_bucket_stats(
773 included: &[BucketedMemory],
774 excluded: &[RankedExcludedMemory],
775) -> Vec<BucketIntrospectionStats> {
776 let mut all_buckets: Vec<(MemoryBucket, usize, usize)> = Vec::new();
777
778 for bm in included {
779 if let Some(entry) = all_buckets.iter_mut().find(|(b, _, _)| *b == bm.bucket) {
780 entry.1 += 1;
781 } else {
782 all_buckets.push((bm.bucket, 1, 0));
783 }
784 }
785
786 for excluded in excluded {
787 if let Some(entry) = all_buckets
788 .iter_mut()
789 .find(|(bucket, _, _)| *bucket == excluded.bucket)
790 {
791 entry.2 += 1;
792 } else {
793 all_buckets.push((excluded.bucket, 0, 1));
794 }
795 }
796
797 all_buckets
798 .into_iter()
799 .map(|(bucket, inc, exc)| BucketIntrospectionStats {
800 bucket,
801 fetched: inc + exc,
802 included: inc,
803 excluded: exc,
804 })
805 .collect()
806}
807
808async fn fetch_relevant_reflections(
809 question: &str,
810 request: &WorkingRepresentationRequest,
811 memory_repo: &MemoryRepository,
812) -> Result<Vec<RelevantReflection>, AgentError> {
813 let query_terms = tokenize_query(question);
814 let filters = nexus_storage::repository::ListMemoryFilters {
815 category: None,
816 since: None,
817 until: None,
818 content_like: None,
819 include_raw: false,
820 limit: (MAX_RELEVANT_REFLECTIONS as i64).max(10) * 8,
821 offset: 0,
822 };
823
824 let all = memory_repo
825 .list_filtered(request.namespace_id, filters)
826 .await
827 .map_err(|e| AgentError::Storage(e.to_string()))?;
828
829 let mut ranked: Vec<(f32, Memory)> = all
830 .into_iter()
831 .filter(|m| {
832 if !reflection_matches_request_scope(m, request) {
833 return false;
834 }
835 let level = m
836 .metadata
837 .get("cognitive")
838 .and_then(|c| c.get("level"))
839 .and_then(|v| v.as_str())
840 .unwrap_or("");
841 level == "derived" || level == "contradiction"
842 })
843 .map(|m| {
844 let content_terms = tokenize_query(&m.content);
845 let overlap = if query_terms.is_empty() {
846 0.0
847 } else {
848 let shared = query_terms.intersection(&content_terms).count() as f32;
849 shared / query_terms.len() as f32
850 };
851 let confidence = m
852 .metadata
853 .get("cognitive")
854 .and_then(|c| c.get("confidence"))
855 .and_then(|v| v.as_f64())
856 .unwrap_or(0.75) as f32;
857 let recency = {
858 let age_hours = (chrono::Utc::now() - m.created_at).num_hours();
859 match age_hours {
860 h if h <= 1 => 1.0,
861 h if h <= 6 => 0.8,
862 h if h <= 24 => 0.6,
863 h if h <= 72 => 0.35,
864 h if h <= 168 => 0.15,
865 _ => 0.0,
866 }
867 };
868 let score = if query_terms.is_empty() {
869 0.5 * confidence + 0.5 * recency
870 } else {
871 0.65 * overlap + 0.20 * confidence + 0.15 * recency
872 };
873 (score, m)
874 })
875 .filter(|(score, _)| query_terms.is_empty() || *score > 0.0)
876 .collect();
877 ranked.sort_by(|left, right| {
878 right
879 .0
880 .partial_cmp(&left.0)
881 .unwrap_or(std::cmp::Ordering::Equal)
882 .then_with(|| right.1.created_at.cmp(&left.1.created_at))
883 });
884
885 let reflections: Vec<RelevantReflection> = ranked
886 .into_iter()
887 .take(MAX_RELEVANT_REFLECTIONS)
888 .map(|(_, m)| {
889 let reflection_type = m
890 .metadata
891 .get("cognitive")
892 .and_then(|c| c.get("level"))
893 .and_then(|v| v.as_str())
894 .unwrap_or("unknown")
895 .to_string();
896 let confidence = m
897 .metadata
898 .get("cognitive")
899 .and_then(|c| c.get("confidence"))
900 .and_then(|v| v.as_f64());
901
902 RelevantReflection {
903 memory_id: m.id,
904 reflection_type,
905 content_preview: truncate_str(&m.content, CONTENT_PREVIEW_LEN),
906 confidence,
907 created_at: m.created_at.to_rfc3339(),
908 }
909 })
910 .collect();
911
912 Ok(reflections)
913}
914
915fn reflection_matches_request_scope(
916 memory: &Memory,
917 request: &WorkingRepresentationRequest,
918) -> bool {
919 let Some(request_perspective) = request.perspective.as_ref() else {
920 return true;
921 };
922
923 let snapshot = CognitionSnapshot::from_memory(memory);
924 let Some(memory_perspective) = snapshot.perspective.as_ref() else {
925 return false;
926 };
927
928 if memory_perspective.observer != request_perspective.observer
929 || memory_perspective.subject != request_perspective.subject
930 {
931 return false;
932 }
933
934 match request_perspective.session_key.as_deref() {
935 Some(session_key) => memory_perspective.session_key.as_deref() == Some(session_key),
936 None => true,
937 }
938}
939
/// Splits `text` into a deduplicated, sorted set of lowercase search terms.
///
/// - Any non-alphanumeric character (Unicode-aware) separates terms.
/// - Terms shorter than three characters are dropped.
/// - Lowercasing and the length check are Unicode-aware too. "Über" and "über" produce the
///   same term, and "où" counts as two characters (not three bytes), so it is dropped.
fn tokenize_query(text: &str) -> std::collections::BTreeSet<String> {
    text.split(|c: char| !c.is_alphanumeric())
        .map(str::to_lowercase)
        .filter(|term| term.chars().count() >= 3)
        .collect()
}
948
/// Truncates `s` to at most `max` bytes and appends `...` when anything was cut.
///
/// The cut point moves back to the nearest UTF-8 character boundary, so the result is always valid text.
fn truncate_str(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_owned();
    }

    // Index 0 is always a char boundary, so `find` always succeeds.
    let cut = (0..=max)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    let mut truncated = String::with_capacity(cut + 3);
    truncated.push_str(&s[..cut]);
    truncated.push_str("...");
    truncated
}
960
961pub async fn introspect_query(
973 request: &WorkingRepresentationRequest,
974 question: &str,
975 memory_repo: &MemoryRepository,
976) -> Result<QueryIntrospection, AgentError> {
977 introspect_query_with_representation_service(
978 &RepresentationService::new(),
979 request,
980 question,
981 memory_repo,
982 )
983 .await
984}
985
986async fn introspect_query_with_representation_service(
987 representation: &RepresentationService,
988 request: &WorkingRepresentationRequest,
989 question: &str,
990 memory_repo: &MemoryRepository,
991) -> Result<QueryIntrospection, AgentError> {
992 let started = Instant::now();
993 let overfetch_request = introspection_request(request);
994 let representation = representation
995 .build(&overfetch_request, memory_repo)
996 .await
997 .map_err(|e| AgentError::Storage(e.to_string()))?;
998
999 let ranked = flatten_ranked_representation_with_excluded(representation, request);
1000
1001 build_introspection(&ranked, question, request, memory_repo, started).await
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007 use crate::types::{ExclusionReason, MemoryBucket};
1008 use async_trait::async_trait;
1009 use nexus_core::{CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveKey};
1010 use nexus_llm::GenerateResponse;
1011 use nexus_storage::repository::{
1012 MemoryRelationRepository, MemoryRepository, NamespaceRepository, StoreDigestParams,
1013 StoreMemoryParams,
1014 };
1015 use sqlx::sqlite::SqlitePoolOptions;
1016 use std::collections::VecDeque;
1017 use std::sync::{Arc, Mutex};
1018
1019 struct MockLlmClient {
1020 responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
1021 calls: Mutex<Vec<GenerateParams>>,
1022 }
1023
1024 impl MockLlmClient {
1025 fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
1026 Self {
1027 responses: Mutex::new(VecDeque::from(responses)),
1028 calls: Mutex::new(Vec::new()),
1029 }
1030 }
1031
1032 fn call_count(&self) -> usize {
1033 self.calls.lock().expect("mock calls poisoned").len()
1034 }
1035
1036 fn user_messages(&self) -> Vec<String> {
1037 self.calls
1038 .lock()
1039 .expect("mock calls poisoned")
1040 .iter()
1041 .flat_map(|params| params.messages.iter())
1042 .filter(|message| message.role == "user")
1043 .map(|message| message.content.clone())
1044 .collect()
1045 }
1046 }
1047
1048 #[async_trait]
1049 impl LlmClient for MockLlmClient {
1050 async fn generate(&self, params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
1051 self.calls.lock().expect("mock calls poisoned").push(params);
1052 self.responses
1053 .lock()
1054 .expect("mock responses poisoned")
1055 .pop_front()
1056 .expect("mock response missing")
1057 }
1058
1059 fn provider_name(&self) -> String {
1060 "mock".to_string()
1061 }
1062
1063 fn model_name(&self) -> String {
1064 "mock-model".to_string()
1065 }
1066 }
1067
1068 async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64, PerspectiveKey) {
1069 let pool = SqlitePoolOptions::new()
1070 .max_connections(1)
1071 .connect("sqlite::memory:")
1072 .await
1073 .unwrap();
1074 nexus_storage::migrations::run_migrations(&pool)
1075 .await
1076 .unwrap();
1077 let namespace_repo = NamespaceRepository::new(pool.clone());
1078 let namespace = namespace_repo
1079 .get_or_create("query-test", "query-test")
1080 .await
1081 .unwrap();
1082 let perspective =
1083 PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
1084 (
1085 pool.clone(),
1086 MemoryRepository::new(pool),
1087 namespace.id,
1088 perspective,
1089 )
1090 }
1091
1092 fn metadata(level: CognitiveLevel, perspective: &PerspectiveKey) -> serde_json::Value {
1093 let mut cognitive = CognitiveMetadata::new(
1094 level,
1095 perspective.observer.clone(),
1096 perspective.subject.clone(),
1097 perspective.session_key.clone(),
1098 "test",
1099 );
1100 cognitive.confidence = Some(0.9);
1101 cognitive.merge_into(&serde_json::json!({}))
1102 }
1103
1104 async fn store_memory(
1105 repo: &MemoryRepository,
1106 namespace_id: i64,
1107 content: &str,
1108 level: CognitiveLevel,
1109 perspective: &PerspectiveKey,
1110 ) -> Memory {
1111 repo.store(StoreMemoryParams {
1112 namespace_id,
1113 content,
1114 category: &MemoryCategory::Facts,
1115 memory_lane_type: None,
1116 labels: &[],
1117 metadata: &metadata(level, perspective),
1118 embedding: None,
1119 embedding_model: None,
1120 })
1121 .await
1122 .unwrap()
1123 }
1124
1125 async fn store_raw_memory(
1126 repo: &MemoryRepository,
1127 namespace_id: i64,
1128 content: &str,
1129 perspective: &PerspectiveKey,
1130 ) -> Memory {
1131 repo.store(StoreMemoryParams {
1132 namespace_id,
1133 content,
1134 category: &MemoryCategory::Session,
1135 memory_lane_type: None,
1136 labels: &["raw-activity".to_string()],
1137 metadata: &serde_json::json!({
1138 "raw_activity": true,
1139 "cognitive": {
1140 "level": "raw",
1141 "observer": perspective.observer,
1142 "subject": perspective.subject,
1143 "session_key": perspective.session_key,
1144 "generated_by": "test"
1145 }
1146 }),
1147 embedding: None,
1148 embedding_model: None,
1149 })
1150 .await
1151 .unwrap()
1152 }
1153
1154 fn answer_response(answer: &str, confidence: f32, citations: &[i64]) -> GenerateResponse {
1155 let citations_json: Vec<serde_json::Value> = citations
1156 .iter()
1157 .map(|memory_id| {
1158 serde_json::json!({
1159 "memory_id": memory_id,
1160 "title": format!("Memory {}", memory_id),
1161 "excerpt": format!("Excerpt {}", memory_id)
1162 })
1163 })
1164 .collect();
1165 GenerateResponse {
1166 content: serde_json::json!({
1167 "answer": answer,
1168 "citations": citations_json,
1169 "confidence": confidence
1170 })
1171 .to_string(),
1172 model: "mock-model".to_string(),
1173 usage: None,
1174 }
1175 }
1176
1177 fn test_memory(id: i64, content: &str) -> Memory {
1178 Memory {
1179 id,
1180 namespace_id: 1,
1181 content: content.to_string(),
1182 category: nexus_core::MemoryCategory::Facts,
1183 labels: Vec::new(),
1184 metadata: serde_json::json!({}),
1185 ..Memory::default()
1186 }
1187 }
1188
1189 fn test_memory_with_relevance(id: i64, content: &str, relevance: f32) -> Memory {
1190 Memory {
1191 id,
1192 namespace_id: 1,
1193 content: content.to_string(),
1194 category: nexus_core::MemoryCategory::Facts,
1195 labels: Vec::new(),
1196 metadata: serde_json::json!({}),
1197 relevance_score: Some(relevance),
1198 ..Memory::default()
1199 }
1200 }
1201
/// `build_lineages` keeps input order and bucket for each memory.
///
/// It also labels each memory's phase from its content: planning, execution
/// or debugging.
#[test]
fn test_build_lineages_detects_phases() {
    let entry = |id: i64, content: &str, bucket: MemoryBucket, blended_score| BucketedMemory {
        memory: test_memory(id, content),
        bucket,
        blended_score,
    };
    let bucketed = vec![
        entry(1, "Plan the next sprint tasks", MemoryBucket::Recent, 0.91),
        entry(2, "Implement the auth feature code", MemoryBucket::Semantic, 0.83),
        entry(3, "Fix the bug in error handling", MemoryBucket::Derived, 0.88),
    ];

    let lineages = build_lineages(&bucketed);
    assert_eq!(lineages.len(), 3);

    let expected = [
        (1, MemoryBucket::Recent, "planning"),
        (2, MemoryBucket::Semantic, "execution"),
        (3, MemoryBucket::Derived, "debugging"),
    ];
    for (lineage, (memory_id, bucket, phase)) in lineages.iter().zip(expected) {
        assert_eq!(lineage.memory_id, memory_id);
        assert_eq!(lineage.bucket, bucket);
        assert_eq!(lineage.phase, phase);
    }
}
1237
/// A memory's own `relevance_score` is copied onto its lineage.
#[test]
fn test_build_lineages_captures_relevance_scores() {
    let memory = test_memory_with_relevance(42, "test content", 0.87);
    let bucketed = vec![BucketedMemory {
        memory,
        bucket: MemoryBucket::Semantic,
        blended_score: 0.95,
    }];

    let lineages = build_lineages(&bucketed);
    let lineage = &lineages[0];
    assert_eq!(lineage.relevance_score, Some(0.87));
}
1249
/// Without a relevance score, the lineage falls back to the memory's similarity score.
#[test]
fn test_build_lineages_falls_back_to_similarity_score() {
    let memory = Memory {
        similarity_score: Some(0.72),
        ..test_memory(1, "test")
    };
    let bucketed = vec![BucketedMemory {
        memory,
        bucket: MemoryBucket::Semantic,
        blended_score: 0.79,
    }];

    let lineages = build_lineages(&bucketed);
    let lineage = &lineages[0];
    assert_eq!(lineage.relevance_score, Some(0.72));
}
1265
/// A terse, uncited, 0.55-confidence answer to a complex "explain the tradeoff
/// timeline" question must be flagged for refinement.
///
/// The context holds a digest and a contradiction.
#[test]
fn test_should_refine_answer_for_complex_low_confidence_answer() {
    let digest = BucketedMemory {
        memory: test_memory(1, "Digest"),
        bucket: MemoryBucket::Digests,
        blended_score: 0.88,
    };
    let contradiction = BucketedMemory {
        memory: test_memory(2, "Contradiction"),
        bucket: MemoryBucket::Contradictions,
        blended_score: 0.74,
    };
    let bucketed = vec![digest, contradiction];

    let tentative = QueryAnswer {
        answer: String::from("Maybe."),
        confidence: 0.55,
        citations: vec![],
        ..Default::default()
    };
    let question = "Explain the tradeoff timeline and contradictions in this session";

    let needs_refinement = should_refine_answer(question, &tentative, &bucketed);
    assert!(needs_refinement);
}
1293
/// A cited, high-confidence answer to a simple factual question must not be refined.
#[test]
fn test_should_not_refine_simple_high_confidence_answer() {
    let provider_citation = crate::types::MemoryCitation {
        memory_id: 1,
        title: "Provider".to_string(),
        excerpt: "Gemini is active".to_string(),
    };
    let confident = QueryAnswer {
        answer: "The active provider is Gemini and the setting is already applied.".to_string(),
        citations: vec![provider_citation],
        confidence: 0.91,
        ..Default::default()
    };
    let bucketed = vec![BucketedMemory {
        memory: test_memory(1, "Recent memory"),
        bucket: MemoryBucket::Recent,
        blended_score: 0.82,
    }];

    let needs_refinement =
        should_refine_answer("What is the active provider?", &confident, &bucketed);
    assert!(!needs_refinement);
}
1318
/// The refined answer wins when it carries a citation, even at slightly lower
/// confidence than an uncited initial answer.
#[test]
fn test_select_better_answer_prefers_cited_refined_answer() {
    let evidence = crate::types::MemoryCitation {
        memory_id: 3,
        title: "Evidence".to_string(),
        excerpt: "Supporting excerpt".to_string(),
    };
    let refined = QueryAnswer {
        answer: "Longer answer with supporting detail and an explicit citation.".to_string(),
        citations: vec![evidence],
        confidence: 0.76,
        ..Default::default()
    };
    let initial = QueryAnswer {
        answer: "Short".to_string(),
        citations: vec![],
        confidence: 0.78,
        ..Default::default()
    };

    let chosen = select_better_answer(initial, refined);
    assert_eq!(chosen.citations.len(), 1);
}
1341
/// A `[Memory #N]` header line with a known lineage gets exactly one
/// `(bucket: …)` suffix, and the rest of the text is kept.
#[test]
fn test_annotate_with_buckets_appends_provenance() {
    let lineage = MemoryLineage {
        memory_id: 1,
        phase: "execution".to_string(),
        bucket: MemoryBucket::Semantic,
        relevance_score: Some(0.9),
    };
    let lineage_map = HashMap::from([(1, &lineage)]);

    let annotated =
        annotate_with_buckets("[Memory #1] Implement feature\nSome content\n", &lineage_map);

    assert!(annotated.contains("[Memory #1] Implement feature (bucket: semantic)"));
    assert!(annotated.contains("Some content"));
    let annotation_count = annotated.matches("(bucket:").count();
    assert_eq!(
        annotation_count,
        1,
        "expected exactly one bucket annotation"
    );
}
1367
/// With no known lineages, the formatted text passes through unchanged.
#[test]
fn test_annotate_skips_unknown_ids() {
    let input = "[Memory #1] Implement feature\n";
    let no_lineages = HashMap::new();
    assert_eq!(annotate_with_buckets(input, &no_lineages), input);
}
1376
/// Each bucket's `Display` form is its lowercase name.
#[test]
fn test_memory_bucket_display() {
    let cases = [
        (MemoryBucket::Digests, "digests"),
        (MemoryBucket::Recent, "recent"),
        (MemoryBucket::Semantic, "semantic"),
        (MemoryBucket::Derived, "derived"),
        (MemoryBucket::Contradictions, "contradictions"),
    ];
    for (bucket, label) in cases {
        assert_eq!(bucket.to_string(), label);
    }
}
1387
1388 #[tokio::test]
1389 async fn test_query_service_empty_working_set_uses_no_relevant_memories_prompt() {
1390 let (pool, repo, namespace_id, _perspective) = setup_repo().await;
1391 let relation_repo = MemoryRelationRepository::new(&pool);
1392 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1393 "No memory matched.",
1394 0.92,
1395 &[],
1396 ))]));
1397 let service = QueryService::new(llm.clone(), AgentConfig::default());
1398
1399 let answer = service
1400 .query("What happened?", namespace_id, &repo, &relation_repo)
1401 .await
1402 .unwrap();
1403
1404 assert_eq!(answer.answer, "No memory matched.");
1405 assert!(answer.lineages.is_empty());
1406 assert_eq!(llm.call_count(), 1);
1407 let prompts = llm.user_messages();
1408 assert!(prompts
1409 .iter()
1410 .any(|prompt| prompt.contains("No relevant memories found.")));
1411 }
1412
1413 #[tokio::test]
1414 async fn test_query_service_excludes_raw_noise_by_default_and_attaches_lineages() {
1415 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1416 let relation_repo = MemoryRelationRepository::new(&pool);
1417 let explicit = store_memory(
1418 &repo,
1419 namespace_id,
1420 "Explicit observation about retrieval ranking.",
1421 CognitiveLevel::Explicit,
1422 &perspective,
1423 )
1424 .await;
1425 let raw = store_raw_memory(
1426 &repo,
1427 namespace_id,
1428 "raw hook payload about retrieval ranking",
1429 &perspective,
1430 )
1431 .await;
1432 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1433 "Ranking was improved.",
1434 0.9,
1435 &[explicit.id],
1436 ))]));
1437 let service = QueryService::new(llm.clone(), AgentConfig::default());
1438
1439 let answer = service
1440 .query(
1441 "What changed in retrieval ranking?",
1442 namespace_id,
1443 &repo,
1444 &relation_repo,
1445 )
1446 .await
1447 .unwrap();
1448
1449 assert!(!answer.lineages.is_empty());
1450 assert!(answer
1451 .lineages
1452 .iter()
1453 .any(|lineage| lineage.memory_id == explicit.id));
1454 assert!(!answer
1455 .lineages
1456 .iter()
1457 .any(|lineage| lineage.memory_id == raw.id));
1458 let prompts = llm.user_messages();
1459 assert!(prompts
1460 .iter()
1461 .any(|prompt| prompt.contains("Explicit observation about retrieval ranking.")));
1462 assert!(!prompts
1463 .iter()
1464 .any(|prompt| prompt.contains("raw hook payload about retrieval ranking")));
1465 }
1466
1467 #[tokio::test]
1468 async fn test_query_with_representation_can_include_raw_when_requested() {
1469 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1470 let relation_repo = MemoryRelationRepository::new(&pool);
1471 store_memory(
1472 &repo,
1473 namespace_id,
1474 "Explicit observation about hook routing.",
1475 CognitiveLevel::Explicit,
1476 &perspective,
1477 )
1478 .await;
1479 let raw = store_raw_memory(
1480 &repo,
1481 namespace_id,
1482 "raw hook payload about hook routing",
1483 &perspective,
1484 )
1485 .await;
1486 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1487 "The hook routing is visible through the explicit and raw activity records.",
1488 0.88,
1489 &[raw.id],
1490 ))]));
1491 let service = QueryService::new(llm.clone(), AgentConfig::default());
1492
1493 let answer = service
1494 .query_with_representation(
1495 "What does the hook traffic show?",
1496 WorkingRepresentationRequest {
1497 namespace_id,
1498 perspective: None,
1499 query: Some("hook routing".to_string()),
1500 max_items: 10,
1501 include_raw: true,
1502 ..WorkingRepresentationRequest::default()
1503 },
1504 &repo,
1505 &relation_repo,
1506 )
1507 .await
1508 .unwrap();
1509
1510 assert!(answer
1511 .lineages
1512 .iter()
1513 .any(|lineage| lineage.memory_id == raw.id));
1514 let prompts = llm.user_messages();
1515 assert!(prompts
1516 .iter()
1517 .any(|prompt| prompt.contains("raw hook payload about hook routing")));
1518 }
1519
1520 #[tokio::test]
1521 async fn test_query_service_mixed_cognition_outputs_attach_multiple_lineages_and_phase_context()
1522 {
1523 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1524 let relation_repo = MemoryRelationRepository::new(&pool);
1525 let digest = store_memory(
1526 &repo,
1527 namespace_id,
1528 "Digest summary of the session.",
1529 CognitiveLevel::SummaryShort,
1530 &perspective,
1531 )
1532 .await;
1533 repo.store_digest(StoreDigestParams {
1534 namespace_id,
1535 session_key: "session-1",
1536 digest_kind: "short",
1537 memory_id: digest.id,
1538 start_memory_id: Some(digest.id),
1539 end_memory_id: Some(digest.id),
1540 token_count: 42,
1541 })
1542 .await
1543 .unwrap();
1544 let derived = store_memory(
1545 &repo,
1546 namespace_id,
1547 "Derived insight about the refactor.",
1548 CognitiveLevel::Derived,
1549 &perspective,
1550 )
1551 .await;
1552 let contradiction = store_memory(
1553 &repo,
1554 namespace_id,
1555 "Contradiction between old and new recall paths.",
1556 CognitiveLevel::Contradiction,
1557 &perspective,
1558 )
1559 .await;
1560 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1561 "The session had a digest, an insight, and a contradiction.",
1562 0.86,
1563 &[digest.id, derived.id, contradiction.id],
1564 ))]));
1565 let service = QueryService::new(llm.clone(), AgentConfig::default());
1566
1567 let answer = service
1568 .query_with_representation(
1569 "Explain the session timeline and contradictions in context.",
1570 WorkingRepresentationRequest {
1571 namespace_id,
1572 perspective: Some(perspective.clone()),
1573 query: Some("timeline contradiction insight".to_string()),
1574 max_items: 10,
1575 include_raw: false,
1576 include_recent: false,
1577 include_semantic: false,
1578 include_derived: true,
1579 include_digests: true,
1580 include_contradictions: true,
1581 ..WorkingRepresentationRequest::default()
1582 },
1583 &repo,
1584 &relation_repo,
1585 )
1586 .await
1587 .unwrap();
1588
1589 assert!(answer
1590 .lineages
1591 .iter()
1592 .any(|lineage| lineage.memory_id == digest.id));
1593 assert!(answer
1594 .lineages
1595 .iter()
1596 .any(|lineage| lineage.memory_id == derived.id));
1597 assert!(answer
1598 .lineages
1599 .iter()
1600 .any(|lineage| lineage.memory_id == contradiction.id));
1601 let prompts = llm.user_messages();
1602 assert!(prompts
1603 .iter()
1604 .any(|prompt| prompt.contains("Digest summary of the session.")));
1605 assert!(prompts
1606 .iter()
1607 .any(|prompt| prompt.contains("Derived insight about the refactor.")));
1608 assert!(prompts
1609 .iter()
1610 .any(|prompt| prompt.contains("Contradiction between old and new recall paths.")));
1611 assert!(!prompts.iter().any(|prompt| prompt.contains("Summary:")));
1612 }
1613
1614 #[tokio::test]
1615 async fn test_query_service_representation_beats_old_like_recall_for_session_digest_context() {
1616 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1617 let relation_repo = MemoryRelationRepository::new(&pool);
1618
1619 store_memory(
1620 &repo,
1621 namespace_id,
1622 "Configured Gemini as the active provider and preserved installer env settings.",
1623 CognitiveLevel::Explicit,
1624 &perspective,
1625 )
1626 .await;
1627 let digest = store_memory(
1628 &repo,
1629 namespace_id,
1630 "Digest summary: migration timeline of the provider switch, installer preservation, and bounded dreaming rollout.",
1631 CognitiveLevel::SummaryShort,
1632 &perspective,
1633 )
1634 .await;
1635 repo.store_digest(StoreDigestParams {
1636 namespace_id,
1637 session_key: perspective.session_key.as_deref().unwrap_or("session-1"),
1638 digest_kind: "short",
1639 memory_id: digest.id,
1640 start_memory_id: Some(digest.id),
1641 end_memory_id: Some(digest.id),
1642 token_count: 64,
1643 })
1644 .await
1645 .unwrap();
1646 let contradiction = store_memory(
1647 &repo,
1648 namespace_id,
1649 "Contradiction note: old recall missed the migration timeline while representation-first recall surfaced it.",
1650 CognitiveLevel::Contradiction,
1651 &perspective,
1652 )
1653 .await;
1654
1655 let question =
1656 "What does the migration timeline summary say about the provider switch rollout?";
1657 let old_like_hits = repo
1658 .search_by_text(namespace_id, question, 10, false)
1659 .await
1660 .unwrap();
1661 assert!(
1662 old_like_hits.is_empty(),
1663 "legacy LIKE recall should miss the natural-language question when no memory contains the full string"
1664 );
1665
1666 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1667 "The migration timeline shows a provider switch digest with rollout context and a contradiction note about the old recall path missing it.",
1668 0.9,
1669 &[digest.id, contradiction.id],
1670 ))]));
1671 let service = QueryService::new(llm.clone(), AgentConfig::default());
1672
1673 let answer = service
1674 .query_with_representation(
1675 question,
1676 WorkingRepresentationRequest {
1677 namespace_id,
1678 perspective: Some(perspective.clone()),
1679 query: Some(question.to_string()),
1680 max_items: 12,
1681 include_raw: false,
1682 include_recent: true,
1683 include_semantic: true,
1684 include_derived: true,
1685 include_digests: true,
1686 include_contradictions: true,
1687 ..WorkingRepresentationRequest::default()
1688 },
1689 &repo,
1690 &relation_repo,
1691 )
1692 .await
1693 .unwrap();
1694
1695 assert!(answer
1696 .lineages
1697 .iter()
1698 .any(|lineage| lineage.memory_id == digest.id));
1699 assert!(answer
1700 .lineages
1701 .iter()
1702 .any(|lineage| lineage.memory_id == contradiction.id));
1703
1704 let prompts = llm.user_messages();
1705 assert!(prompts.iter().any(|prompt| prompt.contains(
1706 "Digest summary: migration timeline of the provider switch, installer preservation, and bounded dreaming rollout."
1707 )));
1708 assert!(prompts.iter().any(|prompt| prompt.contains(
1709 "Contradiction note: old recall missed the migration timeline while representation-first recall surfaced it."
1710 )));
1711 }
1712
1713 #[tokio::test]
1714 async fn test_query_service_refinement_triggers_second_llm_call() {
1715 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1716 let relation_repo = MemoryRelationRepository::new(&pool);
1717 for idx in 0..6 {
1718 let content = format!("Execution detail {idx} for the migration timeline.");
1719 store_memory(
1720 &repo,
1721 namespace_id,
1722 &content,
1723 CognitiveLevel::Explicit,
1724 &perspective,
1725 )
1726 .await;
1727 }
1728 let llm = Arc::new(MockLlmClient::new(vec![
1729 Ok(answer_response("Maybe.", 0.55, &[])),
1730 Ok(answer_response(
1731 "The migration timeline shows several execution details with stronger support.",
1732 0.9,
1733 &[1],
1734 )),
1735 ]));
1736 let service = QueryService::new(llm.clone(), AgentConfig::default());
1737
1738 let answer = service
1739 .query(
1740 "Explain the tradeoff timeline and relationship across the migration work.",
1741 namespace_id,
1742 &repo,
1743 &relation_repo,
1744 )
1745 .await
1746 .unwrap();
1747
1748 assert_eq!(
1749 answer.answer,
1750 "The migration timeline shows several execution details with stronger support."
1751 );
1752 assert_eq!(llm.call_count(), 2);
1753 }
1754
1755 #[tokio::test]
1756 async fn test_query_service_auto_includes_cross_namespace_alias_digest_without_perspective() {
1757 let pool = SqlitePoolOptions::new()
1758 .max_connections(1)
1759 .connect("sqlite::memory:")
1760 .await
1761 .unwrap();
1762 nexus_storage::migrations::run_migrations(&pool)
1763 .await
1764 .unwrap();
1765
1766 let namespace_repo = NamespaceRepository::new(pool.clone());
1767 let primary = namespace_repo
1768 .get_or_create("claude-code", "claude-code")
1769 .await
1770 .unwrap();
1771 let alias = namespace_repo
1772 .get_or_create("claude", "claude")
1773 .await
1774 .unwrap();
1775 let unrelated = namespace_repo
1776 .get_or_create("codex", "codex")
1777 .await
1778 .unwrap();
1779
1780 let repo = MemoryRepository::new(pool.clone());
1781 let relation_repo = MemoryRelationRepository::new(&pool);
1782 let perspective =
1783 PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
1784
1785 let alias_digest = store_memory(
1786 &repo,
1787 alias.id,
1788 "Alias digest summary: the claude namespace captured the provider rollout timeline.",
1789 CognitiveLevel::SummaryShort,
1790 &perspective,
1791 )
1792 .await;
1793 repo.store_digest(StoreDigestParams {
1794 namespace_id: alias.id,
1795 session_key: "session-1",
1796 digest_kind: "short",
1797 memory_id: alias_digest.id,
1798 start_memory_id: Some(alias_digest.id),
1799 end_memory_id: Some(alias_digest.id),
1800 token_count: 48,
1801 })
1802 .await
1803 .unwrap();
1804
1805 let unrelated_digest = store_memory(
1806 &repo,
1807 unrelated.id,
1808 "Unrelated codex digest summary: refactor unrelated CLI parsing bug.",
1809 CognitiveLevel::SummaryShort,
1810 &perspective,
1811 )
1812 .await;
1813 repo.store_digest(StoreDigestParams {
1814 namespace_id: unrelated.id,
1815 session_key: "session-1",
1816 digest_kind: "short",
1817 memory_id: unrelated_digest.id,
1818 start_memory_id: Some(unrelated_digest.id),
1819 end_memory_id: Some(unrelated_digest.id),
1820 token_count: 41,
1821 })
1822 .await
1823 .unwrap();
1824
1825 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1826 "The provider rollout timeline is preserved in the alias digest.",
1827 0.93,
1828 &[alias_digest.id],
1829 ))]));
1830 let service = QueryService::new(llm.clone(), AgentConfig::default());
1831
1832 let answer = service
1833 .query(
1834 "What does the provider rollout timeline say?",
1835 primary.id,
1836 &repo,
1837 &relation_repo,
1838 )
1839 .await
1840 .unwrap();
1841
1842 assert!(answer
1843 .lineages
1844 .iter()
1845 .any(|lineage| lineage.memory_id == alias_digest.id));
1846 assert!(!answer
1847 .lineages
1848 .iter()
1849 .any(|lineage| lineage.memory_id == unrelated_digest.id));
1850
1851 let prompts = llm.user_messages();
1852 assert!(prompts.iter().any(|prompt| prompt.contains(
1853 "Alias digest summary: the claude namespace captured the provider rollout timeline."
1854 )));
1855 assert!(!prompts.iter().any(|prompt| prompt
1856 .contains("Unrelated codex digest summary: refactor unrelated CLI parsing bug.")));
1857 }
1858
1859 #[tokio::test]
1860 async fn test_query_service_simple_answer_stays_single_call() {
1861 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1862 let relation_repo = MemoryRelationRepository::new(&pool);
1863 let explicit = store_memory(
1864 &repo,
1865 namespace_id,
1866 "Explicit note about the active provider switch.",
1867 CognitiveLevel::Explicit,
1868 &perspective,
1869 )
1870 .await;
1871 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1872 "The active provider is Gemini and the change is already applied with explicit support.",
1873 0.94,
1874 &[explicit.id],
1875 ))]));
1876 let service = QueryService::new(llm.clone(), AgentConfig::default());
1877
1878 let answer = service
1879 .query(
1880 "What is the active provider?",
1881 namespace_id,
1882 &repo,
1883 &relation_repo,
1884 )
1885 .await
1886 .unwrap();
1887
1888 assert_eq!(
1889 answer.answer,
1890 "The active provider is Gemini and the change is already applied with explicit support."
1891 );
1892 assert_eq!(llm.call_count(), 1);
1893 }
1894
1895 #[tokio::test]
1896 async fn test_query_service_lightweight_context_below_phase_threshold() {
1897 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1898 let relation_repo = MemoryRelationRepository::new(&pool);
1899 store_memory(
1900 &repo,
1901 namespace_id,
1902 "Short note about the provider switch.",
1903 CognitiveLevel::Explicit,
1904 &perspective,
1905 )
1906 .await;
1907 store_memory(
1908 &repo,
1909 namespace_id,
1910 "Follow-up note about the provider switch.",
1911 CognitiveLevel::Explicit,
1912 &perspective,
1913 )
1914 .await;
1915 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1916 "The provider changed.",
1917 0.93,
1918 &[1],
1919 ))]));
1920 let service = QueryService::new(llm.clone(), AgentConfig::default());
1921
1922 let answer = service
1923 .query(
1924 "What changed with the provider?",
1925 namespace_id,
1926 &repo,
1927 &relation_repo,
1928 )
1929 .await
1930 .unwrap();
1931
1932 assert!(!answer.lineages.is_empty());
1933 let prompts = llm.user_messages();
1934 assert!(prompts.iter().any(|prompt| prompt.contains("Summary:")));
1935 }
1936
/// Each bucketed memory gets an inclusion reason in input order.
///
/// The reason text mentions its bucket: semantic match, digest or derived.
#[test]
fn test_build_inclusion_reasons_produces_non_empty_reasons() {
    let entry = |id: i64, content: &str, bucket: MemoryBucket, blended_score| BucketedMemory {
        memory: test_memory(id, content),
        bucket,
        blended_score,
    };
    let bucketed = vec![
        entry(1, "Fix the authentication bug", MemoryBucket::Semantic, 0.91),
        entry(2, "Session digest for sprint review", MemoryBucket::Digests, 0.85),
        entry(3, "Derived insight: auth patterns converge", MemoryBucket::Derived, 0.88),
    ];

    let request = WorkingRepresentationRequest::default();
    let reasons = build_inclusion_reasons(&bucketed, &request);
    assert_eq!(reasons.len(), 3);

    let semantic = &reasons[0];
    assert_eq!(semantic.memory_id, 1);
    assert_eq!(semantic.bucket, MemoryBucket::Semantic);
    assert!(!semantic.reason.is_empty());
    assert!(semantic.reason.contains("Semantic match"));

    let digest = &reasons[1];
    assert_eq!(digest.memory_id, 2);
    assert_eq!(digest.bucket, MemoryBucket::Digests);
    assert!(digest.reason.contains("digest"));

    let derived = &reasons[2];
    assert_eq!(derived.memory_id, 3);
    assert_eq!(derived.bucket, MemoryBucket::Derived);
    assert!(derived.reason.contains("derived"));
}
1978
/// Excluded memories become candidates in input order, keeping their exclusion
/// reason and a non-empty content preview.
#[test]
fn test_build_excluded_candidates_classifies_exclusion_reasons() {
    let explicit_metadata = serde_json::json!({
        "cognitive": {
            "level": "explicit",
            "confidence": 0.85,
            "observer": "",
            "subject": "",
            "generated_by": ""
        }
    });
    let explicit_memory = Memory {
        metadata: explicit_metadata,
        ..test_memory(10, "Low-scoring memory that got cut")
    };
    let duplicate_memory = test_memory(11, "Another excluded memory with more text content");

    let truncated = RankedExcludedMemory {
        memory: explicit_memory,
        bucket: MemoryBucket::Recent,
        blended_score: 0.30,
        reason: ExclusionReason::BudgetTruncation,
    };
    let deduplicated = RankedExcludedMemory {
        memory: duplicate_memory,
        bucket: MemoryBucket::Semantic,
        blended_score: 0.25,
        reason: ExclusionReason::Deduplicated,
    };
    let excluded = vec![truncated, deduplicated];

    let candidates = build_excluded_candidates(&excluded);
    assert_eq!(candidates.len(), 2);

    let (first, second) = (&candidates[0], &candidates[1]);
    assert_eq!(first.memory_id, 10);
    assert_eq!(first.reason, ExclusionReason::BudgetTruncation);
    assert!(!first.content_preview.is_empty());

    assert_eq!(second.memory_id, 11);
    assert_eq!(second.reason, ExclusionReason::Deduplicated);
}
2025
/// Per-bucket stats count included plus excluded as `fetched`, and report each part separately.
#[test]
fn test_build_bucket_stats_correct_counts() {
    let kept = |id: i64, content: &str, bucket: MemoryBucket, blended_score| BucketedMemory {
        memory: test_memory(id, content),
        bucket,
        blended_score,
    };
    let cut = |id: i64, content: &str, bucket: MemoryBucket, blended_score| RankedExcludedMemory {
        memory: test_memory(id, content),
        bucket,
        blended_score,
        reason: ExclusionReason::BudgetTruncation,
    };

    let included = vec![
        kept(1, "a", MemoryBucket::Semantic, 0.9),
        kept(2, "b", MemoryBucket::Semantic, 0.8),
        kept(3, "c", MemoryBucket::Derived, 0.7),
    ];
    let excluded = vec![
        cut(4, "d", MemoryBucket::Semantic, 0.5),
        cut(5, "e", MemoryBucket::Recent, 0.4),
    ];

    let stats = build_bucket_stats(&included, &excluded);

    let expectations = [
        (MemoryBucket::Semantic, 3, 2, 1),
        (MemoryBucket::Derived, 1, 1, 0),
        (MemoryBucket::Recent, 1, 0, 1),
    ];
    for (bucket, fetched, included_count, excluded_count) in expectations {
        let entry = stats.iter().find(|s| s.bucket == bucket).unwrap();
        assert_eq!(entry.fetched, fetched);
        assert_eq!(entry.included, included_count);
        assert_eq!(entry.excluded, excluded_count);
    }
}
2086
/// A fully populated `QueryIntrospection` survives a serde JSON round trip.
///
/// Nested signals, exclusion reasons, stats, latency and the config snapshot
/// are all kept.
#[test]
fn test_query_introspection_serialization_contract() {
    let similarity_signal = InclusionSignal {
        signal_type: "semantic_similarity".to_string(),
        description: "Embedding similarity score: 0.870".to_string(),
        weight_contribution: 0.87,
    };
    let inclusion = InclusionReason {
        memory_id: 1,
        bucket: MemoryBucket::Semantic,
        phase: "execution".to_string(),
        relevance_score: Some(0.87),
        blended_score: 0.91,
        reason: "Semantic match (score: 0.87) in semantic bucket".to_string(),
        signals: vec![similarity_signal],
    };
    let exclusion = ExcludedCandidate {
        memory_id: 2,
        bucket: MemoryBucket::Recent,
        blended_score: 0.30,
        reason: ExclusionReason::BudgetTruncation,
        content_preview: "Some content...".to_string(),
    };
    let reflection = RelevantReflection {
        memory_id: 3,
        reflection_type: "derived".to_string(),
        content_preview: "Auth patterns converge...".to_string(),
        confidence: Some(0.92),
        created_at: "2026-03-27T12:00:00Z".to_string(),
    };
    let semantic_stats = BucketIntrospectionStats {
        bucket: MemoryBucket::Semantic,
        fetched: 2,
        included: 1,
        excluded: 1,
    };
    let config_snapshot = RepresentationConfigSnapshot {
        max_items: 20,
        include_raw: false,
        include_digests: true,
        include_semantic: true,
        include_derived: true,
        include_contradictions: true,
    };
    let introspection = QueryIntrospection {
        included: vec![inclusion],
        excluded_candidates: vec![exclusion],
        relevant_reflections: vec![reflection],
        bucket_stats: vec![semantic_stats],
        pipeline_latency_ms: Some(12),
        representation_config: Some(config_snapshot),
    };

    let json = serde_json::to_string(&introspection).expect("serialize introspection");
    let parsed: QueryIntrospection =
        serde_json::from_str(&json).expect("deserialize introspection");

    assert_eq!(parsed.included.len(), 1);
    assert_eq!(parsed.excluded_candidates.len(), 1);
    assert_eq!(parsed.relevant_reflections.len(), 1);
    assert_eq!(parsed.bucket_stats.len(), 1);
    assert_eq!(
        parsed.excluded_candidates[0].reason,
        ExclusionReason::BudgetTruncation
    );
    assert_eq!(parsed.bucket_stats[0].fetched, 2);
    assert_eq!(parsed.pipeline_latency_ms, Some(12));

    let config = parsed
        .representation_config
        .as_ref()
        .expect("representation config should round-trip");
    assert_eq!(config.max_items, 20);
    assert!(!config.include_raw);
    assert!(config.include_digests);

    let signals = &parsed.included[0].signals;
    assert_eq!(signals.len(), 1);
    assert_eq!(signals[0].signal_type, "semantic_similarity");
}
2160
/// `truncate_str` returns short inputs unchanged.
///
/// Longer inputs are cut to the limit and get a `...` suffix.
#[test]
fn test_truncate_str_behavior() {
    let cases = [
        ("short", 80, "short"),
        ("hello world", 5, "hello..."),
        ("a longer string here", 10, "a longer s..."),
    ];
    for (input, limit, expected) in cases {
        assert_eq!(truncate_str(input, limit), expected);
    }
}
2167
2168 #[tokio::test]
2169 async fn test_query_introspection_with_stored_memories() {
2170 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2171
2172 let explicit = store_memory(
2174 &repo,
2175 namespace_id,
2176 "The authentication module uses JWT tokens for session management",
2177 CognitiveLevel::Explicit,
2178 &perspective,
2179 )
2180 .await;
2181 let _derived = store_memory(
2182 &repo,
2183 namespace_id,
2184 "Derived: JWT token patterns show convergence across microservices",
2185 CognitiveLevel::Derived,
2186 &perspective,
2187 )
2188 .await;
2189
2190 let llm = Arc::new(MockLlmClient::new(vec![])); let service = QueryService::new(llm, AgentConfig::default());
2192
2193 let introspection = service
2194 .query_introspection("authentication tokens", namespace_id, &repo)
2195 .await
2196 .unwrap();
2197
2198 assert!(introspection
2200 .included
2201 .iter()
2202 .any(|i| i.memory_id == explicit.id));
2203 for reason in &introspection.included {
2205 assert!(!reason.reason.is_empty());
2206 assert!(!reason.signals.is_empty());
2207 }
2208 assert!(introspection.pipeline_latency_ms.is_some());
2210 assert!(introspection.representation_config.is_some());
2212 assert!(!introspection.bucket_stats.is_empty());
2214 }
2215
2216 #[tokio::test]
2217 async fn test_query_introspection_surfaces_excluded_candidates_with_small_budget() {
2218 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2219
2220 store_memory(
2221 &repo,
2222 namespace_id,
2223 "Session cookies replaced JWT for auth",
2224 CognitiveLevel::Explicit,
2225 &perspective,
2226 )
2227 .await;
2228 store_memory(
2229 &repo,
2230 namespace_id,
2231 "Derived auth insight from repeated login failures",
2232 CognitiveLevel::Derived,
2233 &perspective,
2234 )
2235 .await;
2236 store_memory(
2237 &repo,
2238 namespace_id,
2239 "Recent authentication follow-up note",
2240 CognitiveLevel::Explicit,
2241 &perspective,
2242 )
2243 .await;
2244
2245 let service =
2246 QueryService::new(Arc::new(MockLlmClient::new(vec![])), AgentConfig::default());
2247 let request = WorkingRepresentationRequest {
2248 namespace_id,
2249 perspective: Some(perspective.clone()),
2250 query: Some("authentication".to_string()),
2251 max_items: 1,
2252 include_raw: false,
2253 include_recent: true,
2254 include_semantic: true,
2255 include_derived: true,
2256 include_digests: true,
2257 include_contradictions: true,
2258 ..WorkingRepresentationRequest::default()
2259 };
2260
2261 let introspection = service
2262 .introspection_with_representation(&request, "authentication", &repo)
2263 .await
2264 .unwrap();
2265
2266 assert!(!introspection.excluded_candidates.is_empty());
2267 }
2268
2269 #[tokio::test]
2270 async fn test_fetch_relevant_reflections_uses_tokenized_matching() {
2271 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2272
2273 store_memory(
2274 &repo,
2275 namespace_id,
2276 "Derived insight: session cookies replaced JWT after CSRF review",
2277 CognitiveLevel::Derived,
2278 &perspective,
2279 )
2280 .await;
2281 store_memory(
2282 &repo,
2283 namespace_id,
2284 "Contradiction: older notes still mention JWT login flow",
2285 CognitiveLevel::Contradiction,
2286 &perspective,
2287 )
2288 .await;
2289
2290 let request = WorkingRepresentationRequest {
2291 namespace_id,
2292 perspective: Some(perspective.clone()),
2293 query: Some("why did auth move away from jwt tokens".to_string()),
2294 ..WorkingRepresentationRequest::default()
2295 };
2296
2297 let reflections =
2298 fetch_relevant_reflections("why did auth move away from jwt tokens", &request, &repo)
2299 .await
2300 .unwrap();
2301
2302 assert!(!reflections.is_empty());
2303 assert!(reflections
2304 .iter()
2305 .any(|reflection| reflection.content_preview.to_lowercase().contains("jwt")));
2306 }
2307
2308 #[tokio::test]
2309 async fn test_fetch_relevant_reflections_respects_request_scope() {
2310 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2311 let other_perspective = PerspectiveKey {
2312 observer: "codex".to_string(),
2313 subject: "codex".to_string(),
2314 session_key: Some("other-session".to_string()),
2315 };
2316
2317 store_memory(
2318 &repo,
2319 namespace_id,
2320 "Derived auth insight from the active scoped session",
2321 CognitiveLevel::Derived,
2322 &perspective,
2323 )
2324 .await;
2325 store_memory(
2326 &repo,
2327 namespace_id,
2328 "Derived auth insight from an unrelated session",
2329 CognitiveLevel::Derived,
2330 &other_perspective,
2331 )
2332 .await;
2333
2334 let request = WorkingRepresentationRequest {
2335 namespace_id,
2336 perspective: Some(perspective.clone()),
2337 query: Some("auth insight".to_string()),
2338 ..WorkingRepresentationRequest::default()
2339 };
2340
2341 let reflections = fetch_relevant_reflections("auth insight", &request, &repo)
2342 .await
2343 .unwrap();
2344
2345 assert_eq!(reflections.len(), 1);
2346 assert!(reflections[0]
2347 .content_preview
2348 .contains("active scoped session"));
2349 }
2350
2351 #[tokio::test]
2352 async fn test_standalone_introspect_query_matches_service_introspection_contract() {
2353 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2354
2355 for content in [
2356 "Session cookies replaced JWT for auth",
2357 "Derived auth insight from repeated login failures",
2358 "Recent authentication follow-up note",
2359 ] {
2360 store_memory(
2361 &repo,
2362 namespace_id,
2363 content,
2364 if content.starts_with("Derived") {
2365 CognitiveLevel::Derived
2366 } else {
2367 CognitiveLevel::Explicit
2368 },
2369 &perspective,
2370 )
2371 .await;
2372 }
2373
2374 let request = WorkingRepresentationRequest {
2375 namespace_id,
2376 perspective: Some(perspective.clone()),
2377 query: Some("authentication".to_string()),
2378 max_items: 1,
2379 include_raw: false,
2380 include_recent: true,
2381 include_semantic: true,
2382 include_derived: true,
2383 include_digests: true,
2384 include_contradictions: true,
2385 ..WorkingRepresentationRequest::default()
2386 };
2387
2388 let service =
2389 QueryService::new(Arc::new(MockLlmClient::new(vec![])), AgentConfig::default());
2390 let from_service = service
2391 .introspection_with_representation(&request, "authentication", &repo)
2392 .await
2393 .unwrap();
2394 let standalone = introspect_query(&request, "authentication", &repo)
2395 .await
2396 .unwrap();
2397
2398 assert_eq!(
2399 standalone
2400 .excluded_candidates
2401 .iter()
2402 .map(|candidate| {
2403 (
2404 candidate.memory_id,
2405 candidate.bucket,
2406 candidate.reason.clone(),
2407 )
2408 })
2409 .collect::<Vec<_>>(),
2410 from_service
2411 .excluded_candidates
2412 .iter()
2413 .map(|candidate| {
2414 (
2415 candidate.memory_id,
2416 candidate.bucket,
2417 candidate.reason.clone(),
2418 )
2419 })
2420 .collect::<Vec<_>>(),
2421 "standalone introspection should expose the same excluded candidates as service introspection"
2422 );
2423 assert_eq!(
2424 standalone
2425 .bucket_stats
2426 .iter()
2427 .map(|stats| (stats.bucket, stats.fetched, stats.included, stats.excluded))
2428 .collect::<Vec<_>>(),
2429 from_service
2430 .bucket_stats
2431 .iter()
2432 .map(|stats| (stats.bucket, stats.fetched, stats.included, stats.excluded))
2433 .collect::<Vec<_>>()
2434 );
2435 assert_eq!(
2436 standalone
2437 .included
2438 .iter()
2439 .map(|reason| (reason.memory_id, reason.bucket))
2440 .collect::<Vec<_>>(),
2441 from_service
2442 .included
2443 .iter()
2444 .map(|reason| (reason.memory_id, reason.bucket))
2445 .collect::<Vec<_>>()
2446 );
2447 }
2448
2449 #[test]
2450 fn test_query_answer_introspection_field_defaults_to_none() {
2451 let answer = QueryAnswer {
2452 answer: "test".to_string(),
2453 citations: vec![],
2454 confidence: 0.9,
2455 lineages: vec![],
2456 introspection: None,
2457 };
2458
2459 let json = serde_json::to_string(&answer).unwrap();
2460 assert!(!json.contains("introspection"));
2461 }
2462
2463 #[test]
2464 fn test_query_answer_introspection_serializes_when_present() {
2465 let answer = QueryAnswer {
2466 answer: "test".to_string(),
2467 citations: vec![],
2468 confidence: 0.9,
2469 lineages: vec![],
2470 introspection: Some(QueryIntrospection {
2471 included: vec![],
2472 excluded_candidates: vec![],
2473 relevant_reflections: vec![],
2474 bucket_stats: vec![],
2475 pipeline_latency_ms: None,
2476 representation_config: None,
2477 }),
2478 };
2479
2480 let json = serde_json::to_string(&answer).unwrap();
2481 assert!(json.contains("introspection"));
2482 let parsed: QueryAnswer = serde_json::from_str(&json).unwrap();
2483 assert!(parsed.introspection.is_some());
2484 }
2485
2486 #[test]
2491 fn test_inclusion_signals_populated_for_semantic_bucket() {
2492 let semantic_memory = Memory {
2493 id: 42,
2494 namespace_id: 1,
2495 content: "Authentication uses JWT tokens".to_string(),
2496 category: nexus_core::MemoryCategory::Facts,
2497 labels: Vec::new(),
2498 metadata: serde_json::json!({
2499 "cognitive": {
2500 "level": "explicit",
2501 "confidence": 0.90,
2502 "observer": "",
2503 "subject": "",
2504 "generated_by": ""
2505 }
2506 }),
2507 similarity_score: Some(0.85),
2508 ..Memory::default()
2509 };
2510 let bm = BucketedMemory {
2511 memory: semantic_memory,
2512 bucket: MemoryBucket::Semantic,
2513 blended_score: 0.92,
2514 };
2515 let default_request = WorkingRepresentationRequest::default();
2516 let reasons = build_inclusion_reasons(std::slice::from_ref(&bm), &default_request);
2517 assert_eq!(reasons.len(), 1);
2518 assert!(!reasons[0].signals.is_empty());
2519 let signal_types: Vec<&str> = reasons[0]
2521 .signals
2522 .iter()
2523 .map(|s| s.signal_type.as_str())
2524 .collect();
2525 assert!(signal_types.contains(&"recency"));
2526 assert!(signal_types.contains(&"cognitive_level"));
2527 assert!(signal_types.contains(&"semantic_similarity"));
2528 }
2529
2530 #[test]
2531 fn test_inclusion_signals_populated_for_digest_bucket() {
2532 let digest_memory = Memory {
2533 id: 55,
2534 namespace_id: 1,
2535 content: "Session summary: worked on auth module".to_string(),
2536 category: nexus_core::MemoryCategory::Session,
2537 labels: Vec::new(),
2538 metadata: serde_json::json!({
2539 "cognitive": {
2540 "level": "summary_short",
2541 "confidence": 0.80,
2542 "observer": "",
2543 "subject": "",
2544 "generated_by": ""
2545 }
2546 }),
2547 ..Memory::default()
2548 };
2549 let bm = BucketedMemory {
2550 memory: digest_memory,
2551 bucket: MemoryBucket::Digests,
2552 blended_score: 0.88,
2553 };
2554 let default_request = WorkingRepresentationRequest::default();
2555 let reasons = build_inclusion_reasons(&[bm], &default_request);
2556 assert_eq!(reasons.len(), 1);
2557 let signal_types: Vec<&str> = reasons[0]
2558 .signals
2559 .iter()
2560 .map(|s| s.signal_type.as_str())
2561 .collect();
2562 assert!(signal_types.contains(&"bucket_boost"));
2563 }
2564
2565 #[test]
2566 fn test_exclusion_reason_variants_serialize_correctly() {
2567 for (variant, expected) in [
2568 (ExclusionReason::BudgetTruncation, "budget_truncation"),
2569 (
2570 ExclusionReason::ConfidenceBelowThreshold,
2571 "confidence_below_threshold",
2572 ),
2573 (ExclusionReason::Deduplicated, "deduplicated"),
2574 ] {
2575 let json = serde_json::to_string(&variant).unwrap();
2576 assert_eq!(json, format!("\"{expected}\""));
2577 let parsed: ExclusionReason = serde_json::from_str(&json).unwrap();
2578 assert_eq!(parsed, variant);
2579 }
2580 }
2581
2582 #[test]
2583 fn test_representation_config_snapshot_serialization() {
2584 let config = RepresentationConfigSnapshot {
2585 max_items: 30,
2586 include_raw: true,
2587 include_digests: true,
2588 include_semantic: true,
2589 include_derived: false,
2590 include_contradictions: false,
2591 };
2592 let json = serde_json::to_string(&config).unwrap();
2593 assert!(json.contains("\"max_items\":30"));
2594 let parsed: RepresentationConfigSnapshot = serde_json::from_str(&json).unwrap();
2595 assert_eq!(parsed.max_items, 30);
2596 assert!(parsed.include_raw);
2597 assert!(!parsed.include_derived);
2598 }
2599
2600 #[test]
2601 fn test_introspection_optional_fields_default_safely() {
2602 let intro = QueryIntrospection {
2603 included: vec![],
2604 excluded_candidates: vec![],
2605 relevant_reflections: vec![],
2606 bucket_stats: vec![],
2607 pipeline_latency_ms: None,
2608 representation_config: None,
2609 };
2610 let json = serde_json::to_string(&intro).unwrap();
2611 assert!(!json.contains("pipeline_latency_ms"));
2613 assert!(!json.contains("representation_config"));
2614 let parsed: QueryIntrospection = serde_json::from_str(&json).unwrap();
2616 assert!(parsed.pipeline_latency_ms.is_none());
2617 assert!(parsed.representation_config.is_none());
2618 }
2619
2620 #[test]
2621 fn test_exclusion_reason_display_impl() {
2622 assert_eq!(
2623 format!("{}", ExclusionReason::BudgetTruncation),
2624 "budget_truncation"
2625 );
2626 assert_eq!(
2627 format!("{}", ExclusionReason::ConfidenceBelowThreshold),
2628 "confidence_below_threshold"
2629 );
2630 assert_eq!(format!("{}", ExclusionReason::Deduplicated), "deduplicated");
2631 }
2632}