1use std::collections::HashMap;
11use std::time::Instant;
12
13use nexus_core::config::AgentConfig;
14use nexus_core::traits::EmbeddingService;
15use nexus_core::{Memory, WorkingRepresentationRequest};
16use nexus_lephase::{CompressionMode, LePhaseIntegration};
17use nexus_llm::{ChatMessage, GenerateParams, LlmClient, TokenUsage};
18use nexus_storage::repository::{MemoryRelationRepository, MemoryRepository, NamespaceRepository};
19use tracing::{debug, info, warn};
20
21use crate::error::AgentError;
22use crate::identity::IdentityResolver;
23use crate::prompts::{
24 query_refinement_user_prompt, query_user_prompt_with_lineage, QUERY_SYSTEM_PROMPT,
25};
26use crate::ranking::{
27 flatten_ranked_representation_with_excluded, BucketedMemory, RankedExcludedMemory, RankedResult,
28};
29use crate::representation::RepresentationService;
30use crate::types::{
31 BucketIntrospectionStats, ExcludedCandidate, InclusionReason, InclusionSignal, MemoryBucket,
32 MemoryLineage, QueryAnswer, QueryIntrospection, RelevantReflection,
33 RepresentationConfigSnapshot,
34};
35use crate::util::{
36 extract_agent_summary, flush_metric_samples, parse_json_response, stage_metric_sample,
37 token_usage_metric_samples, CognitionSnapshot,
38};
39
/// Answers natural-language questions over stored memories with an LLM.
///
/// Combines an LLM client, the agent configuration (its `query_context_limit`
/// caps how many memories feed a query) and the [`RepresentationService`] that
/// assembles the candidate memories for a question.
pub struct QueryService {
    /// Client used to generate (and optionally refine) answers.
    llm: std::sync::Arc<dyn LlmClient>,
    /// Agent configuration; `query_context_limit` bounds the retrieved context.
    config: AgentConfig,
    /// Builds the working representation that is ranked into context memories.
    representation: RepresentationService,
}
45
/// Minimum number of retrieved memories at which the query context is built
/// with LePhase phase-aware formatting instead of the lightweight per-memory
/// summary format.
const PHASE_GROUPING_THRESHOLD: usize = 3;
48
49impl QueryService {
50 pub fn new(llm: std::sync::Arc<dyn LlmClient>, config: AgentConfig) -> Self {
51 Self {
52 llm,
53 config,
54 representation: RepresentationService::new(),
55 }
56 }
57
58 pub fn with_embedder(
59 llm: std::sync::Arc<dyn LlmClient>,
60 config: AgentConfig,
61 embedder: std::sync::Arc<dyn EmbeddingService>,
62 ) -> Self {
63 Self {
64 llm,
65 config,
66 representation: RepresentationService::with_embedder(embedder),
67 }
68 }
69
70 pub async fn query(
71 &self,
72 question: &str,
73 namespace_id: i64,
74 memory_repo: &MemoryRepository,
75 relation_repo: &MemoryRelationRepository<'_>,
76 ) -> Result<QueryAnswer, AgentError> {
77 let request = WorkingRepresentationRequest {
79 namespace_id,
80 perspective: None,
81 query: Some(question.to_string()),
82 max_items: self.config.query_context_limit,
83 include_raw: false,
84 include_digests: true,
85 include_recent: true,
86 include_semantic: true,
87 include_derived: true,
88 include_contradictions: true,
89 ..WorkingRepresentationRequest::default()
90 };
91 let request = match self.with_cross_namespace_ids(request, memory_repo).await {
93 Ok(req) => req,
94 Err(e) => {
95 warn!(error = %e, "Failed to prepare representation request, falling back to legacy search");
96 return self
97 .query_legacy(question, namespace_id, memory_repo, relation_repo)
98 .await;
99 }
100 };
101
102 match self
104 .query_with_representation(question, request, memory_repo, relation_repo)
105 .await
106 {
107 Ok(answer) => Ok(answer),
108 Err(e) => {
109 warn!(error = %e, "Representation pipeline failed, falling back to legacy search");
110 self.query_legacy(question, namespace_id, memory_repo, relation_repo)
111 .await
112 }
113 }
114 }
115
116 pub async fn query_with_representation(
117 &self,
118 question: &str,
119 request: WorkingRepresentationRequest,
120 memory_repo: &MemoryRepository,
121 _relation_repo: &MemoryRelationRepository<'_>,
122 ) -> Result<QueryAnswer, AgentError> {
123 info!(question = %question, "Processing query");
124 let total_started = Instant::now();
125 let mut metrics = Vec::new();
126
127 let representation_started = Instant::now();
128 let representation = self
129 .representation
130 .build(&request, memory_repo)
131 .await
132 .map_err(|e| {
133 warn!(error = %e, "Failed to build working representation");
134 AgentError::Storage(e.to_string())
135 })?;
136 metrics.push(stage_metric_sample(
137 request.namespace_id,
138 "cognition.query.representation_ms",
139 representation_started.elapsed().as_secs_f64() * 1000.0,
140 "representation",
141 ));
142
143 let flatten_started = Instant::now();
144 let ranked = flatten_ranked_representation_with_excluded(representation, &request);
145 let bucketed = &ranked.included;
146 metrics.push(stage_metric_sample(
147 request.namespace_id,
148 "cognition.query.flatten_ms",
149 flatten_started.elapsed().as_secs_f64() * 1000.0,
150 "flatten",
151 ));
152 debug!(count = bucketed.len(), "Found relevant memories");
153
154 if bucketed.is_empty() {
155 let answer_started = Instant::now();
156 let (answer, usage) = self.generate_answer(question, "").await?;
157 metrics.push(stage_metric_sample(
158 request.namespace_id,
159 "cognition.query.answer_ms",
160 answer_started.elapsed().as_secs_f64() * 1000.0,
161 "answer",
162 ));
163 metrics.extend(token_usage_metric_samples(
164 request.namespace_id,
165 "cognition.query.answer",
166 "answer",
167 usage.as_ref(),
168 ));
169 metrics.push(stage_metric_sample(
170 request.namespace_id,
171 "cognition.query.total_ms",
172 total_started.elapsed().as_secs_f64() * 1000.0,
173 "total",
174 ));
175 flush_metric_samples(memory_repo, &metrics).await;
176 return Ok(answer);
177 }
178
179 let lineages = build_lineages(bucketed);
181
182 let context_started = Instant::now();
184 let context = if bucketed.len() >= PHASE_GROUPING_THRESHOLD {
185 self.build_phase_aware_context(bucketed, &lineages)?
186 } else {
187 self.build_lightweight_context(bucketed, &lineages)?
188 };
189 metrics.push(stage_metric_sample(
190 request.namespace_id,
191 "cognition.query.context_ms",
192 context_started.elapsed().as_secs_f64() * 1000.0,
193 "context",
194 ));
195
196 debug!(context_len = context.len(), "Built query context");
197
198 let answer_started = Instant::now();
199 let (answer, usage) = self.generate_answer(question, &context).await?;
200 metrics.push(stage_metric_sample(
201 request.namespace_id,
202 "cognition.query.answer_ms",
203 answer_started.elapsed().as_secs_f64() * 1000.0,
204 "answer",
205 ));
206 metrics.extend(token_usage_metric_samples(
207 request.namespace_id,
208 "cognition.query.answer",
209 "answer",
210 usage.as_ref(),
211 ));
212 let mut answer = if should_refine_answer(question, &answer, bucketed) {
213 let refine_started = Instant::now();
214 let (refined, usage) = self
215 .generate_refined_answer(question, &context, &answer)
216 .await?;
217 metrics.push(stage_metric_sample(
218 request.namespace_id,
219 "cognition.query.refine_ms",
220 refine_started.elapsed().as_secs_f64() * 1000.0,
221 "refine",
222 ));
223 metrics.extend(token_usage_metric_samples(
224 request.namespace_id,
225 "cognition.query.refine",
226 "refine",
227 usage.as_ref(),
228 ));
229 select_better_answer(answer, refined)
230 } else {
231 answer
232 };
233 answer.lineages = lineages;
234
235 metrics.push(stage_metric_sample(
236 request.namespace_id,
237 "cognition.query.total_ms",
238 total_started.elapsed().as_secs_f64() * 1000.0,
239 "total",
240 ));
241 flush_metric_samples(memory_repo, &metrics).await;
242
243 info!("Query answered successfully");
244 Ok(answer)
245 }
246
247 pub async fn query_introspection(
253 &self,
254 question: &str,
255 namespace_id: i64,
256 memory_repo: &MemoryRepository,
257 ) -> Result<QueryIntrospection, AgentError> {
258 let request = WorkingRepresentationRequest {
259 namespace_id,
260 perspective: None,
261 query: Some(question.to_string()),
262 max_items: self.config.query_context_limit,
263 include_raw: false,
264 ..WorkingRepresentationRequest::default()
265 };
266 let request = self.with_cross_namespace_ids(request, memory_repo).await?;
267
268 self.introspection_with_representation(&request, question, memory_repo)
269 .await
270 }
271
272 pub async fn introspection_with_representation(
274 &self,
275 request: &WorkingRepresentationRequest,
276 question: &str,
277 memory_repo: &MemoryRepository,
278 ) -> Result<QueryIntrospection, AgentError> {
279 introspect_query_with_representation_service(
280 &self.representation,
281 request,
282 question,
283 memory_repo,
284 )
285 .await
286 }
287
288 async fn with_cross_namespace_ids(
289 &self,
290 mut request: WorkingRepresentationRequest,
291 memory_repo: &MemoryRepository,
292 ) -> Result<WorkingRepresentationRequest, AgentError> {
293 if !request.cross_namespace_ids.is_empty() {
294 return Ok(request);
295 }
296
297 let namespace_repo = NamespaceRepository::new(memory_repo.pool().clone());
298 let namespaces = namespace_repo
299 .list_all()
300 .await
301 .map_err(|e| AgentError::Storage(e.to_string()))?;
302 let Some(current) = namespaces
303 .iter()
304 .find(|namespace| namespace.id == request.namespace_id)
305 else {
306 return Ok(request);
307 };
308
309 request.cross_namespace_ids =
310 IdentityResolver::related_namespace_ids(&namespace_repo, ¤t.name)
311 .await
312 .into_iter()
313 .filter(|id| *id != request.namespace_id)
314 .collect();
315
316 Ok(request)
317 }
318
319 fn build_lightweight_context(
324 &self,
325 bucketed: &[BucketedMemory],
326 lineages: &[MemoryLineage],
327 ) -> Result<String, AgentError> {
328 let mut parts = Vec::with_capacity(bucketed.len());
329
330 for (bm, lineage) in bucketed.iter().zip(lineages.iter()) {
331 let summary = extract_agent_summary(
332 &serde_json::to_string(&bm.memory.metadata).unwrap_or_else(|_| "{}".to_string()),
333 &bm.memory.content,
334 300,
335 );
336
337 let relevance = lineage
338 .relevance_score
339 .map_or(String::new(), |r| format!(", relevance: {:.2}", r));
340
341 parts.push(format!(
342 "[Memory #{}] {} (bucket: {}, phase: {}{})\nSummary: {}",
343 bm.memory.id,
344 bm.memory.content.chars().take(100).collect::<String>(),
345 lineage.bucket,
346 lineage.phase,
347 relevance,
348 summary,
349 ));
350 }
351
352 Ok(parts.join("\n\n"))
353 }
354
355 fn build_phase_aware_context(
361 &self,
362 bucketed: &[BucketedMemory],
363 lineages: &[MemoryLineage],
364 ) -> Result<String, AgentError> {
365 let mut lephase = LePhaseIntegration::with_mode(CompressionMode::Balanced);
366
367 for bm in bucketed {
369 lephase.register_memory(&bm.memory);
370 }
371
372 let memories: Vec<Memory> = bucketed.iter().map(|bm| bm.memory.clone()).collect();
373 let formatted = lephase.format_for_model(&memories, None);
374
375 let lineage_map: HashMap<i64, &MemoryLineage> =
377 lineages.iter().map(|l| (l.memory_id, l)).collect();
378
379 let annotated = annotate_with_buckets(&formatted, &lineage_map);
381
382 Ok(annotated)
383 }
384
385 async fn generate_answer(
386 &self,
387 question: &str,
388 context: &str,
389 ) -> Result<(QueryAnswer, Option<TokenUsage>), AgentError> {
390 let user_msg = if context.is_empty() {
391 query_user_prompt_with_lineage(question, "No relevant memories found.")
392 } else {
393 query_user_prompt_with_lineage(question, context)
394 };
395
396 let params = GenerateParams {
397 messages: vec![
398 ChatMessage::system(QUERY_SYSTEM_PROMPT),
399 ChatMessage::user(user_msg),
400 ],
401 max_tokens: 4096,
402 temperature: 0.3,
403 json_mode: true,
404 };
405
406 let response = self
407 .llm
408 .generate(params)
409 .await
410 .map_err(|e| AgentError::Llm(e.to_string()))?;
411 let usage = response.usage.clone();
412 let answer: QueryAnswer =
413 parse_json_response(&response).map_err(|e| AgentError::Llm(e.to_string()))?;
414
415 Ok((answer, usage))
416 }
417
418 async fn generate_refined_answer(
419 &self,
420 question: &str,
421 context: &str,
422 draft: &QueryAnswer,
423 ) -> Result<(QueryAnswer, Option<TokenUsage>), AgentError> {
424 let params = GenerateParams {
425 messages: vec![
426 ChatMessage::system(QUERY_SYSTEM_PROMPT),
427 ChatMessage::user(query_refinement_user_prompt(
428 question,
429 context,
430 &draft.answer,
431 )),
432 ],
433 max_tokens: 4096,
434 temperature: 0.2,
435 json_mode: true,
436 };
437
438 let response = self
439 .llm
440 .generate(params)
441 .await
442 .map_err(|e| AgentError::Llm(e.to_string()))?;
443 let usage = response.usage.clone();
444 let answer: QueryAnswer =
445 parse_json_response(&response).map_err(|e| AgentError::Llm(e.to_string()))?;
446 Ok((answer, usage))
447 }
448
449 async fn query_legacy(
451 &self,
452 question: &str,
453 namespace_id: i64,
454 memory_repo: &MemoryRepository,
455 _relation_repo: &MemoryRelationRepository<'_>,
456 ) -> Result<QueryAnswer, AgentError> {
457 let limit = self.config.query_context_limit;
458 let memories = memory_repo
459 .search_by_text_memories(namespace_id, question, limit as i32, false)
460 .await
461 .map_err(|e| AgentError::Storage(e.to_string()))?;
462
463 let context = memories
464 .iter()
465 .map(|m| format!("[Memory #{}] {}", m.id, m.content))
466 .collect::<Vec<String>>()
467 .join("\n\n");
468
469 let (answer, _) = self.generate_answer(question, &context).await?;
470 Ok(answer)
471 }
472}
473
474fn introspection_request(request: &WorkingRepresentationRequest) -> WorkingRepresentationRequest {
475 let mut overfetch = request.clone();
476 overfetch.max_items = request
477 .max_items
478 .saturating_mul(3)
479 .max(request.max_items + 8);
480 overfetch
481}
482
483fn build_lineages(bucketed: &[BucketedMemory]) -> Vec<MemoryLineage> {
489 let analyzer = nexus_lephase::PhaseAnalyzer::new();
490
491 bucketed
492 .iter()
493 .map(|bm| {
494 let analysis = analyzer.analyze(&bm.memory);
495 MemoryLineage {
496 memory_id: bm.memory.id,
497 bucket: bm.bucket,
498 phase: analysis.phase.phase_type.to_string(),
499 relevance_score: bm
500 .memory
501 .relevance_score
502 .or(bm.memory.similarity_score)
503 .or(Some((bm.blended_score / 100.0).clamp(0.0, 1.0))),
504 }
505 })
506 .collect()
507}
508
509fn should_refine_answer(question: &str, answer: &QueryAnswer, bucketed: &[BucketedMemory]) -> bool {
510 if bucketed.is_empty() {
511 return false;
512 }
513
514 let lower_question = question.to_ascii_lowercase();
515 let question_word_count = question.split_whitespace().count();
516 let question_complex = question.len() > 120
517 || question_word_count > 18
518 || [
519 "why",
520 "how",
521 "compare",
522 "contrast",
523 "tradeoff",
524 "timeline",
525 "relationship",
526 "explain",
527 "summarize",
528 ]
529 .iter()
530 .any(|needle| lower_question.contains(needle));
531 let weak_answer =
532 answer.confidence < 0.72 || answer.citations.is_empty() || answer.answer.trim().len() < 40;
533 let broad_context = bucketed.len() >= 6;
534 let contradiction_present = bucketed
535 .iter()
536 .any(|memory| memory.bucket == crate::MemoryBucket::Contradictions);
537
538 (weak_answer && (question_complex || broad_context || contradiction_present))
539 || (question_complex && answer.confidence < 0.82 && broad_context)
540}
541
542fn select_better_answer(initial: QueryAnswer, refined: QueryAnswer) -> QueryAnswer {
543 if answer_quality(&refined) >= answer_quality(&initial) {
544 refined
545 } else {
546 initial
547 }
548}
549
550fn answer_quality(answer: &QueryAnswer) -> f32 {
551 let citation_bonus = (answer.citations.len().min(4) as f32) * 0.05;
552 let answer_length_bonus = if answer.answer.trim().len() >= 40 {
553 0.02
554 } else {
555 0.0
556 };
557 answer.confidence + citation_bonus + answer_length_bonus
558}
559
560fn annotate_with_buckets(formatted: &str, lineage_map: &HashMap<i64, &MemoryLineage>) -> String {
563 let mut out = String::with_capacity(formatted.len() + 256);
564 for line in formatted.lines() {
565 out.push_str(line);
566 if let Some(id_str) = line.strip_prefix("[Memory #") {
567 if let Some(end) = id_str.find(']') {
568 if let Ok(id) = id_str[..end].parse::<i64>() {
569 if let Some(lineage) = lineage_map.get(&id) {
570 out.push_str(&format!(" (bucket: {})", lineage.bucket));
571 }
572 }
573 }
574 }
575 out.push('\n');
576 }
577 out
578}
579
/// Byte budget for content previews in introspection output; `truncate_str`
/// appends "..." when it has to cut.
const CONTENT_PREVIEW_LEN: usize = 80;
/// Maximum number of excluded candidates reported by introspection.
const MAX_EXCLUDED_CANDIDATES: usize = 20;
/// Maximum number of derived/contradiction reflections reported by introspection.
const MAX_RELEVANT_REFLECTIONS: usize = 10;
587
588pub(crate) async fn build_introspection(
592 ranked: &RankedResult,
593 question: &str,
594 request: &WorkingRepresentationRequest,
595 memory_repo: &MemoryRepository,
596 started: Instant,
597) -> Result<QueryIntrospection, AgentError> {
598 let included = build_inclusion_reasons(&ranked.included, request);
599 let excluded = build_excluded_candidates(&ranked.excluded);
600 let bucket_stats = build_bucket_stats(&ranked.included, &ranked.excluded);
601 let relevant_reflections = fetch_relevant_reflections(question, request, memory_repo).await?;
602
603 let pipeline_latency_ms = Some(started.elapsed().as_millis() as u64);
604 let representation_config = Some(RepresentationConfigSnapshot {
605 max_items: request.max_items,
606 include_raw: request.include_raw,
607 include_digests: request.include_digests,
608 include_semantic: request.include_semantic,
609 include_derived: request.include_derived,
610 include_contradictions: request.include_contradictions,
611 });
612
613 Ok(QueryIntrospection {
614 included,
615 excluded_candidates: excluded,
616 relevant_reflections,
617 bucket_stats,
618 pipeline_latency_ms,
619 representation_config,
620 })
621}
622
623fn build_inclusion_reasons(
624 bucketed: &[BucketedMemory],
625 request: &WorkingRepresentationRequest,
626) -> Vec<InclusionReason> {
627 let analyzer = nexus_lephase::PhaseAnalyzer::new();
628
629 bucketed
630 .iter()
631 .map(|bm| {
632 let analysis = analyzer.analyze(&bm.memory);
633 let relevance = bm
634 .memory
635 .relevance_score
636 .or(bm.memory.similarity_score)
637 .or(Some((bm.blended_score / 100.0).clamp(0.0, 1.0)));
638 let signals = extract_inclusion_signals(bm, request);
639
640 InclusionReason {
641 memory_id: bm.memory.id,
642 bucket: bm.bucket,
643 phase: analysis.phase.phase_type.to_string(),
644 relevance_score: relevance,
645 blended_score: bm.blended_score,
646 reason: classify_inclusion_reason(bm, relevance),
647 signals,
648 }
649 })
650 .collect()
651}
652
653fn extract_inclusion_signals(
657 bm: &BucketedMemory,
658 request: &WorkingRepresentationRequest,
659) -> Vec<InclusionSignal> {
660 use crate::util::CognitionSnapshot;
661 use nexus_core::CognitiveLevel;
662
663 let memory = &bm.memory;
664 let snapshot = CognitionSnapshot::from_memory(memory);
665 let mut signals = Vec::new();
666
667 let age_hours = (chrono::Utc::now() - memory.created_at).num_hours();
669 let recency_desc = match age_hours {
670 h if h <= 1 => "Created within the last hour".to_string(),
671 h if h <= 6 => format!("Created {h}h ago"),
672 h if h <= 24 => format!("Created {h}h ago"),
673 h if h <= 72 => format!("Created {h}h ago"),
674 h if h <= 168 => format!("Created {}d ago", h / 24),
675 _ => format!("Created {}d ago", age_hours / 24),
676 };
677 let recency_weight = match age_hours {
678 h if h <= 1 => 1.0,
679 h if h <= 6 => 0.8,
680 h if h <= 24 => 0.6,
681 h if h <= 72 => 0.35,
682 h if h <= 168 => 0.15,
683 _ => 0.0,
684 };
685 signals.push(InclusionSignal {
686 signal_type: "recency".to_string(),
687 description: recency_desc,
688 weight_contribution: recency_weight,
689 });
690
691 let level_desc = match snapshot.level {
693 CognitiveLevel::Explicit => "Explicit factual memory",
694 CognitiveLevel::Derived => "System-derived insight",
695 CognitiveLevel::Contradiction => "Detected contradiction",
696 CognitiveLevel::SummaryShort => "Short-form session digest",
697 CognitiveLevel::SummaryLong => "Long-form session digest",
698 CognitiveLevel::Raw => "Raw activity record",
699 };
700 let confidence = snapshot.confidence.unwrap_or(0.75).clamp(0.0, 1.0);
701 signals.push(InclusionSignal {
702 signal_type: "cognitive_level".to_string(),
703 description: format!("{level_desc} (confidence: {:.2})", confidence),
704 weight_contribution: confidence,
705 });
706
707 if matches!(bm.bucket, MemoryBucket::Semantic) {
709 let similarity = memory
710 .relevance_score
711 .or(memory.similarity_score)
712 .unwrap_or_default();
713 if similarity > 0.0 {
714 signals.push(InclusionSignal {
715 signal_type: "semantic_similarity".to_string(),
716 description: format!("Embedding similarity score: {:.3}", similarity),
717 weight_contribution: similarity,
718 });
719 }
720 }
721
722 if let Some(ref req_perspective) = request.perspective {
724 if let Some(ref mem_perspective) = snapshot.perspective {
725 let observer_match = mem_perspective.observer == req_perspective.observer;
726 let subject_match = mem_perspective.subject == req_perspective.subject;
727 if observer_match || subject_match {
728 let match_kind = if observer_match && subject_match {
729 "full perspective match"
730 } else if observer_match {
731 "observer match"
732 } else {
733 "subject match"
734 };
735 let weight = if observer_match && subject_match {
736 1.0
737 } else {
738 0.5
739 };
740 signals.push(InclusionSignal {
741 signal_type: "perspective_match".to_string(),
742 description: match_kind.to_string(),
743 weight_contribution: weight,
744 });
745 }
746 }
747 }
748
749 if snapshot.times_reinforced > 0 {
751 let reinforced = ((snapshot.times_reinforced as f32) / 5.0).min(1.0);
752 signals.push(InclusionSignal {
753 signal_type: "reinforcement".to_string(),
754 description: format!("Reinforced {} time(s)", snapshot.times_reinforced),
755 weight_contribution: reinforced,
756 });
757 }
758
759 let bucket_desc = match bm.bucket {
761 MemoryBucket::Digests => Some("Digest bucket — prioritized for context grounding"),
762 MemoryBucket::Contradictions => {
763 Some("Contradiction bucket — surfaced for conflict awareness")
764 }
765 MemoryBucket::Derived => Some("Derived bucket — system insight priority"),
766 MemoryBucket::Semantic => None,
767 MemoryBucket::Recent => None,
768 };
769 if let Some(desc) = bucket_desc {
770 signals.push(InclusionSignal {
771 signal_type: "bucket_boost".to_string(),
772 description: desc.to_string(),
773 weight_contribution: 0.0,
774 });
775 }
776
777 signals
778}
779
780fn classify_inclusion_reason(bm: &BucketedMemory, relevance: Option<f32>) -> String {
781 match bm.bucket {
782 MemoryBucket::Semantic => {
783 let score_str = relevance
784 .map(|s| format!("{:.2}", s))
785 .unwrap_or_else(|| "N/A".to_string());
786 format!("Semantic match (score: {}) in semantic bucket", score_str)
787 }
788 MemoryBucket::Digests => "Session digest selected for context grounding".to_string(),
789 MemoryBucket::Derived => {
790 format!(
791 "Reinforced derived insight (blended: {:.2}) in derived bucket",
792 bm.blended_score
793 )
794 }
795 MemoryBucket::Recent => {
796 format!(
797 "Recent memory (blended: {:.2}) in recent bucket",
798 bm.blended_score
799 )
800 }
801 MemoryBucket::Contradictions => {
802 "Contradiction detected — surfaced for conflict awareness".to_string()
803 }
804 }
805}
806
807fn build_excluded_candidates(excluded: &[RankedExcludedMemory]) -> Vec<ExcludedCandidate> {
808 excluded
809 .iter()
810 .take(MAX_EXCLUDED_CANDIDATES)
811 .map(|excluded| ExcludedCandidate {
812 memory_id: excluded.memory.id,
813 bucket: excluded.bucket,
814 blended_score: excluded.blended_score,
815 reason: excluded.reason.clone(),
816 content_preview: truncate_str(&excluded.memory.content, CONTENT_PREVIEW_LEN),
817 })
818 .collect()
819}
820
821fn build_bucket_stats(
822 included: &[BucketedMemory],
823 excluded: &[RankedExcludedMemory],
824) -> Vec<BucketIntrospectionStats> {
825 let mut all_buckets: Vec<(MemoryBucket, usize, usize)> = Vec::new();
826
827 for bm in included {
828 if let Some(entry) = all_buckets.iter_mut().find(|(b, _, _)| *b == bm.bucket) {
829 entry.1 += 1;
830 } else {
831 all_buckets.push((bm.bucket, 1, 0));
832 }
833 }
834
835 for excluded in excluded {
836 if let Some(entry) = all_buckets
837 .iter_mut()
838 .find(|(bucket, _, _)| *bucket == excluded.bucket)
839 {
840 entry.2 += 1;
841 } else {
842 all_buckets.push((excluded.bucket, 0, 1));
843 }
844 }
845
846 all_buckets
847 .into_iter()
848 .map(|(bucket, inc, exc)| BucketIntrospectionStats {
849 bucket,
850 fetched: inc + exc,
851 included: inc,
852 excluded: exc,
853 })
854 .collect()
855}
856
857async fn fetch_relevant_reflections(
858 question: &str,
859 request: &WorkingRepresentationRequest,
860 memory_repo: &MemoryRepository,
861) -> Result<Vec<RelevantReflection>, AgentError> {
862 let query_terms = tokenize_query(question);
863 let filters = nexus_storage::repository::ListMemoryFilters {
864 category: None,
865 since: None,
866 until: None,
867 content_like: None,
868 include_raw: false,
869 limit: (MAX_RELEVANT_REFLECTIONS as i64).max(10) * 8,
870 offset: 0,
871 };
872
873 let all = memory_repo
874 .list_filtered(request.namespace_id, filters)
875 .await
876 .map_err(|e| AgentError::Storage(e.to_string()))?;
877
878 let mut ranked: Vec<(f32, Memory)> = all
879 .into_iter()
880 .filter(|m| {
881 if !reflection_matches_request_scope(m, request) {
882 return false;
883 }
884 let level = m
885 .metadata
886 .get("cognitive")
887 .and_then(|c| c.get("level"))
888 .and_then(|v| v.as_str())
889 .unwrap_or("");
890 level == "derived" || level == "contradiction"
891 })
892 .map(|m| {
893 let content_terms = tokenize_query(&m.content);
894 let overlap = if query_terms.is_empty() {
895 0.0
896 } else {
897 let shared = query_terms.intersection(&content_terms).count() as f32;
898 shared / query_terms.len() as f32
899 };
900 let confidence = m
901 .metadata
902 .get("cognitive")
903 .and_then(|c| c.get("confidence"))
904 .and_then(|v| v.as_f64())
905 .unwrap_or(0.75) as f32;
906 let recency = {
907 let age_hours = (chrono::Utc::now() - m.created_at).num_hours();
908 match age_hours {
909 h if h <= 1 => 1.0,
910 h if h <= 6 => 0.8,
911 h if h <= 24 => 0.6,
912 h if h <= 72 => 0.35,
913 h if h <= 168 => 0.15,
914 _ => 0.0,
915 }
916 };
917 let score = if query_terms.is_empty() {
918 0.5 * confidence + 0.5 * recency
919 } else {
920 0.65 * overlap + 0.20 * confidence + 0.15 * recency
921 };
922 (score, m)
923 })
924 .filter(|(score, _)| query_terms.is_empty() || *score > 0.0)
925 .collect();
926 ranked.sort_by(|left, right| {
927 right
928 .0
929 .partial_cmp(&left.0)
930 .unwrap_or(std::cmp::Ordering::Equal)
931 .then_with(|| right.1.created_at.cmp(&left.1.created_at))
932 });
933
934 let reflections: Vec<RelevantReflection> = ranked
935 .into_iter()
936 .take(MAX_RELEVANT_REFLECTIONS)
937 .map(|(_, m)| {
938 let reflection_type = m
939 .metadata
940 .get("cognitive")
941 .and_then(|c| c.get("level"))
942 .and_then(|v| v.as_str())
943 .unwrap_or("unknown")
944 .to_string();
945 let confidence = m
946 .metadata
947 .get("cognitive")
948 .and_then(|c| c.get("confidence"))
949 .and_then(|v| v.as_f64());
950
951 RelevantReflection {
952 memory_id: m.id,
953 reflection_type,
954 content_preview: truncate_str(&m.content, CONTENT_PREVIEW_LEN),
955 confidence,
956 created_at: m.created_at.to_rfc3339(),
957 }
958 })
959 .collect();
960
961 Ok(reflections)
962}
963
964fn reflection_matches_request_scope(
965 memory: &Memory,
966 request: &WorkingRepresentationRequest,
967) -> bool {
968 let Some(request_perspective) = request.perspective.as_ref() else {
969 return true;
970 };
971
972 let snapshot = CognitionSnapshot::from_memory(memory);
973 let Some(memory_perspective) = snapshot.perspective.as_ref() else {
974 return false;
975 };
976
977 if memory_perspective.observer != request_perspective.observer
978 || memory_perspective.subject != request_perspective.subject
979 {
980 return false;
981 }
982
983 match request_perspective.session_key.as_deref() {
984 Some(session_key) => memory_perspective.session_key.as_deref() == Some(session_key),
985 None => true,
986 }
987}
988
/// Splits `text` into a set of lowercase search terms.
///
/// Terms are maximal runs of alphanumeric characters, lowercased with full
/// Unicode case mapping (so "Über" and "über" become the same term). Terms
/// shorter than 3 bytes are dropped.
fn tokenize_query(text: &str) -> std::collections::BTreeSet<String> {
    text.split(|c: char| !c.is_alphanumeric())
        .map(str::to_lowercase)
        .filter(|term| term.len() >= 3)
        .collect()
}
997
/// Returns `s` unchanged when it fits in `max` bytes. Otherwise returns the
/// longest prefix of at most `max` bytes that ends on a UTF-8 character
/// boundary, followed by "...".
fn truncate_str(s: &str, max: usize) -> String {
    if s.len() > max {
        // Walk back from `max` to the nearest char boundary; index 0 always is one.
        let cut = (0..=max)
            .rev()
            .find(|&idx| s.is_char_boundary(idx))
            .unwrap_or(0);
        format!("{}...", &s[..cut])
    } else {
        s.to_owned()
    }
}
1009
1010pub async fn introspect_query(
1022 request: &WorkingRepresentationRequest,
1023 question: &str,
1024 memory_repo: &MemoryRepository,
1025) -> Result<QueryIntrospection, AgentError> {
1026 introspect_query_with_representation_service(
1027 &RepresentationService::new(),
1028 request,
1029 question,
1030 memory_repo,
1031 )
1032 .await
1033}
1034
1035async fn introspect_query_with_representation_service(
1036 representation: &RepresentationService,
1037 request: &WorkingRepresentationRequest,
1038 question: &str,
1039 memory_repo: &MemoryRepository,
1040) -> Result<QueryIntrospection, AgentError> {
1041 let started = Instant::now();
1042 let overfetch_request = introspection_request(request);
1043 let representation = representation
1044 .build(&overfetch_request, memory_repo)
1045 .await
1046 .map_err(|e| AgentError::Storage(e.to_string()))?;
1047
1048 let ranked = flatten_ranked_representation_with_excluded(representation, request);
1049
1050 build_introspection(&ranked, question, request, memory_repo, started).await
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055 use super::*;
1056 use crate::types::{ExclusionReason, MemoryBucket};
1057 use async_trait::async_trait;
1058 use nexus_core::{CognitiveLevel, CognitiveMetadata, MemoryCategory, PerspectiveKey};
1059 use nexus_llm::GenerateResponse;
1060 use nexus_storage::repository::{
1061 MemoryRelationRepository, MemoryRepository, NamespaceRepository, StoreDigestParams,
1062 StoreMemoryParams,
1063 };
1064 use sqlx::sqlite::SqlitePoolOptions;
1065 use std::collections::VecDeque;
1066 use std::sync::{Arc, Mutex};
1067
1068 struct MockLlmClient {
1069 responses: Mutex<VecDeque<nexus_llm::Result<GenerateResponse>>>,
1070 calls: Mutex<Vec<GenerateParams>>,
1071 }
1072
1073 impl MockLlmClient {
1074 fn new(responses: Vec<nexus_llm::Result<GenerateResponse>>) -> Self {
1075 Self {
1076 responses: Mutex::new(VecDeque::from(responses)),
1077 calls: Mutex::new(Vec::new()),
1078 }
1079 }
1080
1081 fn call_count(&self) -> usize {
1082 self.calls.lock().expect("mock calls poisoned").len()
1083 }
1084
1085 fn user_messages(&self) -> Vec<String> {
1086 self.calls
1087 .lock()
1088 .expect("mock calls poisoned")
1089 .iter()
1090 .flat_map(|params| params.messages.iter())
1091 .filter(|message| message.role == "user")
1092 .map(|message| message.content.clone())
1093 .collect()
1094 }
1095 }
1096
1097 #[async_trait]
1098 impl LlmClient for MockLlmClient {
1099 async fn generate(&self, params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
1100 self.calls.lock().expect("mock calls poisoned").push(params);
1101 self.responses
1102 .lock()
1103 .expect("mock responses poisoned")
1104 .pop_front()
1105 .expect("mock response missing")
1106 }
1107
1108 fn provider_name(&self) -> String {
1109 "mock".to_string()
1110 }
1111
1112 fn model_name(&self) -> String {
1113 "mock-model".to_string()
1114 }
1115 }
1116
1117 async fn setup_repo() -> (sqlx::SqlitePool, MemoryRepository, i64, PerspectiveKey) {
1118 let pool = SqlitePoolOptions::new()
1119 .max_connections(1)
1120 .connect("sqlite::memory:")
1121 .await
1122 .unwrap();
1123 nexus_storage::migrations::run_migrations(&pool)
1124 .await
1125 .unwrap();
1126 let namespace_repo = NamespaceRepository::new(pool.clone());
1127 let namespace = namespace_repo
1128 .get_or_create("query-test", "query-test")
1129 .await
1130 .unwrap();
1131 let perspective =
1132 PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));
1133 (
1134 pool.clone(),
1135 MemoryRepository::new(pool),
1136 namespace.id,
1137 perspective,
1138 )
1139 }
1140
1141 fn metadata(level: CognitiveLevel, perspective: &PerspectiveKey) -> serde_json::Value {
1142 let mut cognitive = CognitiveMetadata::new(
1143 level,
1144 perspective.observer.clone(),
1145 perspective.subject.clone(),
1146 perspective.session_key.clone(),
1147 "test",
1148 );
1149 cognitive.confidence = Some(0.9);
1150 cognitive.merge_into(&serde_json::json!({}))
1151 }
1152
1153 async fn store_memory(
1154 repo: &MemoryRepository,
1155 namespace_id: i64,
1156 content: &str,
1157 level: CognitiveLevel,
1158 perspective: &PerspectiveKey,
1159 ) -> Memory {
1160 repo.store(StoreMemoryParams {
1161 namespace_id,
1162 content,
1163 category: &MemoryCategory::Facts,
1164 memory_lane_type: None,
1165 labels: &[],
1166 metadata: &metadata(level, perspective),
1167 embedding: None,
1168 embedding_model: None,
1169 })
1170 .await
1171 .unwrap()
1172 }
1173
1174 async fn store_raw_memory(
1175 repo: &MemoryRepository,
1176 namespace_id: i64,
1177 content: &str,
1178 perspective: &PerspectiveKey,
1179 ) -> Memory {
1180 repo.store(StoreMemoryParams {
1181 namespace_id,
1182 content,
1183 category: &MemoryCategory::Session,
1184 memory_lane_type: None,
1185 labels: &["raw-activity".to_string()],
1186 metadata: &serde_json::json!({
1187 "raw_activity": true,
1188 "cognitive": {
1189 "level": "raw",
1190 "observer": perspective.observer,
1191 "subject": perspective.subject,
1192 "session_key": perspective.session_key,
1193 "generated_by": "test"
1194 }
1195 }),
1196 embedding: None,
1197 embedding_model: None,
1198 })
1199 .await
1200 .unwrap()
1201 }
1202
1203 fn answer_response(answer: &str, confidence: f32, citations: &[i64]) -> GenerateResponse {
1204 let citations_json: Vec<serde_json::Value> = citations
1205 .iter()
1206 .map(|memory_id| {
1207 serde_json::json!({
1208 "memory_id": memory_id,
1209 "title": format!("Memory {}", memory_id),
1210 "excerpt": format!("Excerpt {}", memory_id)
1211 })
1212 })
1213 .collect();
1214 GenerateResponse {
1215 content: serde_json::json!({
1216 "answer": answer,
1217 "citations": citations_json,
1218 "confidence": confidence
1219 })
1220 .to_string(),
1221 model: "mock-model".to_string(),
1222 usage: None,
1223 }
1224 }
1225
1226 fn test_memory(id: i64, content: &str) -> Memory {
1227 Memory {
1228 id,
1229 namespace_id: 1,
1230 content: content.to_string(),
1231 category: nexus_core::MemoryCategory::Facts,
1232 labels: Vec::new(),
1233 metadata: serde_json::json!({}),
1234 ..Memory::default()
1235 }
1236 }
1237
1238 fn test_memory_with_relevance(id: i64, content: &str, relevance: f32) -> Memory {
1239 Memory {
1240 id,
1241 namespace_id: 1,
1242 content: content.to_string(),
1243 category: nexus_core::MemoryCategory::Facts,
1244 labels: Vec::new(),
1245 metadata: serde_json::json!({}),
1246 relevance_score: Some(relevance),
1247 ..Memory::default()
1248 }
1249 }
1250
1251 #[test]
1252 fn test_build_lineages_detects_phases() {
1253 let bucketed = vec![
1254 BucketedMemory {
1255 memory: test_memory(1, "Plan the next sprint tasks"),
1256 bucket: MemoryBucket::Recent,
1257 blended_score: 0.91,
1258 },
1259 BucketedMemory {
1260 memory: test_memory(2, "Implement the auth feature code"),
1261 bucket: MemoryBucket::Semantic,
1262 blended_score: 0.83,
1263 },
1264 BucketedMemory {
1265 memory: test_memory(3, "Fix the bug in error handling"),
1266 bucket: MemoryBucket::Derived,
1267 blended_score: 0.88,
1268 },
1269 ];
1270
1271 let lineages = build_lineages(&bucketed);
1272 assert_eq!(lineages.len(), 3);
1273
1274 assert_eq!(lineages[0].memory_id, 1);
1275 assert_eq!(lineages[0].bucket, MemoryBucket::Recent);
1276 assert_eq!(lineages[0].phase, "planning");
1277
1278 assert_eq!(lineages[1].memory_id, 2);
1279 assert_eq!(lineages[1].bucket, MemoryBucket::Semantic);
1280 assert_eq!(lineages[1].phase, "execution");
1281
1282 assert_eq!(lineages[2].memory_id, 3);
1283 assert_eq!(lineages[2].bucket, MemoryBucket::Derived);
1284 assert_eq!(lineages[2].phase, "debugging");
1285 }
1286
1287 #[test]
1288 fn test_build_lineages_captures_relevance_scores() {
1289 let bucketed = vec![BucketedMemory {
1290 memory: test_memory_with_relevance(42, "test content", 0.87),
1291 bucket: MemoryBucket::Semantic,
1292 blended_score: 0.95,
1293 }];
1294
1295 let lineages = build_lineages(&bucketed);
1296 assert_eq!(lineages[0].relevance_score, Some(0.87));
1297 }
1298
1299 #[test]
1300 fn test_build_lineages_falls_back_to_similarity_score() {
1301 let bucketed = vec![BucketedMemory {
1302 memory: Memory {
1303 id: 1,
1304 similarity_score: Some(0.72),
1305 ..test_memory(1, "test")
1306 },
1307 bucket: MemoryBucket::Semantic,
1308 blended_score: 0.79,
1309 }];
1310
1311 let lineages = build_lineages(&bucketed);
1312 assert_eq!(lineages[0].relevance_score, Some(0.72));
1313 }
1314
1315 #[test]
1316 fn test_should_refine_answer_for_complex_low_confidence_answer() {
1317 let bucketed = vec![
1318 BucketedMemory {
1319 memory: test_memory(1, "Digest"),
1320 bucket: MemoryBucket::Digests,
1321 blended_score: 0.88,
1322 },
1323 BucketedMemory {
1324 memory: test_memory(2, "Contradiction"),
1325 bucket: MemoryBucket::Contradictions,
1326 blended_score: 0.74,
1327 },
1328 ];
1329 let answer = QueryAnswer {
1330 answer: "Maybe.".to_string(),
1331 citations: Vec::new(),
1332 confidence: 0.55,
1333 ..Default::default()
1334 };
1335
1336 assert!(should_refine_answer(
1337 "Explain the tradeoff timeline and contradictions in this session",
1338 &answer,
1339 &bucketed,
1340 ));
1341 }
1342
1343 #[test]
1344 fn test_should_not_refine_simple_high_confidence_answer() {
1345 let bucketed = vec![BucketedMemory {
1346 memory: test_memory(1, "Recent memory"),
1347 bucket: MemoryBucket::Recent,
1348 blended_score: 0.82,
1349 }];
1350 let answer = QueryAnswer {
1351 answer: "The active provider is Gemini and the setting is already applied.".to_string(),
1352 citations: vec![crate::types::MemoryCitation {
1353 memory_id: 1,
1354 title: "Provider".to_string(),
1355 excerpt: "Gemini is active".to_string(),
1356 }],
1357 confidence: 0.91,
1358 ..Default::default()
1359 };
1360
1361 assert!(!should_refine_answer(
1362 "What is the active provider?",
1363 &answer,
1364 &bucketed,
1365 ));
1366 }
1367
1368 #[test]
1369 fn test_select_better_answer_prefers_cited_refined_answer() {
1370 let initial = QueryAnswer {
1371 answer: "Short".to_string(),
1372 citations: Vec::new(),
1373 confidence: 0.78,
1374 ..Default::default()
1375 };
1376 let refined = QueryAnswer {
1377 answer: "Longer answer with supporting detail and an explicit citation.".to_string(),
1378 citations: vec![crate::types::MemoryCitation {
1379 memory_id: 3,
1380 title: "Evidence".to_string(),
1381 excerpt: "Supporting excerpt".to_string(),
1382 }],
1383 confidence: 0.76,
1384 ..Default::default()
1385 };
1386
1387 let selected = select_better_answer(initial, refined);
1388 assert_eq!(selected.citations.len(), 1);
1389 }
1390
1391 #[test]
1394 fn test_annotate_with_buckets_appends_provenance() {
1395 let lineage = MemoryLineage {
1396 memory_id: 1,
1397 bucket: MemoryBucket::Semantic,
1398 phase: "execution".to_string(),
1399 relevance_score: Some(0.9),
1400 };
1401 let mut lineage_map = HashMap::new();
1402 lineage_map.insert(1, &lineage);
1403
1404 let formatted = "[Memory #1] Implement feature\nSome content\n";
1405 let annotated = annotate_with_buckets(formatted, &lineage_map);
1406
1407 assert!(annotated.contains("[Memory #1] Implement feature (bucket: semantic)"));
1408 assert!(annotated.contains("Some content"));
1409 assert_eq!(
1411 annotated.matches("(bucket:").count(),
1412 1,
1413 "expected exactly one bucket annotation"
1414 );
1415 }
1416
1417 #[test]
1418 fn test_annotate_skips_unknown_ids() {
1419 let lineage_map = HashMap::new();
1420 let formatted = "[Memory #1] Implement feature\n";
1421 let annotated = annotate_with_buckets(formatted, &lineage_map);
1422 assert_eq!(annotated, "[Memory #1] Implement feature\n");
1424 }
1425
1426 #[test]
1429 fn test_memory_bucket_display() {
1430 assert_eq!(MemoryBucket::Digests.to_string(), "digests");
1431 assert_eq!(MemoryBucket::Recent.to_string(), "recent");
1432 assert_eq!(MemoryBucket::Semantic.to_string(), "semantic");
1433 assert_eq!(MemoryBucket::Derived.to_string(), "derived");
1434 assert_eq!(MemoryBucket::Contradictions.to_string(), "contradictions");
1435 }
1436
1437 #[tokio::test]
1438 async fn test_query_service_empty_working_set_uses_no_relevant_memories_prompt() {
1439 let (pool, repo, namespace_id, _perspective) = setup_repo().await;
1440 let relation_repo = MemoryRelationRepository::new(&pool);
1441 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1442 "No memory matched.",
1443 0.92,
1444 &[],
1445 ))]));
1446 let service = QueryService::new(llm.clone(), AgentConfig::default());
1447
1448 let answer = service
1449 .query("What happened?", namespace_id, &repo, &relation_repo)
1450 .await
1451 .unwrap();
1452
1453 assert_eq!(answer.answer, "No memory matched.");
1454 assert!(answer.lineages.is_empty());
1455 assert_eq!(llm.call_count(), 1);
1456 let prompts = llm.user_messages();
1457 assert!(prompts
1458 .iter()
1459 .any(|prompt| prompt.contains("No relevant memories found.")));
1460 }
1461
    /// Default `query` keeps raw hook activity out of both the returned lineages and the
    /// prompt sent to the LLM, while still attaching the explicit memory's lineage.
    #[tokio::test]
    async fn test_query_service_excludes_raw_noise_by_default_and_attaches_lineages() {
        let (pool, repo, namespace_id, perspective) = setup_repo().await;
        let relation_repo = MemoryRelationRepository::new(&pool);
        // Both memories mention "retrieval ranking". Only the second one is stored as raw
        // activity (raw-activity label + `raw_activity: true` metadata).
        let explicit = store_memory(
            &repo,
            namespace_id,
            "Explicit observation about retrieval ranking.",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "raw hook payload about retrieval ranking",
            &perspective,
        )
        .await;
        // One scripted reply, citing only the explicit memory.
        let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
            "Ranking was improved.",
            0.9,
            &[explicit.id],
        ))]));
        let service = QueryService::new(llm.clone(), AgentConfig::default());

        let answer = service
            .query(
                "What changed in retrieval ranking?",
                namespace_id,
                &repo,
                &relation_repo,
            )
            .await
            .unwrap();

        // Lineages: the explicit memory is attached and the raw memory is not.
        assert!(!answer.lineages.is_empty());
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == explicit.id));
        assert!(!answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == raw.id));
        // Prompt: explicit content reaches the LLM and the raw payload does not.
        let prompts = llm.user_messages();
        assert!(prompts
            .iter()
            .any(|prompt| prompt.contains("Explicit observation about retrieval ranking.")));
        assert!(!prompts
            .iter()
            .any(|prompt| prompt.contains("raw hook payload about retrieval ranking")));
    }
1515
    /// With `include_raw: true` on the representation request, raw activity is allowed
    /// into both the lineages and the prompt.
    #[tokio::test]
    async fn test_query_with_representation_can_include_raw_when_requested() {
        let (pool, repo, namespace_id, perspective) = setup_repo().await;
        let relation_repo = MemoryRelationRepository::new(&pool);
        // An explicit memory and a raw-activity memory on the same topic ("hook routing").
        store_memory(
            &repo,
            namespace_id,
            "Explicit observation about hook routing.",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;
        let raw = store_raw_memory(
            &repo,
            namespace_id,
            "raw hook payload about hook routing",
            &perspective,
        )
        .await;
        // One scripted reply, citing the raw memory.
        let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
            "The hook routing is visible through the explicit and raw activity records.",
            0.88,
            &[raw.id],
        ))]));
        let service = QueryService::new(llm.clone(), AgentConfig::default());

        // No perspective filter; raw inclusion is explicitly requested.
        let answer = service
            .query_with_representation(
                "What does the hook traffic show?",
                WorkingRepresentationRequest {
                    namespace_id,
                    perspective: None,
                    query: Some("hook routing".to_string()),
                    max_items: 10,
                    include_raw: true,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
                &relation_repo,
            )
            .await
            .unwrap();

        // The raw memory appears in the lineages and its text in the prompt.
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == raw.id));
        let prompts = llm.user_messages();
        assert!(prompts
            .iter()
            .any(|prompt| prompt.contains("raw hook payload about hook routing")));
    }
1568
    /// A request limited to digest, derived, and contradiction buckets attaches a lineage
    /// for each of the three stored cognition outputs and forwards all three texts.
    /// The prompt is also expected to omit the "Summary:" marker.
    #[tokio::test]
    async fn test_query_service_mixed_cognition_outputs_attach_multiple_lineages_and_phase_context()
    {
        let (pool, repo, namespace_id, perspective) = setup_repo().await;
        let relation_repo = MemoryRelationRepository::new(&pool);
        // Short summary memory, registered as the `short` digest for session-1.
        let digest = store_memory(
            &repo,
            namespace_id,
            "Digest summary of the session.",
            CognitiveLevel::SummaryShort,
            &perspective,
        )
        .await;
        repo.store_digest(StoreDigestParams {
            namespace_id,
            session_key: "session-1",
            digest_kind: "short",
            memory_id: digest.id,
            start_memory_id: Some(digest.id),
            end_memory_id: Some(digest.id),
            token_count: 42,
        })
        .await
        .unwrap();
        let derived = store_memory(
            &repo,
            namespace_id,
            "Derived insight about the refactor.",
            CognitiveLevel::Derived,
            &perspective,
        )
        .await;
        let contradiction = store_memory(
            &repo,
            namespace_id,
            "Contradiction between old and new recall paths.",
            CognitiveLevel::Contradiction,
            &perspective,
        )
        .await;
        // One scripted reply citing all three memories.
        let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
            "The session had a digest, an insight, and a contradiction.",
            0.86,
            &[digest.id, derived.id, contradiction.id],
        ))]));
        let service = QueryService::new(llm.clone(), AgentConfig::default());

        // Recent and semantic buckets are switched off, so only the derived, digest, and
        // contradiction buckets can contribute.
        let answer = service
            .query_with_representation(
                "Explain the session timeline and contradictions in context.",
                WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective.clone()),
                    query: Some("timeline contradiction insight".to_string()),
                    max_items: 10,
                    include_raw: false,
                    include_recent: false,
                    include_semantic: false,
                    include_derived: true,
                    include_digests: true,
                    include_contradictions: true,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
                &relation_repo,
            )
            .await
            .unwrap();

        // One lineage per stored cognition output.
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == digest.id));
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == derived.id));
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == contradiction.id));
        let prompts = llm.user_messages();
        assert!(prompts
            .iter()
            .any(|prompt| prompt.contains("Digest summary of the session.")));
        assert!(prompts
            .iter()
            .any(|prompt| prompt.contains("Derived insight about the refactor.")));
        assert!(prompts
            .iter()
            .any(|prompt| prompt.contains("Contradiction between old and new recall paths.")));
        // NOTE(review): "Summary:" appears to mark the lightweight prompt layout (see
        // test_query_service_lightweight_context_below_phase_threshold, which expects it);
        // here the phase-grouped layout is expected instead — confirm.
        assert!(!prompts.iter().any(|prompt| prompt.contains("Summary:")));
    }
1662
    /// Regression comparison: the legacy `search_by_text` recall finds nothing for a
    /// natural-language question, while representation-first `query_with_representation`
    /// surfaces the session digest and the contradiction note for the same question.
    #[tokio::test]
    async fn test_query_service_representation_beats_old_like_recall_for_session_digest_context() {
        let (pool, repo, namespace_id, perspective) = setup_repo().await;
        let relation_repo = MemoryRelationRepository::new(&pool);

        // Background explicit memory about the provider switch.
        store_memory(
            &repo,
            namespace_id,
            "Configured Gemini as the active provider and preserved installer env settings.",
            CognitiveLevel::Explicit,
            &perspective,
        )
        .await;
        // Summary memory, registered as the `short` digest for the perspective's session.
        let digest = store_memory(
            &repo,
            namespace_id,
            "Digest summary: migration timeline of the provider switch, installer preservation, and bounded dreaming rollout.",
            CognitiveLevel::SummaryShort,
            &perspective,
        )
        .await;
        repo.store_digest(StoreDigestParams {
            namespace_id,
            session_key: perspective.session_key.as_deref().unwrap_or("session-1"),
            digest_kind: "short",
            memory_id: digest.id,
            start_memory_id: Some(digest.id),
            end_memory_id: Some(digest.id),
            token_count: 64,
        })
        .await
        .unwrap();
        let contradiction = store_memory(
            &repo,
            namespace_id,
            "Contradiction note: old recall missed the migration timeline while representation-first recall surfaced it.",
            CognitiveLevel::Contradiction,
            &perspective,
        )
        .await;

        // Baseline: no stored memory contains the whole question string, so the legacy
        // text search returns nothing.
        let question =
            "What does the migration timeline summary say about the provider switch rollout?";
        let old_like_hits = repo
            .search_by_text(namespace_id, question, 10, false)
            .await
            .unwrap();
        assert!(
            old_like_hits.is_empty(),
            "legacy LIKE recall should miss the natural-language question when no memory contains the full string"
        );

        // One scripted reply citing the digest and the contradiction note.
        let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
            "The migration timeline shows a provider switch digest with rollout context and a contradiction note about the old recall path missing it.",
            0.9,
            &[digest.id, contradiction.id],
        ))]));
        let service = QueryService::new(llm.clone(), AgentConfig::default());

        // Every non-raw bucket enabled, scoped to the test perspective.
        let answer = service
            .query_with_representation(
                question,
                WorkingRepresentationRequest {
                    namespace_id,
                    perspective: Some(perspective.clone()),
                    query: Some(question.to_string()),
                    max_items: 12,
                    include_raw: false,
                    include_recent: true,
                    include_semantic: true,
                    include_derived: true,
                    include_digests: true,
                    include_contradictions: true,
                    ..WorkingRepresentationRequest::default()
                },
                &repo,
                &relation_repo,
            )
            .await
            .unwrap();

        // Both the digest and the contradiction are attached as lineages...
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == digest.id));
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == contradiction.id));

        // ...and their full texts were forwarded to the LLM.
        let prompts = llm.user_messages();
        assert!(prompts.iter().any(|prompt| prompt.contains(
            "Digest summary: migration timeline of the provider switch, installer preservation, and bounded dreaming rollout."
        )));
        assert!(prompts.iter().any(|prompt| prompt.contains(
            "Contradiction note: old recall missed the migration timeline while representation-first recall surfaced it."
        )));
    }
1761
1762 #[tokio::test]
1763 async fn test_query_service_refinement_triggers_second_llm_call() {
1764 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1765 let relation_repo = MemoryRelationRepository::new(&pool);
1766 for idx in 0..6 {
1767 let content = format!("Execution detail {idx} for the migration timeline.");
1768 store_memory(
1769 &repo,
1770 namespace_id,
1771 &content,
1772 CognitiveLevel::Explicit,
1773 &perspective,
1774 )
1775 .await;
1776 }
1777 let llm = Arc::new(MockLlmClient::new(vec![
1778 Ok(answer_response("Maybe.", 0.55, &[])),
1779 Ok(answer_response(
1780 "The migration timeline shows several execution details with stronger support.",
1781 0.9,
1782 &[1],
1783 )),
1784 ]));
1785 let service = QueryService::new(llm.clone(), AgentConfig::default());
1786
1787 let answer = service
1788 .query(
1789 "Explain the tradeoff timeline and relationship across the migration work.",
1790 namespace_id,
1791 &repo,
1792 &relation_repo,
1793 )
1794 .await
1795 .unwrap();
1796
1797 assert_eq!(
1798 answer.answer,
1799 "The migration timeline shows several execution details with stronger support."
1800 );
1801 assert_eq!(llm.call_count(), 2);
1802 }
1803
    /// Querying the `claude-code` namespace (no perspective) is expected to pull in the
    /// digest stored under the `claude` namespace, while the digest under the unrelated
    /// `codex` namespace stays out of both the lineages and the prompt.
    #[tokio::test]
    async fn test_query_service_auto_includes_cross_namespace_alias_digest_without_perspective() {
        // Built by hand instead of via `setup_repo`, because this test needs three
        // namespaces.
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespace_repo = NamespaceRepository::new(pool.clone());
        let primary = namespace_repo
            .get_or_create("claude-code", "claude-code")
            .await
            .unwrap();
        let alias = namespace_repo
            .get_or_create("claude", "claude")
            .await
            .unwrap();
        let unrelated = namespace_repo
            .get_or_create("codex", "codex")
            .await
            .unwrap();

        let repo = MemoryRepository::new(pool.clone());
        let relation_repo = MemoryRelationRepository::new(&pool);
        let perspective =
            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".to_string()));

        // Digest under the `claude` namespace (expected to be included).
        let alias_digest = store_memory(
            &repo,
            alias.id,
            "Alias digest summary: the claude namespace captured the provider rollout timeline.",
            CognitiveLevel::SummaryShort,
            &perspective,
        )
        .await;
        repo.store_digest(StoreDigestParams {
            namespace_id: alias.id,
            session_key: "session-1",
            digest_kind: "short",
            memory_id: alias_digest.id,
            start_memory_id: Some(alias_digest.id),
            end_memory_id: Some(alias_digest.id),
            token_count: 48,
        })
        .await
        .unwrap();

        // Digest under the `codex` namespace (expected to be excluded).
        let unrelated_digest = store_memory(
            &repo,
            unrelated.id,
            "Unrelated codex digest summary: refactor unrelated CLI parsing bug.",
            CognitiveLevel::SummaryShort,
            &perspective,
        )
        .await;
        repo.store_digest(StoreDigestParams {
            namespace_id: unrelated.id,
            session_key: "session-1",
            digest_kind: "short",
            memory_id: unrelated_digest.id,
            start_memory_id: Some(unrelated_digest.id),
            end_memory_id: Some(unrelated_digest.id),
            token_count: 41,
        })
        .await
        .unwrap();

        // One scripted reply citing the alias digest.
        let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
            "The provider rollout timeline is preserved in the alias digest.",
            0.93,
            &[alias_digest.id],
        ))]));
        let service = QueryService::new(llm.clone(), AgentConfig::default());

        // The query targets the primary namespace only.
        let answer = service
            .query(
                "What does the provider rollout timeline say?",
                primary.id,
                &repo,
                &relation_repo,
            )
            .await
            .unwrap();

        // Lineages: the alias digest is attached and the unrelated digest is not.
        assert!(answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == alias_digest.id));
        assert!(!answer
            .lineages
            .iter()
            .any(|lineage| lineage.memory_id == unrelated_digest.id));

        // Prompt: the same inclusion/exclusion holds for the text sent to the LLM.
        let prompts = llm.user_messages();
        assert!(prompts.iter().any(|prompt| prompt.contains(
            "Alias digest summary: the claude namespace captured the provider rollout timeline."
        )));
        assert!(!prompts.iter().any(|prompt| prompt
            .contains("Unrelated codex digest summary: refactor unrelated CLI parsing bug.")));
    }
1907
1908 #[tokio::test]
1909 async fn test_query_service_simple_answer_stays_single_call() {
1910 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1911 let relation_repo = MemoryRelationRepository::new(&pool);
1912 let explicit = store_memory(
1913 &repo,
1914 namespace_id,
1915 "Explicit note about the active provider switch.",
1916 CognitiveLevel::Explicit,
1917 &perspective,
1918 )
1919 .await;
1920 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1921 "The active provider is Gemini and the change is already applied with explicit support.",
1922 0.94,
1923 &[explicit.id],
1924 ))]));
1925 let service = QueryService::new(llm.clone(), AgentConfig::default());
1926
1927 let answer = service
1928 .query(
1929 "What is the active provider?",
1930 namespace_id,
1931 &repo,
1932 &relation_repo,
1933 )
1934 .await
1935 .unwrap();
1936
1937 assert_eq!(
1938 answer.answer,
1939 "The active provider is Gemini and the change is already applied with explicit support."
1940 );
1941 assert_eq!(llm.call_count(), 1);
1942 }
1943
1944 #[tokio::test]
1945 async fn test_query_service_lightweight_context_below_phase_threshold() {
1946 let (pool, repo, namespace_id, perspective) = setup_repo().await;
1947 let relation_repo = MemoryRelationRepository::new(&pool);
1948 store_memory(
1949 &repo,
1950 namespace_id,
1951 "Short note about the provider switch.",
1952 CognitiveLevel::Explicit,
1953 &perspective,
1954 )
1955 .await;
1956 store_memory(
1957 &repo,
1958 namespace_id,
1959 "Follow-up note about the provider switch.",
1960 CognitiveLevel::Explicit,
1961 &perspective,
1962 )
1963 .await;
1964 let llm = Arc::new(MockLlmClient::new(vec![Ok(answer_response(
1965 "The provider changed.",
1966 0.93,
1967 &[1],
1968 ))]));
1969 let service = QueryService::new(llm.clone(), AgentConfig::default());
1970
1971 let answer = service
1972 .query(
1973 "What changed with the provider?",
1974 namespace_id,
1975 &repo,
1976 &relation_repo,
1977 )
1978 .await
1979 .unwrap();
1980
1981 assert!(!answer.lineages.is_empty());
1982 let prompts = llm.user_messages();
1983 assert!(prompts.iter().any(|prompt| prompt.contains("Summary:")));
1984 }
1985
1986 #[test]
1991 fn test_build_inclusion_reasons_produces_non_empty_reasons() {
1992 let bucketed = vec![
1993 BucketedMemory {
1994 memory: test_memory(1, "Fix the authentication bug"),
1995 bucket: MemoryBucket::Semantic,
1996 blended_score: 0.91,
1997 },
1998 BucketedMemory {
1999 memory: test_memory(2, "Session digest for sprint review"),
2000 bucket: MemoryBucket::Digests,
2001 blended_score: 0.85,
2002 },
2003 BucketedMemory {
2004 memory: test_memory(3, "Derived insight: auth patterns converge"),
2005 bucket: MemoryBucket::Derived,
2006 blended_score: 0.88,
2007 },
2008 ];
2009
2010 let default_request = WorkingRepresentationRequest::default();
2011 let reasons = build_inclusion_reasons(&bucketed, &default_request);
2012 assert_eq!(reasons.len(), 3);
2013
2014 assert_eq!(reasons[0].memory_id, 1);
2015 assert_eq!(reasons[0].bucket, MemoryBucket::Semantic);
2016 assert!(!reasons[0].reason.is_empty());
2017 assert!(reasons[0].reason.contains("Semantic match"));
2018
2019 assert_eq!(reasons[1].memory_id, 2);
2020 assert_eq!(reasons[1].bucket, MemoryBucket::Digests);
2021 assert!(reasons[1].reason.contains("digest"));
2022
2023 assert_eq!(reasons[2].memory_id, 3);
2024 assert_eq!(reasons[2].bucket, MemoryBucket::Derived);
2025 assert!(reasons[2].reason.contains("derived"));
2026 }
2027
2028 #[test]
2029 fn test_build_excluded_candidates_classifies_exclusion_reasons() {
2030 let explicit_memory = Memory {
2031 id: 10,
2032 namespace_id: 1,
2033 content: "Low-scoring memory that got cut".to_string(),
2034 category: nexus_core::MemoryCategory::Facts,
2035 labels: Vec::new(),
2036 metadata: serde_json::json!({
2037 "cognitive": {
2038 "level": "explicit",
2039 "confidence": 0.85,
2040 "observer": "",
2041 "subject": "",
2042 "generated_by": ""
2043 }
2044 }),
2045 ..Memory::default()
2046 };
2047 let duplicate_memory = test_memory(11, "Another excluded memory with more text content");
2048
2049 let excluded = vec![
2050 RankedExcludedMemory {
2051 memory: explicit_memory,
2052 bucket: MemoryBucket::Recent,
2053 blended_score: 0.30,
2054 reason: ExclusionReason::BudgetTruncation,
2055 },
2056 RankedExcludedMemory {
2057 memory: duplicate_memory,
2058 bucket: MemoryBucket::Semantic,
2059 blended_score: 0.25,
2060 reason: ExclusionReason::Deduplicated,
2061 },
2062 ];
2063
2064 let candidates = build_excluded_candidates(&excluded);
2065 assert_eq!(candidates.len(), 2);
2066
2067 assert_eq!(candidates[0].memory_id, 10);
2068 assert_eq!(candidates[0].reason, ExclusionReason::BudgetTruncation);
2069 assert!(!candidates[0].content_preview.is_empty());
2070
2071 assert_eq!(candidates[1].memory_id, 11);
2072 assert_eq!(candidates[1].reason, ExclusionReason::Deduplicated);
2073 }
2074
2075 #[test]
2076 fn test_build_bucket_stats_correct_counts() {
2077 let included = vec![
2078 BucketedMemory {
2079 memory: test_memory(1, "a"),
2080 bucket: MemoryBucket::Semantic,
2081 blended_score: 0.9,
2082 },
2083 BucketedMemory {
2084 memory: test_memory(2, "b"),
2085 bucket: MemoryBucket::Semantic,
2086 blended_score: 0.8,
2087 },
2088 BucketedMemory {
2089 memory: test_memory(3, "c"),
2090 bucket: MemoryBucket::Derived,
2091 blended_score: 0.7,
2092 },
2093 ];
2094 let excluded = vec![
2095 RankedExcludedMemory {
2096 memory: test_memory(4, "d"),
2097 bucket: MemoryBucket::Semantic,
2098 blended_score: 0.5,
2099 reason: ExclusionReason::BudgetTruncation,
2100 },
2101 RankedExcludedMemory {
2102 memory: test_memory(5, "e"),
2103 bucket: MemoryBucket::Recent,
2104 blended_score: 0.4,
2105 reason: ExclusionReason::BudgetTruncation,
2106 },
2107 ];
2108
2109 let stats = build_bucket_stats(&included, &excluded);
2110
2111 let semantic = stats
2112 .iter()
2113 .find(|s| s.bucket == MemoryBucket::Semantic)
2114 .unwrap();
2115 assert_eq!(semantic.fetched, 3);
2116 assert_eq!(semantic.included, 2);
2117 assert_eq!(semantic.excluded, 1);
2118
2119 let derived = stats
2120 .iter()
2121 .find(|s| s.bucket == MemoryBucket::Derived)
2122 .unwrap();
2123 assert_eq!(derived.fetched, 1);
2124 assert_eq!(derived.included, 1);
2125 assert_eq!(derived.excluded, 0);
2126
2127 let recent = stats
2128 .iter()
2129 .find(|s| s.bucket == MemoryBucket::Recent)
2130 .unwrap();
2131 assert_eq!(recent.fetched, 1);
2132 assert_eq!(recent.included, 0);
2133 assert_eq!(recent.excluded, 1);
2134 }
2135
2136 #[test]
2137 fn test_query_introspection_serialization_contract() {
2138 let introspection = QueryIntrospection {
2139 included: vec![InclusionReason {
2140 memory_id: 1,
2141 bucket: MemoryBucket::Semantic,
2142 phase: "execution".to_string(),
2143 relevance_score: Some(0.87),
2144 blended_score: 0.91,
2145 reason: "Semantic match (score: 0.87) in semantic bucket".to_string(),
2146 signals: vec![InclusionSignal {
2147 signal_type: "semantic_similarity".to_string(),
2148 description: "Embedding similarity score: 0.870".to_string(),
2149 weight_contribution: 0.87,
2150 }],
2151 }],
2152 excluded_candidates: vec![ExcludedCandidate {
2153 memory_id: 2,
2154 bucket: MemoryBucket::Recent,
2155 blended_score: 0.30,
2156 reason: ExclusionReason::BudgetTruncation,
2157 content_preview: "Some content...".to_string(),
2158 }],
2159 relevant_reflections: vec![RelevantReflection {
2160 memory_id: 3,
2161 reflection_type: "derived".to_string(),
2162 content_preview: "Auth patterns converge...".to_string(),
2163 confidence: Some(0.92),
2164 created_at: "2026-03-27T12:00:00Z".to_string(),
2165 }],
2166 bucket_stats: vec![BucketIntrospectionStats {
2167 bucket: MemoryBucket::Semantic,
2168 fetched: 2,
2169 included: 1,
2170 excluded: 1,
2171 }],
2172 pipeline_latency_ms: Some(12),
2173 representation_config: Some(RepresentationConfigSnapshot {
2174 max_items: 20,
2175 include_raw: false,
2176 include_digests: true,
2177 include_semantic: true,
2178 include_derived: true,
2179 include_contradictions: true,
2180 }),
2181 };
2182
2183 let json = serde_json::to_string(&introspection).expect("serialize introspection");
2184 let parsed: QueryIntrospection =
2185 serde_json::from_str(&json).expect("deserialize introspection");
2186
2187 assert_eq!(parsed.included.len(), 1);
2188 assert_eq!(parsed.excluded_candidates.len(), 1);
2189 assert_eq!(parsed.relevant_reflections.len(), 1);
2190 assert_eq!(parsed.bucket_stats.len(), 1);
2191 assert_eq!(
2192 parsed.excluded_candidates[0].reason,
2193 ExclusionReason::BudgetTruncation
2194 );
2195 assert_eq!(parsed.bucket_stats[0].fetched, 2);
2196 assert_eq!(parsed.pipeline_latency_ms, Some(12));
2197 assert!(parsed.representation_config.is_some());
2198 let config = parsed.representation_config.unwrap();
2199 assert_eq!(config.max_items, 20);
2200 assert!(!config.include_raw);
2201 assert!(config.include_digests);
2202 assert_eq!(parsed.included[0].signals.len(), 1);
2204 assert_eq!(
2205 parsed.included[0].signals[0].signal_type,
2206 "semantic_similarity"
2207 );
2208 }
2209
2210 #[test]
2211 fn test_truncate_str_behavior() {
2212 assert_eq!(truncate_str("short", 80), "short");
2213 assert_eq!(truncate_str("hello world", 5), "hello...");
2214 assert_eq!(truncate_str("a longer string here", 10), "a longer s...");
2215 }
2216
2217 #[tokio::test]
2218 async fn test_query_introspection_with_stored_memories() {
2219 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2220
2221 let explicit = store_memory(
2223 &repo,
2224 namespace_id,
2225 "The authentication module uses JWT tokens for session management",
2226 CognitiveLevel::Explicit,
2227 &perspective,
2228 )
2229 .await;
2230 let _derived = store_memory(
2231 &repo,
2232 namespace_id,
2233 "Derived: JWT token patterns show convergence across microservices",
2234 CognitiveLevel::Derived,
2235 &perspective,
2236 )
2237 .await;
2238
2239 let llm = Arc::new(MockLlmClient::new(vec![])); let service = QueryService::new(llm, AgentConfig::default());
2241
2242 let introspection = service
2243 .query_introspection("authentication tokens", namespace_id, &repo)
2244 .await
2245 .unwrap();
2246
2247 assert!(introspection
2249 .included
2250 .iter()
2251 .any(|i| i.memory_id == explicit.id));
2252 for reason in &introspection.included {
2254 assert!(!reason.reason.is_empty());
2255 assert!(!reason.signals.is_empty());
2256 }
2257 assert!(introspection.pipeline_latency_ms.is_some());
2259 assert!(introspection.representation_config.is_some());
2261 assert!(!introspection.bucket_stats.is_empty());
2263 }
2264
2265 #[tokio::test]
2266 async fn test_query_introspection_surfaces_excluded_candidates_with_small_budget() {
2267 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2268
2269 store_memory(
2270 &repo,
2271 namespace_id,
2272 "Session cookies replaced JWT for auth",
2273 CognitiveLevel::Explicit,
2274 &perspective,
2275 )
2276 .await;
2277 store_memory(
2278 &repo,
2279 namespace_id,
2280 "Derived auth insight from repeated login failures",
2281 CognitiveLevel::Derived,
2282 &perspective,
2283 )
2284 .await;
2285 store_memory(
2286 &repo,
2287 namespace_id,
2288 "Recent authentication follow-up note",
2289 CognitiveLevel::Explicit,
2290 &perspective,
2291 )
2292 .await;
2293
2294 let service =
2295 QueryService::new(Arc::new(MockLlmClient::new(vec![])), AgentConfig::default());
2296 let request = WorkingRepresentationRequest {
2297 namespace_id,
2298 perspective: Some(perspective.clone()),
2299 query: Some("authentication".to_string()),
2300 max_items: 1,
2301 include_raw: false,
2302 include_recent: true,
2303 include_semantic: true,
2304 include_derived: true,
2305 include_digests: true,
2306 include_contradictions: true,
2307 ..WorkingRepresentationRequest::default()
2308 };
2309
2310 let introspection = service
2311 .introspection_with_representation(&request, "authentication", &repo)
2312 .await
2313 .unwrap();
2314
2315 assert!(!introspection.excluded_candidates.is_empty());
2316 }
2317
2318 #[tokio::test]
2319 async fn test_fetch_relevant_reflections_uses_tokenized_matching() {
2320 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2321
2322 store_memory(
2323 &repo,
2324 namespace_id,
2325 "Derived insight: session cookies replaced JWT after CSRF review",
2326 CognitiveLevel::Derived,
2327 &perspective,
2328 )
2329 .await;
2330 store_memory(
2331 &repo,
2332 namespace_id,
2333 "Contradiction: older notes still mention JWT login flow",
2334 CognitiveLevel::Contradiction,
2335 &perspective,
2336 )
2337 .await;
2338
2339 let request = WorkingRepresentationRequest {
2340 namespace_id,
2341 perspective: Some(perspective.clone()),
2342 query: Some("why did auth move away from jwt tokens".to_string()),
2343 ..WorkingRepresentationRequest::default()
2344 };
2345
2346 let reflections =
2347 fetch_relevant_reflections("why did auth move away from jwt tokens", &request, &repo)
2348 .await
2349 .unwrap();
2350
2351 assert!(!reflections.is_empty());
2352 assert!(reflections
2353 .iter()
2354 .any(|reflection| reflection.content_preview.to_lowercase().contains("jwt")));
2355 }
2356
2357 #[tokio::test]
2358 async fn test_fetch_relevant_reflections_respects_request_scope() {
2359 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2360 let other_perspective = PerspectiveKey {
2361 observer: "codex".to_string(),
2362 subject: "codex".to_string(),
2363 session_key: Some("other-session".to_string()),
2364 };
2365
2366 store_memory(
2367 &repo,
2368 namespace_id,
2369 "Derived auth insight from the active scoped session",
2370 CognitiveLevel::Derived,
2371 &perspective,
2372 )
2373 .await;
2374 store_memory(
2375 &repo,
2376 namespace_id,
2377 "Derived auth insight from an unrelated session",
2378 CognitiveLevel::Derived,
2379 &other_perspective,
2380 )
2381 .await;
2382
2383 let request = WorkingRepresentationRequest {
2384 namespace_id,
2385 perspective: Some(perspective.clone()),
2386 query: Some("auth insight".to_string()),
2387 ..WorkingRepresentationRequest::default()
2388 };
2389
2390 let reflections = fetch_relevant_reflections("auth insight", &request, &repo)
2391 .await
2392 .unwrap();
2393
2394 assert_eq!(reflections.len(), 1);
2395 assert!(reflections[0]
2396 .content_preview
2397 .contains("active scoped session"));
2398 }
2399
2400 #[tokio::test]
2401 async fn test_standalone_introspect_query_matches_service_introspection_contract() {
2402 let (_pool, repo, namespace_id, perspective) = setup_repo().await;
2403
2404 for content in [
2405 "Session cookies replaced JWT for auth",
2406 "Derived auth insight from repeated login failures",
2407 "Recent authentication follow-up note",
2408 ] {
2409 store_memory(
2410 &repo,
2411 namespace_id,
2412 content,
2413 if content.starts_with("Derived") {
2414 CognitiveLevel::Derived
2415 } else {
2416 CognitiveLevel::Explicit
2417 },
2418 &perspective,
2419 )
2420 .await;
2421 }
2422
2423 let request = WorkingRepresentationRequest {
2424 namespace_id,
2425 perspective: Some(perspective.clone()),
2426 query: Some("authentication".to_string()),
2427 max_items: 1,
2428 include_raw: false,
2429 include_recent: true,
2430 include_semantic: true,
2431 include_derived: true,
2432 include_digests: true,
2433 include_contradictions: true,
2434 ..WorkingRepresentationRequest::default()
2435 };
2436
2437 let service =
2438 QueryService::new(Arc::new(MockLlmClient::new(vec![])), AgentConfig::default());
2439 let from_service = service
2440 .introspection_with_representation(&request, "authentication", &repo)
2441 .await
2442 .unwrap();
2443 let standalone = introspect_query(&request, "authentication", &repo)
2444 .await
2445 .unwrap();
2446
2447 assert_eq!(
2448 standalone
2449 .excluded_candidates
2450 .iter()
2451 .map(|candidate| {
2452 (
2453 candidate.memory_id,
2454 candidate.bucket,
2455 candidate.reason.clone(),
2456 )
2457 })
2458 .collect::<Vec<_>>(),
2459 from_service
2460 .excluded_candidates
2461 .iter()
2462 .map(|candidate| {
2463 (
2464 candidate.memory_id,
2465 candidate.bucket,
2466 candidate.reason.clone(),
2467 )
2468 })
2469 .collect::<Vec<_>>(),
2470 "standalone introspection should expose the same excluded candidates as service introspection"
2471 );
2472 assert_eq!(
2473 standalone
2474 .bucket_stats
2475 .iter()
2476 .map(|stats| (stats.bucket, stats.fetched, stats.included, stats.excluded))
2477 .collect::<Vec<_>>(),
2478 from_service
2479 .bucket_stats
2480 .iter()
2481 .map(|stats| (stats.bucket, stats.fetched, stats.included, stats.excluded))
2482 .collect::<Vec<_>>()
2483 );
2484 assert_eq!(
2485 standalone
2486 .included
2487 .iter()
2488 .map(|reason| (reason.memory_id, reason.bucket))
2489 .collect::<Vec<_>>(),
2490 from_service
2491 .included
2492 .iter()
2493 .map(|reason| (reason.memory_id, reason.bucket))
2494 .collect::<Vec<_>>()
2495 );
2496 }
2497
/// A `QueryAnswer` without introspection must omit the field entirely from
/// its JSON form.
#[test]
fn test_query_answer_introspection_field_defaults_to_none() {
    let serialized = serde_json::to_string(&QueryAnswer {
        answer: "test".to_string(),
        citations: vec![],
        confidence: 0.9,
        lineages: vec![],
        introspection: None,
    })
    .unwrap();

    assert!(!serialized.contains("introspection"));
}
2511
/// When introspection is attached (even an empty one), it must appear in the
/// JSON and survive a round trip.
#[test]
fn test_query_answer_introspection_serializes_when_present() {
    let empty_introspection = QueryIntrospection {
        included: vec![],
        excluded_candidates: vec![],
        relevant_reflections: vec![],
        bucket_stats: vec![],
        pipeline_latency_ms: None,
        representation_config: None,
    };
    let answer = QueryAnswer {
        answer: "test".to_string(),
        citations: vec![],
        confidence: 0.9,
        lineages: vec![],
        introspection: Some(empty_introspection),
    };

    let serialized = serde_json::to_string(&answer).unwrap();
    assert!(serialized.contains("introspection"));

    let round_tripped: QueryAnswer = serde_json::from_str(&serialized).unwrap();
    assert!(round_tripped.introspection.is_some());
}
2534
/// A semantic-bucket memory with a similarity score must yield recency,
/// cognitive-level, and semantic-similarity inclusion signals.
#[test]
fn test_inclusion_signals_populated_for_semantic_bucket() {
    let memory = Memory {
        id: 42,
        namespace_id: 1,
        content: "Authentication uses JWT tokens".to_string(),
        category: nexus_core::MemoryCategory::Facts,
        labels: Vec::new(),
        metadata: serde_json::json!({
            "cognitive": {
                "level": "explicit",
                "confidence": 0.90,
                "observer": "",
                "subject": "",
                "generated_by": ""
            }
        }),
        similarity_score: Some(0.85),
        ..Memory::default()
    };
    let candidates = [BucketedMemory {
        memory,
        bucket: MemoryBucket::Semantic,
        blended_score: 0.92,
    }];
    let default_request = WorkingRepresentationRequest::default();

    let reasons = build_inclusion_reasons(&candidates, &default_request);
    assert_eq!(reasons.len(), 1);

    let signals = &reasons[0].signals;
    assert!(!signals.is_empty());
    let signal_types: Vec<&str> = signals
        .iter()
        .map(|signal| signal.signal_type.as_str())
        .collect();
    for expected in ["recency", "cognitive_level", "semantic_similarity"] {
        assert!(signal_types.contains(&expected));
    }
}
2578
/// A digest-bucket memory must carry a `bucket_boost` inclusion signal.
#[test]
fn test_inclusion_signals_populated_for_digest_bucket() {
    let memory = Memory {
        id: 55,
        namespace_id: 1,
        content: "Session summary: worked on auth module".to_string(),
        category: nexus_core::MemoryCategory::Session,
        labels: Vec::new(),
        metadata: serde_json::json!({
            "cognitive": {
                "level": "summary_short",
                "confidence": 0.80,
                "observer": "",
                "subject": "",
                "generated_by": ""
            }
        }),
        ..Memory::default()
    };
    let candidates = [BucketedMemory {
        memory,
        bucket: MemoryBucket::Digests,
        blended_score: 0.88,
    }];
    let default_request = WorkingRepresentationRequest::default();

    let reasons = build_inclusion_reasons(&candidates, &default_request);
    assert_eq!(reasons.len(), 1);

    let has_bucket_boost = reasons[0]
        .signals
        .iter()
        .any(|signal| signal.signal_type.as_str() == "bucket_boost");
    assert!(has_bucket_boost);
}
2613
/// Every `ExclusionReason` variant serializes to its snake_case JSON string
/// and deserializes back to the same variant.
#[test]
fn test_exclusion_reason_variants_serialize_correctly() {
    let cases = [
        (ExclusionReason::BudgetTruncation, "budget_truncation"),
        (
            ExclusionReason::ConfidenceBelowThreshold,
            "confidence_below_threshold",
        ),
        (ExclusionReason::Deduplicated, "deduplicated"),
    ];

    for (variant, wire_name) in cases {
        let encoded = serde_json::to_string(&variant).unwrap();
        assert_eq!(encoded, format!("\"{wire_name}\""));

        let decoded: ExclusionReason = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, variant);
    }
}
2630
/// `RepresentationConfigSnapshot` must serialize `max_items` under its own key
/// and round-trip every field unchanged through JSON.
///
/// The flags use a mix of `true` and `false`, so a dropped or swapped field is
/// caught by the per-field assertions below, not just by the subset the test
/// originally checked.
#[test]
fn test_representation_config_snapshot_serialization() {
    let config = RepresentationConfigSnapshot {
        max_items: 30,
        include_raw: true,
        include_digests: true,
        include_semantic: true,
        include_derived: false,
        include_contradictions: false,
    };
    let json = serde_json::to_string(&config).unwrap();
    assert!(json.contains("\"max_items\":30"));

    let parsed: RepresentationConfigSnapshot = serde_json::from_str(&json).unwrap();
    assert_eq!(parsed.max_items, 30);
    assert!(parsed.include_raw);
    assert!(parsed.include_digests);
    assert!(parsed.include_semantic);
    assert!(!parsed.include_derived);
    assert!(!parsed.include_contradictions);
}
2648
/// `None`-valued optional introspection fields are omitted from JSON and
/// deserialize back to `None`.
#[test]
fn test_introspection_optional_fields_default_safely() {
    let serialized = serde_json::to_string(&QueryIntrospection {
        included: vec![],
        excluded_candidates: vec![],
        relevant_reflections: vec![],
        bucket_stats: vec![],
        pipeline_latency_ms: None,
        representation_config: None,
    })
    .unwrap();

    for omitted_key in ["pipeline_latency_ms", "representation_config"] {
        assert!(!serialized.contains(omitted_key));
    }

    let parsed: QueryIntrospection = serde_json::from_str(&serialized).unwrap();
    assert!(parsed.pipeline_latency_ms.is_none());
    assert!(parsed.representation_config.is_none());
}
2668
/// `Display` for `ExclusionReason` renders the same snake_case names used on
/// the wire.
#[test]
fn test_exclusion_reason_display_impl() {
    let expectations = [
        (ExclusionReason::BudgetTruncation, "budget_truncation"),
        (
            ExclusionReason::ConfidenceBelowThreshold,
            "confidence_below_threshold",
        ),
        (ExclusionReason::Deduplicated, "deduplicated"),
    ];

    for (reason, rendered) in expectations {
        assert_eq!(reason.to_string(), rendered);
    }
}
2681}