Skip to main content

hirn_engine/ql/
context.rs

1//! Token-aware context assembly pipeline.
2//!
3//! Implements the "last mile" from memory retrieval to LLM-ready context:
4//! layer-priority budget allocation, progressive compression, contradiction
5//! surfacing, and structured formatting.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::fmt::Write;
9
10use arrow_array::{Array, Float32Array, RecordBatch, StringArray, UInt32Array};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13
14use hirn_core::error::HirnResult;
15use hirn_core::id::MemoryId;
16use hirn_core::record::MemoryRecord;
17use hirn_core::resource::ResourceGovernanceState;
18use hirn_core::revision::{LogicalMemoryId, RecallSnapshot, RevisionId, RevisionState};
19use hirn_core::semantic::SemanticRecord;
20use hirn_core::tokenizer::Tokenizer;
21use hirn_core::types::{AgentId, EdgeRelation, Layer, Namespace};
22use hirn_core::working::WorkingMemoryEntry;
23use hirn_core::{ConflictResolutionPolicy, HirnConfig};
24
25use crate::GraphEdge;
26use crate::db::HirnDB;
27use crate::graph_store::GraphStore;
28use crate::recall::ResourceEvidenceSummary;
29use crate::resource_presentation::{
30    PreviewPackageCache, PreviewPackageSurface, ResourcePreviewPackage, ResourceScoreAttribution,
31    package_resource_preview_packages_for_evidence, resource_preview_packages_to_json,
32    resource_score_attribution_to_json,
33};
34use crate::result_json::{resource_evidence_to_json, resource_hydration_to_json};
35
36use super::results::ScoredMemory;
37
38#[async_trait]
39pub(crate) trait ConflictReadRuntime: Send + Sync {
40    fn config(&self) -> &HirnConfig;
41
42    fn graph_store(&self) -> &dyn GraphStore;
43
44    async fn get_memory(&self, id: MemoryId) -> HirnResult<MemoryRecord>;
45
46    async fn semantic_head_for_logical_id(
47        &self,
48        logical_memory_id: LogicalMemoryId,
49    ) -> HirnResult<SemanticRecord>;
50
51    async fn semantic_revision_for_logical_id_at_snapshot(
52        &self,
53        logical_memory_id: LogicalMemoryId,
54        snapshot: RecallSnapshot,
55    ) -> HirnResult<Option<SemanticRecord>>;
56}
57
58#[async_trait]
59impl ConflictReadRuntime for HirnDB {
60    fn config(&self) -> &HirnConfig {
61        HirnDB::config(self)
62    }
63
64    fn graph_store(&self) -> &dyn GraphStore {
65        HirnDB::graph_store(self)
66    }
67
68    async fn get_memory(&self, id: MemoryId) -> HirnResult<MemoryRecord> {
69        HirnDB::get_memory(self, id).await
70    }
71
72    async fn semantic_head_for_logical_id(
73        &self,
74        logical_memory_id: LogicalMemoryId,
75    ) -> HirnResult<SemanticRecord> {
76        HirnDB::semantic_head_for_logical_id(self, logical_memory_id).await
77    }
78
79    async fn semantic_revision_for_logical_id_at_snapshot(
80        &self,
81        logical_memory_id: LogicalMemoryId,
82        snapshot: RecallSnapshot,
83    ) -> HirnResult<Option<SemanticRecord>> {
84        HirnDB::semantic_revision_for_logical_id_at_snapshot(self, logical_memory_id, snapshot)
85            .await
86    }
87}
88
89// ── Configuration ──────────────────────────────────────────────────────
90
91/// Configuration for context assembly.
92#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93pub struct ContextFeatures(u8);
94
95impl ContextFeatures {
96    const GRAPH_CONTEXT: u8 = 0b0001;
97    const CAUSAL_CHAINS: u8 = 0b0010;
98    const CONTRADICTIONS: u8 = 0b0100;
99    const RESOURCE_PREVIEWS: u8 = 0b1000;
100
101    #[must_use]
102    pub const fn all() -> Self {
103        Self(
104            Self::GRAPH_CONTEXT
105                | Self::CAUSAL_CHAINS
106                | Self::CONTRADICTIONS
107                | Self::RESOURCE_PREVIEWS,
108        )
109    }
110
111    #[must_use]
112    pub const fn empty() -> Self {
113        Self(0)
114    }
115
116    #[must_use]
117    pub const fn include_graph_context(self) -> bool {
118        self.0 & Self::GRAPH_CONTEXT != 0
119    }
120
121    #[must_use]
122    pub const fn include_causal_chains(self) -> bool {
123        self.0 & Self::CAUSAL_CHAINS != 0
124    }
125
126    #[must_use]
127    pub const fn surface_contradictions(self) -> bool {
128        self.0 & Self::CONTRADICTIONS != 0
129    }
130
131    #[must_use]
132    pub const fn package_resource_previews(self) -> bool {
133        self.0 & Self::RESOURCE_PREVIEWS != 0
134    }
135
136    #[must_use]
137    pub const fn with_graph_context(self, enabled: bool) -> Self {
138        if enabled {
139            Self(self.0 | Self::GRAPH_CONTEXT)
140        } else {
141            Self(self.0 & !Self::GRAPH_CONTEXT)
142        }
143    }
144
145    #[must_use]
146    pub const fn with_causal_chains(self, enabled: bool) -> Self {
147        if enabled {
148            Self(self.0 | Self::CAUSAL_CHAINS)
149        } else {
150            Self(self.0 & !Self::CAUSAL_CHAINS)
151        }
152    }
153
154    #[must_use]
155    pub const fn with_surface_contradictions(self, enabled: bool) -> Self {
156        if enabled {
157            Self(self.0 | Self::CONTRADICTIONS)
158        } else {
159            Self(self.0 & !Self::CONTRADICTIONS)
160        }
161    }
162
163    #[must_use]
164    pub const fn with_resource_previews(self, enabled: bool) -> Self {
165        if enabled {
166            Self(self.0 | Self::RESOURCE_PREVIEWS)
167        } else {
168            Self(self.0 & !Self::RESOURCE_PREVIEWS)
169        }
170    }
171}
172
173impl Default for ContextFeatures {
174    fn default() -> Self {
175        Self::all()
176    }
177}
178
179#[derive(Debug, Clone)]
180pub struct ContextConfig {
181    /// Total token budget for the assembled context.
182    pub token_budget: usize,
183    /// Fraction of budget reserved for working memory (0.0–1.0).
184    pub working_memory_reserve: f32,
185    /// Relative weight for semantic vs episodic allocation.
186    pub semantic_weight: f32,
187    /// Score below which summaries replace full content.
188    pub compression_threshold: f32,
189    /// Maximum number of episodic entries to include.
190    pub max_episodic_entries: usize,
191    /// Optional context assembly features.
192    pub features: ContextFeatures,
193    /// Maximum preview packages to attach per included record.
194    pub max_resource_previews_per_entry: usize,
195    /// Maximum characters retained from a packaged preview artifact.
196    pub max_resource_preview_chars: usize,
197    /// Output format for the assembled context.
198    pub output_format: ContextFormat,
199}
200
201impl Default for ContextConfig {
202    fn default() -> Self {
203        Self {
204            token_budget: 4096,
205            working_memory_reserve: 0.2,
206            semantic_weight: 0.6,
207            compression_threshold: 0.4,
208            max_episodic_entries: 50,
209            features: ContextFeatures::all(),
210            max_resource_previews_per_entry: 1,
211            max_resource_preview_chars: 160,
212            output_format: ContextFormat::Structured,
213        }
214    }
215}
216
217impl ContextConfig {
218    #[must_use]
219    pub fn from_hirn_config(cfg: &HirnConfig) -> Self {
220        Self {
221            token_budget: cfg.token_budget as usize,
222            working_memory_reserve: cfg.working_memory_reserve,
223            max_resource_previews_per_entry: cfg.think_preview_package_max_previews,
224            max_resource_preview_chars: cfg.think_preview_package_max_chars,
225            ..Self::default()
226        }
227    }
228}
229
230/// Output format for assembled context.
231#[derive(Debug, Clone, Copy, PartialEq, Eq)]
232pub enum ContextFormat {
233    /// Sectioned output with clear headers.
234    Structured,
235    /// Flowing narrative text.
236    Narrative,
237    /// Machine-readable JSON.
238    Json,
239}
240
241// ── Result types ───────────────────────────────────────────────────────
242
243/// Result of a THINK context assembly.
244#[derive(Debug, Clone)]
245pub struct ThinkResult {
246    /// The assembled context string, ready for LLM consumption.
247    pub context: String,
248    /// Token count of the assembled context.
249    pub token_count: usize,
250    /// IDs of records included in the context.
251    pub records_included: Vec<MemoryId>,
252    /// Number of candidate records that were excluded.
253    pub records_excluded_count: usize,
254    /// Detected contradictions.
255    pub contradictions: Vec<ConflictPair>,
256    /// Grouped contradiction context derived from contradiction edges.
257    pub conflict_groups: Vec<ConflictGroup>,
258    /// Query execution time in milliseconds.
259    pub query_time_ms: f64,
260    /// Score distribution of included records.
261    pub score_distribution: ScoreDistribution,
262}
263
264/// Wire-format for `ContextAssemblyExec` output.
265///
266/// JSON-serialised by the engine's `ScopedContextAssemblyRuntime`, and
267/// deserialised by `decode_compiled_think_assembly_from_batches` in
268/// `query_exec.rs`.  Carries both the assembled context text and the
269/// hydrated `ScoredMemory` records so the engine can reconstruct a full
270/// `QueryResult::Records` response.
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct ThinkAssemblyOutput {
273    /// The assembled context string, ready for LLM consumption.
274    pub context: String,
275    /// Token count of the assembled context.
276    pub token_count: usize,
277    /// Hydrated, scored records (all candidates after post-load filters).
278    pub records: Vec<super::results::ScoredMemory>,
279    /// IDs of records included in the assembled context (subset of `records`).
280    pub records_included: Vec<MemoryId>,
281    /// Number of candidates excluded by token budget.
282    pub records_excluded_count: usize,
283    /// Detected contradictions.
284    pub contradictions: Vec<ConflictPair>,
285    /// Grouped contradiction context.
286    pub conflict_groups: Vec<ConflictGroup>,
287    /// Score distribution.
288    pub score_distribution: ScoreDistribution,
289}
290
291/// A pair of contradicting memories.
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct ConflictPair {
294    pub memory_a: MemoryId,
295    pub memory_b: MemoryId,
296    pub content_a: String,
297    pub content_b: String,
298    pub confidence: f32,
299    pub source_reliability_a: f32,
300    pub source_reliability_b: f32,
301}
302
303/// The per-member state exposed for a grouped belief conflict.
304#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
305pub enum ConflictMemberStatus {
306    Active,
307    Superseded,
308    Retracted,
309    Quarantined,
310    Merged,
311}
312
313/// The current arbitration state for a grouped belief conflict.
314#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
315pub enum ConflictArbitrationStatus {
316    Unresolved,
317    Resolved,
318    Quarantined,
319    Superseded,
320}
321
322/// A visible member of a grouped belief conflict.
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct ConflictMember {
325    pub memory_id: MemoryId,
326    pub logical_memory_id: Option<LogicalMemoryId>,
327    pub revision_id: Option<RevisionId>,
328    pub status: ConflictMemberStatus,
329    pub layer: Layer,
330    pub content: String,
331    pub in_result_set: bool,
332    pub source_reliability: f32,
333    #[serde(skip)]
334    recency_basis_ms: i64,
335}
336
337/// A grouped belief conflict derived from contradiction connected components.
338#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct ConflictGroup {
340    pub conflict_id: String,
341    pub members: Vec<ConflictMember>,
342    pub omitted_member_count: usize,
343    pub pair_count: usize,
344    pub confidence: f32,
345    pub evidence_count: usize,
346    pub source_reliability: f32,
347    pub arbitration_status: ConflictArbitrationStatus,
348    pub authoritative_memory_id: Option<MemoryId>,
349    pub preferred_memory_id: Option<MemoryId>,
350}
351
352/// Statistics about the scores of included records.
353#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
354pub struct ScoreDistribution {
355    pub min: f32,
356    pub max: f32,
357    pub mean: f32,
358}
359
360impl Default for ScoreDistribution {
361    fn default() -> Self {
362        Self {
363            min: 0.0,
364            max: 0.0,
365            mean: 0.0,
366        }
367    }
368}
369
370// ── Internal types ─────────────────────────────────────────────────────
371
372/// A candidate memory classified for assembly.
373#[derive(Debug, Clone)]
374pub(crate) struct Candidate {
375    id: MemoryId,
376    layer: Layer,
377    full_content: String,
378    summary: String,
379    score: f32,
380    trust_score: f32,
381    /// Test-only instrumentation for classify_candidates().
382    #[cfg_attr(not(test), allow(dead_code))]
383    token_count_full: usize,
384    #[cfg_attr(not(test), allow(dead_code))]
385    token_count_summary: usize,
386    /// Pre-computed token costs for each compression level's composed text.
387    /// Populated by `finalize_candidate_render_tokens` after trust scoring.
388    /// Zero means not yet computed; `select_candidate_render` falls back to
389    /// the tokenizer in that case.
390    tokens_full: usize,
391    tokens_summary: usize,
392    tokens_entity: usize,
393    is_contradiction: bool,
394    entities: Vec<String>,
395    resource_evidence: Vec<ResourceEvidenceSummary>,
396    resource_preview_packages: Vec<ResourcePreviewPackage>,
397    resource_score_attribution: Vec<ResourceScoreAttribution>,
398}
399
400#[derive(Debug, Clone)]
401struct ContextEntry {
402    id: MemoryId,
403    content: String,
404    /// Token cost of `content` in isolation (does not include per-entry format
405    /// overhead such as bullet prefixes or JSON wrappers).  Used by
406    /// `fit_context_to_budget` to estimate total tokens without re-serializing
407    /// the entire context on every binary-search or greedy-trim iteration.
408    token_cost: usize,
409    resource_evidence: Vec<ResourceEvidenceSummary>,
410    resource_preview_packages: Vec<ResourcePreviewPackage>,
411    resource_score_attribution: Vec<ResourceScoreAttribution>,
412}
413
414#[derive(Debug, Clone, Default)]
415struct ContextSections {
416    working_memory: Vec<ContextEntry>,
417    contradictions: Vec<String>,
418    semantic: Vec<ContextEntry>,
419    episodic: Vec<ContextEntry>,
420    procedural: Vec<ContextEntry>,
421    graph_connected: Vec<String>,
422    causal_upstream: Vec<String>,
423}
424
425#[derive(Debug, Clone, Copy)]
426struct ContextSectionLengths {
427    working_memory: usize,
428    contradictions: usize,
429    semantic: usize,
430    episodic: usize,
431    procedural: usize,
432    graph_connected: usize,
433    causal_upstream: usize,
434}
435
436impl ContextSections {
437    fn included_ids(&self) -> Vec<MemoryId> {
438        let mut included_ids = Vec::with_capacity(
439            self.working_memory.len()
440                + self.semantic.len()
441                + self.episodic.len()
442                + self.procedural.len(),
443        );
444        for entry in &self.working_memory {
445            included_ids.push(entry.id);
446        }
447        for entry in &self.semantic {
448            included_ids.push(entry.id);
449        }
450        for entry in &self.episodic {
451            included_ids.push(entry.id);
452        }
453        for entry in &self.procedural {
454            included_ids.push(entry.id);
455        }
456        included_ids
457    }
458
459    fn trimmable_count(&self) -> usize {
460        self.working_memory.len()
461            + self.contradictions.len()
462            + self.semantic.len()
463            + self.episodic.len()
464            + self.procedural.len()
465            + self.graph_connected.len()
466            + self.causal_upstream.len()
467    }
468
469    fn section_lengths(&self) -> ContextSectionLengths {
470        ContextSectionLengths {
471            working_memory: self.working_memory.len(),
472            contradictions: self.contradictions.len(),
473            semantic: self.semantic.len(),
474            episodic: self.episodic.len(),
475            procedural: self.procedural.len(),
476            graph_connected: self.graph_connected.len(),
477            causal_upstream: self.causal_upstream.len(),
478        }
479    }
480
481    fn keep_lengths_after_trim(&self, trim_count: usize) -> ContextSectionLengths {
482        let mut remaining = trim_count;
483        let mut lengths = self.section_lengths();
484
485        trim_section_length(&mut lengths.causal_upstream, &mut remaining);
486        trim_section_length(&mut lengths.graph_connected, &mut remaining);
487        trim_section_length(&mut lengths.procedural, &mut remaining);
488        trim_section_length(&mut lengths.episodic, &mut remaining);
489        trim_section_length(&mut lengths.semantic, &mut remaining);
490        trim_section_length(&mut lengths.contradictions, &mut remaining);
491        trim_section_length(&mut lengths.working_memory, &mut remaining);
492
493        lengths
494    }
495
496    fn truncate_to_lengths(&mut self, lengths: ContextSectionLengths) {
497        self.working_memory.truncate(lengths.working_memory);
498        self.contradictions.truncate(lengths.contradictions);
499        self.semantic.truncate(lengths.semantic);
500        self.episodic.truncate(lengths.episodic);
501        self.procedural.truncate(lengths.procedural);
502        self.graph_connected.truncate(lengths.graph_connected);
503        self.causal_upstream.truncate(lengths.causal_upstream);
504    }
505
506    /// Build the per-entry token cost list in trim priority order (lowest
507    /// priority first).  Within each section the last entry is listed first
508    /// because `keep_lengths_after_trim` removes from the tail.
509    fn compute_formatted_entry_costs(
510        &self,
511        format: ContextFormat,
512        tokenizer: &dyn Tokenizer,
513    ) -> Vec<usize> {
514        let overhead = per_entry_format_overhead(format);
515        let mut costs = Vec::with_capacity(self.trimmable_count());
516
517        match format {
518            ContextFormat::Structured => {
519                // Tokenise the fully-rendered bullet line so the prefix-sum is
520                // exact, removing any BPE-boundary approximation error.
521                for s in self.causal_upstream.iter().rev() {
522                    costs.push(tokenizer.count_tokens(&format!("• {s}\n")));
523                }
524                for s in self.graph_connected.iter().rev() {
525                    costs.push(tokenizer.count_tokens(&format!("• {s}\n")));
526                }
527                for e in self.procedural.iter().rev() {
528                    costs.push(tokenizer.count_tokens(&format!("• {}\n", e.content)));
529                }
530                for e in self.episodic.iter().rev() {
531                    costs.push(tokenizer.count_tokens(&format!("• {}\n", e.content)));
532                }
533                for e in self.semantic.iter().rev() {
534                    costs.push(tokenizer.count_tokens(&format!("• {}\n", e.content)));
535                }
536                for s in self.contradictions.iter().rev() {
537                    costs.push(tokenizer.count_tokens(&format!("{s}\n")));
538                }
539                for e in self.working_memory.iter().rev() {
540                    costs.push(tokenizer.count_tokens(&format!("• {}\n", e.content)));
541                }
542            }
543            ContextFormat::Narrative | ContextFormat::Json => {
544                // Per-entry cost depends on surrounding context for these
545                // formats; use the same overhead constant as the greedy phase.
546                for s in self.causal_upstream.iter().rev() {
547                    costs.push(tokenizer.count_tokens(s) + overhead);
548                }
549                for s in self.graph_connected.iter().rev() {
550                    costs.push(tokenizer.count_tokens(s) + overhead);
551                }
552                for e in self.procedural.iter().rev() {
553                    costs.push(e.token_cost + overhead);
554                }
555                for e in self.episodic.iter().rev() {
556                    costs.push(e.token_cost + overhead);
557                }
558                for e in self.semantic.iter().rev() {
559                    costs.push(e.token_cost + overhead);
560                }
561                for s in self.contradictions.iter().rev() {
562                    costs.push(tokenizer.count_tokens(s) + overhead);
563                }
564                for e in self.working_memory.iter().rev() {
565                    costs.push(e.token_cost + overhead);
566                }
567            }
568        }
569
570        costs
571    }
572}
573
574fn trim_section_length(length: &mut usize, remaining: &mut usize) {
575    let trimmed = (*length).min(*remaining);
576    *length -= trimmed;
577    *remaining -= trimmed;
578}
579
580/// Compression level applied to a candidate.
581#[derive(Debug, Clone, Copy, PartialEq, Eq)]
582enum CompressionLevel {
583    Full,
584    Summary,
585    EntityOnly,
586}
587
588/// F-43: Budget allocation per section.
589///
590/// Implements CONCEPT §10.2 tiered budget allocation:
591/// working memory (mandatory) → contradictions → direct results (semantic +
592/// episodic + procedural) → graph-connected neighbors → causal-upstream.
593#[derive(Debug, Clone)]
594struct BudgetAllocation {
595    working_memory: usize,
596    contradictions: usize,
597    semantic: usize,
598    episodic: usize,
599    procedural: usize,
600    graph_connected: usize,
601    causal_upstream: usize,
602}
603
604#[derive(Debug, Clone, Default)]
605pub(crate) struct ConflictSummary {
606    pub pairs: Vec<ConflictPair>,
607    pub groups: Vec<ConflictGroup>,
608}
609
610#[derive(Debug, Clone)]
611struct ConflictEdgeMeta {
612    a: MemoryId,
613    b: MemoryId,
614    confidence: f32,
615    evidence_count: usize,
616    resolved: bool,
617}
618
619const FALLBACK_CONTRADICTION_CONFIDENCE: f32 = 0.5;
620
621// ── Assembly pipeline ──────────────────────────────────────────────────
622
623/// Assemble context from scored memories with full pipeline:
624/// retrieve working memory → detect contradictions → allocate budget →
625/// compress → format.
626pub async fn assemble_think_context(
627    db: &HirnDB,
628    actor_id: &AgentId,
629    candidates: &[ScoredMemory],
630    config: &ContextConfig,
631    visible_namespaces: Option<&[Namespace]>,
632    // Optional wider pool of already-loaded records. When provided, graph
633    // and causal neighbor content is served from this pool before any Lance
634    // I/O, typically eliminating the second batch hydration round-trip.
635    content_pool: Option<&[ScoredMemory]>,
636    // Arrow-native fast path: raw RecordBatches from the DataFusion pipeline
637    // (ContextBudgetExec output). When provided, Candidates are built directly
638    // from Arrow columns, skipping the secondary Lance hydration round-trip and
639    // trust-score computation (~5.5 ms saved). `candidates` is unused when this
640    // is Some.
641    raw_batches: Option<&[RecordBatch]>,
642) -> HirnResult<ThinkResult> {
643    let tokenizer = db.tokenizer();
644
645    // 1 + 2. Retrieve working memory in parallel with candidate classification,
646    // trust scoring, and token-cost pre-computation. These are independent: working
647    // memory is a separate Lance scan; classification + trust are CPU + hot-graph.
648    let (working_entries, mut classified) = tokio::join!(
649        async { db.working_memory().await.unwrap_or_default() },
650        async {
651            if let Some(batches) = raw_batches {
652                // Arrow fast path: build Candidates from Arrow columns directly.
653                // token_count is pre-populated from ContextBudgetExec output.
654                // Trust score defaults to 1.0; entities are empty.
655                let mut classified = candidates_from_batches(batches, config.token_budget);
656
657                // Preserve resource-side evidence captured during recall hydration.
658                // The Arrow payload does not currently carry these fields, but
659                // JSON assembly (preview packages / score attribution) depends on them.
660                let evidence_by_id: HashMap<
661                    MemoryId,
662                    (
663                        Vec<ResourceEvidenceSummary>,
664                        Vec<ResourcePreviewPackage>,
665                        Vec<ResourceScoreAttribution>,
666                    ),
667                > = candidates
668                    .iter()
669                    .map(|scored| {
670                        (
671                            scored.record.id(),
672                            (
673                                scored.resource_evidence.clone(),
674                                scored.resource_preview_packages.clone(),
675                                scored.resource_score_attribution.clone(),
676                            ),
677                        )
678                    })
679                    .collect();
680                for candidate in &mut classified {
681                    if let Some((
682                        resource_evidence,
683                        resource_preview_packages,
684                        resource_score_attribution,
685                    )) = evidence_by_id.get(&candidate.id)
686                    {
687                        candidate.resource_evidence.clone_from(resource_evidence);
688                        candidate
689                            .resource_preview_packages
690                            .clone_from(resource_preview_packages);
691                        candidate
692                            .resource_score_attribution
693                            .clone_from(resource_score_attribution);
694                    }
695                }
696
697                // Still run finalize so tokens_entity and any zero tokens_full/
698                // tokens_summary are computed (skips non-zero pre-populated values).
699                finalize_candidate_render_tokens(&mut classified, tokenizer.as_ref());
700                classified
701            } else {
702                // Classic path: decode ScoredMemory records, compute trust scores.
703                let mut classified = classify_candidates(candidates, tokenizer.as_ref());
704                compute_trust_scores(db, candidates, &mut classified).await;
705                finalize_candidate_render_tokens(&mut classified, tokenizer.as_ref());
706                classified
707            }
708        }
709    );
710    let sorted_direct_candidates = prepare_sorted_direct_candidates(&classified);
711
712    // 3. Allocate a preliminary budget without contradiction reserve and use it
713    // to bound the direct-result slice that can possibly survive the first fit.
714    let preliminary_allocation = allocate_budget(
715        config,
716        &working_entries,
717        &[],
718        &classified,
719        tokenizer.as_ref(),
720    );
721    let (preliminary_semantic, preliminary_episodic, preliminary_procedural) =
722        build_direct_sections(
723            &sorted_direct_candidates,
724            &preliminary_allocation,
725            config,
726            tokenizer.as_ref(),
727        );
728
729    // 4. Concurrently: detect contradictions among preliminary-section candidates
730    //    AND speculatively build graph/causal sections from those same seeds.
731    //
732    //    Both are async I/O operations with no shared mutable state:
733    //    - `collect_conflict_summary` queries Cedar + graph contradiction edges
734    //    - `build_graph_and_causal_sections` hydrates graph/causal neighbor content
735    //
736    //    In the common case (no contradictions) the speculative result is used
737    //    directly, overlapping ~5-10 ms of graph hydration with ~5-10 ms of
738    //    contradiction detection. When contradictions DO change the final seed,
739    //    only the graph/causal sections are rebuilt (rare path).
740    let preliminary_seed_ids = collect_direct_section_ids(
741        &preliminary_semantic,
742        &preliminary_episodic,
743        &preliminary_procedural,
744    );
745    let preliminary_seed_candidates: Vec<ScoredMemory> = candidates
746        .iter()
747        .filter(|c| preliminary_seed_ids.contains(&c.record.id()))
748        .cloned()
749        .collect();
750
751    let effective_pool = content_pool.unwrap_or(candidates);
752    let pre_needs_graph = config.features.include_graph_context()
753        && preliminary_allocation.graph_connected > 0
754        && !preliminary_seed_candidates.is_empty();
755    let pre_needs_causal = config.features.include_causal_chains()
756        && preliminary_allocation.causal_upstream > 0
757        && !preliminary_seed_candidates.is_empty();
758
759    let (conflict_summary, speculative_graph_causal) = tokio::join!(
760        async {
761            if config.features.surface_contradictions() && !preliminary_seed_ids.is_empty() {
762                let scoped_candidates = candidates
763                    .iter()
764                    .filter(|c| preliminary_seed_ids.contains(&c.record.id()))
765                    .cloned()
766                    .collect::<Vec<_>>();
767                collect_conflict_summary(db, &scoped_candidates, visible_namespaces, None).await
768            } else {
769                ConflictSummary::default()
770            }
771        },
772        async {
773            if pre_needs_graph || pre_needs_causal {
774                build_graph_and_causal_sections(
775                    db,
776                    &preliminary_seed_candidates,
777                    effective_pool,
778                    if pre_needs_graph {
779                        preliminary_allocation.graph_connected
780                    } else {
781                        0
782                    },
783                    if pre_needs_causal {
784                        preliminary_allocation.causal_upstream
785                    } else {
786                        0
787                    },
788                    tokenizer.as_ref(),
789                )
790                .await
791            } else {
792                (Vec::new(), Vec::new())
793            }
794        }
795    );
796
797    // Apply is_contradiction marks synchronously (no I/O — O(N) scan).
798    // `sorted_direct_candidates` borrows `classified`; drop it so we can mutate.
799    drop(sorted_direct_candidates);
800    let contradiction_ids: HashSet<MemoryId> = conflict_summary
801        .groups
802        .iter()
803        .flat_map(|g| g.members.iter().map(|m| m.memory_id))
804        .collect();
805    for candidate in &mut classified {
806        if contradiction_ids.contains(&candidate.id) {
807            candidate.is_contradiction = true;
808        }
809    }
810    // Recreate the sorted view now that `is_contradiction` marks are applied.
811    let sorted_direct_candidates = prepare_sorted_direct_candidates(&classified);
812
813    // 5. Allocate the final budget with contradiction overhead included.
814    let allocation = allocate_budget(
815        config,
816        &working_entries,
817        &conflict_summary.groups,
818        &classified,
819        tokenizer.as_ref(),
820    );
821
822    // 6. Select and compress candidates within budget.
823    let (working_section, _wm_tokens) = build_working_memory_section(
824        &working_entries,
825        allocation.working_memory,
826        tokenizer.as_ref(),
827    );
828
829    let (contradiction_section, _contra_tokens) = if config.features.surface_contradictions() {
830        build_contradiction_section(
831            &conflict_summary.groups,
832            allocation.contradictions,
833            tokenizer.as_ref(),
834        )
835    } else {
836        (Vec::new(), 0)
837    };
838
839    let (semantic_section, episodic_section, procedural_section) =
840        if config.features.surface_contradictions() && !conflict_summary.groups.is_empty() {
841            // Contradictions were found; the final allocation includes contradiction
842            // overhead so the direct-section budgets may differ from the preliminary.
843            build_direct_sections(
844                &sorted_direct_candidates,
845                &allocation,
846                config,
847                tokenizer.as_ref(),
848            )
849        } else {
850            // No contradictions found, or feature disabled: reuse the preliminary
851            // sections already computed above.  Avoids a second section-building pass.
852            (
853                preliminary_semantic,
854                preliminary_episodic,
855                preliminary_procedural,
856            )
857        };
858
859    // Use the speculative graph/causal sections when the final direct-section seed
860    // matches the preliminary seed (common case: no contradictions or contradictions
861    // did not alter the survivor set). When the seed changed (rare: contradictions
862    // shifted the allocation), rebuild from the corrected seed.
863    let helper_seed_candidate_ids =
864        collect_direct_section_ids(&semantic_section, &episodic_section, &procedural_section);
865    let (graph_section, causal_section) = if helper_seed_candidate_ids == preliminary_seed_ids {
866        // Speculative sections are valid — use without extra I/O.
867        speculative_graph_causal
868    } else {
869        // Rare path: rebuild from the corrected final seed.
870        let helper_seed_candidates = candidates
871            .iter()
872            .filter(|c| helper_seed_candidate_ids.contains(&c.record.id()))
873            .cloned()
874            .collect::<Vec<_>>();
875        let needs_graph = config.features.include_graph_context()
876            && allocation.graph_connected > 0
877            && !helper_seed_candidates.is_empty();
878        let needs_causal = config.features.include_causal_chains()
879            && allocation.causal_upstream > 0
880            && !helper_seed_candidates.is_empty();
881        if needs_graph || needs_causal {
882            build_graph_and_causal_sections(
883                db,
884                &helper_seed_candidates,
885                effective_pool,
886                if needs_graph {
887                    allocation.graph_connected
888                } else {
889                    0
890                },
891                if needs_causal {
892                    allocation.causal_upstream
893                } else {
894                    0
895                },
896                tokenizer.as_ref(),
897            )
898            .await
899        } else {
900            (Vec::new(), Vec::new())
901        }
902    };
903
904    // 7. Fit formatted output to budget by trimming the section model rather than
905    // truncating the rendered string, so structured formats remain syntactically valid.
906    let mut sections = ContextSections {
907        working_memory: working_section,
908        contradictions: contradiction_section,
909        semantic: semantic_section,
910        episodic: episodic_section,
911        procedural: procedural_section,
912        graph_connected: graph_section,
913        causal_upstream: causal_section,
914    };
915
916    if should_package_resource_previews(config) {
917        // Trim once before hydrating previews so we do not fetch/package preview
918        // artifacts for entries that are definitely outside the initial budget.
919        let _ = fit_context_to_budget(
920            config.output_format,
921            &mut sections,
922            config.token_budget,
923            tokenizer.as_ref(),
924        );
925        hydrate_selected_resource_previews(db, actor_id, &mut sections, config).await?;
926    }
927    let final_context = fit_context_to_budget(
928        config.output_format,
929        &mut sections,
930        config.token_budget,
931        tokenizer.as_ref(),
932    );
933    let final_tokens = tokenizer.count_tokens(&final_context);
934
935    // 8. Collect included/excluded IDs after budget fitting.
936    let included_ids = sections.included_ids();
937    let total_candidates = candidates.len();
938    let records_excluded_count = total_candidates.saturating_sub(included_ids.len());
939
940    // 9. Compute score distribution for the records that survived budget fitting.
941    let score_distribution = compute_score_distribution(candidates, &included_ids);
942
943    Ok(ThinkResult {
944        context: final_context,
945        token_count: final_tokens,
946        records_included: included_ids,
947        records_excluded_count,
948        contradictions: conflict_summary.pairs,
949        conflict_groups: conflict_summary.groups,
950        query_time_ms: 0.0, // caller sets this
951        score_distribution,
952    })
953}
954
955// ── Step 2: Classify candidates ────────────────────────────────────────
956
957#[cfg(test)]
958fn classify_token_counts(
959    full_content: &str,
960    summary: &str,
961    tokenizer: &dyn Tokenizer,
962) -> (usize, usize) {
963    (
964        tokenizer.count_tokens(full_content),
965        tokenizer.count_tokens(summary),
966    )
967}
968
969#[cfg(not(test))]
970fn classify_token_counts(
971    _full_content: &str,
972    _summary: &str,
973    _tokenizer: &dyn Tokenizer,
974) -> (usize, usize) {
975    (0, 0)
976}
977
978/// Build `Candidate` structs directly from Arrow `RecordBatch`es produced by
979/// the DataFusion THINK pipeline (`ContextBudgetExec` output).
980///
981/// This is the Arrow-native fast path that eliminates the secondary Lance
982/// round-trip. The batch schema must include at minimum: `id`, `content`,
983/// `full_content`, `layer`, `score`, `importance`.  Optional columns read
984/// when present: `token_count` (pre-computed by `ContextBudgetExec`),
985/// `assembly_mode`.
986///
987/// Trade-offs vs the ScoredMemory path:
988/// - `trust_score` defaults to 1.0 (no provenance graph lookup).
989/// - `entities = []` (entity-only compression emits empty string; acceptable
990///   because entity-only entries are extremely low-score and rarely shown).
991/// - Resource evidence fields are empty (not available from Arrow).
992pub(crate) fn candidates_from_batches(batches: &[RecordBatch], limit: usize) -> Vec<Candidate> {
993    let mut result = Vec::new();
994
995    'outer: for batch in batches {
996        if batch.num_rows() == 0 {
997            continue;
998        }
999
1000        let Some(ids) = batch
1001            .column_by_name("id")
1002            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1003        else {
1004            continue;
1005        };
1006        let Some(contents) = batch
1007            .column_by_name("content")
1008            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1009        else {
1010            continue;
1011        };
1012        let full_contents = batch
1013            .column_by_name("full_content")
1014            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1015        let Some(layers) = batch
1016            .column_by_name("layer")
1017            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1018        else {
1019            continue;
1020        };
1021        let Some(scores) = batch
1022            .column_by_name("score")
1023            .and_then(|c| c.as_any().downcast_ref::<Float32Array>())
1024        else {
1025            continue;
1026        };
1027        let importances = batch
1028            .column_by_name("importance")
1029            .and_then(|c| c.as_any().downcast_ref::<Float32Array>());
1030        // `token_count` emitted by `ContextBudgetExec` — represents tokens for
1031        // the `content` (display/summary) column.
1032        let token_counts = batch
1033            .column_by_name("token_count")
1034            .and_then(|c| c.as_any().downcast_ref::<UInt32Array>());
1035
1036        for row in 0..batch.num_rows() {
1037            if result.len() >= limit {
1038                break 'outer;
1039            }
1040
1041            let id_str = ids.value(row);
1042            let Ok(id) = MemoryId::parse(id_str) else {
1043                continue;
1044            };
1045
1046            let content = contents.value(row).to_string();
1047            let full_content = full_contents
1048                .map(|fc| fc.value(row).to_string())
1049                .unwrap_or_else(|| content.clone());
1050
1051            let layer = match layers.value(row) {
1052                "episodic" => Layer::Episodic,
1053                "semantic" => Layer::Semantic,
1054                "procedural" => Layer::Procedural,
1055                "working" => Layer::Working,
1056                _ => Layer::Semantic,
1057            };
1058
1059            let raw_score = if scores.is_null(row) {
1060                0.0_f32
1061            } else {
1062                scores.value(row)
1063            };
1064            // When the composite score is absent or zero (e.g. MemoryStore test
1065            // environments where no real ANN similarity is computed), fall back to
1066            // `importance` so that `determine_compression` can still distinguish
1067            // high-importance records from low-importance ones.  In production
1068            // (Lance with real vector search) raw_score is always > 0.
1069            let score = if raw_score == 0.0 {
1070                importances
1071                    .and_then(|imp| {
1072                        if imp.is_null(row) {
1073                            None
1074                        } else {
1075                            Some(imp.value(row))
1076                        }
1077                    })
1078                    .unwrap_or(0.0)
1079            } else {
1080                raw_score
1081            };
1082
1083            // Pre-populated token count from ContextBudgetExec (tokens for
1084            // the display/summary text). Use as tokens_summary for all layers.
1085            // Use as tokens_full only when content == full_content (i.e. no
1086            // truncation happened — semantic, procedural, short episodics).
1087            let pre_tokens = match token_counts {
1088                Some(tc) if !tc.is_null(row) => tc.value(row) as usize,
1089                _ => 0,
1090            };
1091            let same_content = content == full_content;
1092            let tokens_full = if same_content && pre_tokens > 0 {
1093                pre_tokens
1094            } else {
1095                0 // will be filled in by finalize_candidate_render_tokens
1096            };
1097            let tokens_summary = pre_tokens;
1098
1099            // summary = content (display text); full_content = raw text
1100            let summary = content;
1101
1102            result.push(Candidate {
1103                id,
1104                layer,
1105                full_content,
1106                summary,
1107                score,
1108                trust_score: 1.0,
1109                token_count_full: tokens_full,
1110                token_count_summary: tokens_summary,
1111                tokens_full,
1112                tokens_summary,
1113                tokens_entity: 0, // always recomputed (fast: entities = [])
1114                is_contradiction: false,
1115                entities: vec![],
1116                resource_evidence: vec![],
1117                resource_preview_packages: vec![],
1118                resource_score_attribution: vec![],
1119            });
1120        }
1121    }
1122
1123    result
1124}
1125
1126fn classify_candidates(candidates: &[ScoredMemory], tokenizer: &dyn Tokenizer) -> Vec<Candidate> {
1127    candidates
1128        .iter()
1129        .map(|sm| {
1130            let (full_content, summary, entities) = match &sm.record {
1131                MemoryRecord::Episodic(e) => {
1132                    let entities: Vec<String> =
1133                        e.entities.iter().map(|er| er.name.clone()).collect();
1134                    (e.content.clone(), e.summary.clone(), entities)
1135                }
1136                MemoryRecord::Semantic(s) => (s.description.clone(), s.concept.clone(), vec![]),
1137                MemoryRecord::Working(w) => (w.content.clone(), w.content.clone(), vec![]),
1138                MemoryRecord::Procedural(p) => (p.description.clone(), p.name.clone(), vec![]),
1139            };
1140
1141            let (token_count_full, token_count_summary) =
1142                classify_token_counts(&full_content, &summary, tokenizer);
1143
1144            Candidate {
1145                id: sm.record.id(),
1146                layer: sm.record.layer(),
1147                full_content,
1148                summary,
1149                score: sm.score,
1150                trust_score: 1.0,
1151                token_count_full,
1152                token_count_summary,
1153                // Populated later by finalize_candidate_render_tokens().
1154                tokens_full: 0,
1155                tokens_summary: 0,
1156                tokens_entity: 0,
1157                is_contradiction: false,
1158                entities,
1159                resource_evidence: sm.resource_evidence.clone(),
1160                resource_preview_packages: sm.resource_preview_packages.clone(),
1161                resource_score_attribution: sm.resource_score_attribution.clone(),
1162            }
1163        })
1164        .collect()
1165}
1166
1167/// Pre-compute token costs for all three compression levels of each candidate.
1168///
1169/// Must be called after `compute_trust_scores` because `compose_candidate_text`
1170/// includes a low-trust prefix when `trust_score < 0.5`.  The cached counts
1171/// are used by `select_candidate_render` to avoid repeated tokenizer calls.
1172///
1173/// Skips `tokens_full` and `tokens_summary` when already pre-populated (i.e.
1174/// non-zero) from the Arrow fast path — `candidates_from_batches` fills these
1175/// in from the `token_count` column emitted by `ContextBudgetExec`.
1176fn finalize_candidate_render_tokens(classified: &mut [Candidate], tokenizer: &dyn Tokenizer) {
1177    for candidate in classified.iter_mut() {
1178        if candidate.tokens_full == 0 {
1179            candidate.tokens_full =
1180                tokenizer.count_tokens(&compose_candidate_text(candidate, CompressionLevel::Full));
1181        }
1182        if candidate.tokens_summary == 0 {
1183            candidate.tokens_summary = tokenizer.count_tokens(&compose_candidate_text(
1184                candidate,
1185                CompressionLevel::Summary,
1186            ));
1187        }
1188        // tokens_entity is always recomputed: entities is typically empty in the
1189        // Arrow fast path, making this call trivially fast.
1190        candidate.tokens_entity = tokenizer.count_tokens(&compose_candidate_text(
1191            candidate,
1192            CompressionLevel::EntityOnly,
1193        ));
1194    }
1195}
1196
1197/// Compute trust scores for classified candidates using provenance and graph data.
1198async fn compute_trust_scores(
1199    db: &HirnDB,
1200    candidates: &[ScoredMemory],
1201    classified: &mut [Candidate],
1202) {
1203    let graph = db.graph_store();
1204    let candidate_ids = candidates
1205        .iter()
1206        .map(|sm| sm.record.id())
1207        .collect::<Vec<_>>();
1208    let contradiction_edges =
1209        get_relation_edges_best_effort(graph, &candidate_ids, EdgeRelation::Contradicts).await;
1210
1211    for (i, sm) in candidates.iter().enumerate() {
1212        let provenance = match &sm.record {
1213            MemoryRecord::Episodic(e) => Some(&e.provenance),
1214            MemoryRecord::Semantic(s) => Some(&s.provenance),
1215            MemoryRecord::Working(_) => None,
1216            MemoryRecord::Procedural(p) => Some(&p.provenance),
1217        };
1218        if let Some(prov) = provenance {
1219            let contra_count = contradiction_edges.get(&sm.record.id()).map_or(0, Vec::len);
1220            classified[i].trust_score = crate::causal::compute_trust_score(prov, contra_count);
1221        }
1222    }
1223}
1224
1225async fn get_relation_edges_best_effort(
1226    graph: &dyn GraphStore,
1227    node_ids: &[MemoryId],
1228    relation: EdgeRelation,
1229) -> HashMap<MemoryId, Vec<GraphEdge>> {
1230    match graph.get_edges_of_type_many(node_ids, relation).await {
1231        Ok(edges) => edges,
1232        Err(_) => {
1233            let mut edges_by_node = HashMap::with_capacity(node_ids.len());
1234            for &node_id in node_ids {
1235                let edges = graph
1236                    .get_edges_of_type(node_id, relation)
1237                    .await
1238                    .unwrap_or_default();
1239                if !edges.is_empty() {
1240                    edges_by_node.insert(node_id, edges);
1241                }
1242            }
1243            edges_by_node
1244        }
1245    }
1246}
1247
1248// ── Step 3: Detect contradictions ──────────────────────────────────────
1249
1250fn extract_content_str(record: &MemoryRecord) -> &str {
1251    match record {
1252        MemoryRecord::Episodic(e) => &e.content,
1253        MemoryRecord::Semantic(s) => &s.description,
1254        MemoryRecord::Working(w) => &w.content,
1255        MemoryRecord::Procedural(p) => &p.description,
1256    }
1257}
1258
1259// ── WITH CONFLICTS for RECALL ──────────────────────────────────────────
1260
1261/// Detect contradictions among recall results by querying `Contradicts` edges.
1262///
1263/// Returns grouped conflict context plus compatibility `ConflictPair`s.
1264pub(crate) async fn detect_conflicts_for_recall(
1265    db: &HirnDB,
1266    candidates: &[ScoredMemory],
1267    visible_namespaces: Option<&[Namespace]>,
1268    snapshot: Option<RecallSnapshot>,
1269) -> ConflictSummary {
1270    collect_conflict_summary(db, candidates, visible_namespaces, snapshot).await
1271}
1272
1273pub(crate) async fn detect_conflicts_for_record(
1274    db: &HirnDB,
1275    record: &MemoryRecord,
1276    visible_namespaces: Option<&[Namespace]>,
1277) -> ConflictSummary {
1278    detect_conflicts_for_record_with_runtime(db, record, visible_namespaces).await
1279}
1280
1281pub(crate) async fn detect_conflicts_for_record_with_runtime<R>(
1282    runtime: &R,
1283    record: &MemoryRecord,
1284    visible_namespaces: Option<&[Namespace]>,
1285) -> ConflictSummary
1286where
1287    R: ConflictReadRuntime + ?Sized,
1288{
1289    collect_conflict_summary_for_record(
1290        runtime,
1291        record,
1292        visible_namespaces,
1293        RecordConflictResolution::Live,
1294    )
1295    .await
1296}
1297
1298pub(crate) async fn detect_conflicts_for_exact_record(
1299    db: &HirnDB,
1300    record: &MemoryRecord,
1301    visible_namespaces: Option<&[Namespace]>,
1302) -> ConflictSummary {
1303    detect_conflicts_for_exact_record_with_runtime(db, record, visible_namespaces).await
1304}
1305
1306pub(crate) async fn detect_conflicts_for_exact_record_with_runtime<R>(
1307    runtime: &R,
1308    record: &MemoryRecord,
1309    visible_namespaces: Option<&[Namespace]>,
1310) -> ConflictSummary
1311where
1312    R: ConflictReadRuntime + ?Sized,
1313{
1314    collect_conflict_summary_for_record(
1315        runtime,
1316        record,
1317        visible_namespaces,
1318        RecordConflictResolution::Exact,
1319    )
1320    .await
1321}
1322
1323#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1324enum RecordConflictResolution {
1325    Live,
1326    Exact,
1327}
1328
1329async fn collect_conflict_summary<R>(
1330    db: &R,
1331    candidates: &[ScoredMemory],
1332    visible_namespaces: Option<&[Namespace]>,
1333    snapshot: Option<RecallSnapshot>,
1334) -> ConflictSummary
1335where
1336    R: ConflictReadRuntime + ?Sized,
1337{
1338    let visible_members: BTreeMap<MemoryId, ConflictMember> = candidates
1339        .iter()
1340        .map(|candidate| {
1341            let member = conflict_member_from_scored(candidate, true);
1342            (member.memory_id, member)
1343        })
1344        .collect();
1345
1346    if visible_members.is_empty() {
1347        return ConflictSummary::default();
1348    }
1349
1350    let mut human_override_members = HashSet::new();
1351    let mut semantic_contradictions = BTreeMap::new();
1352    for candidate in candidates {
1353        if let MemoryRecord::Semantic(record) = &candidate.record {
1354            if semantic_record_has_human_override(record) {
1355                human_override_members.insert(record.id);
1356            }
1357            if !record.contradiction_ids.is_empty() {
1358                semantic_contradictions.insert(record.id, record.contradiction_ids.clone());
1359            }
1360        }
1361    }
1362
1363    let mut seen_pairs: HashSet<(MemoryId, MemoryId)> = HashSet::new();
1364    let mut adjacency: BTreeMap<MemoryId, Vec<MemoryId>> = BTreeMap::new();
1365    let mut namespace_cache: BTreeMap<MemoryId, Option<Namespace>> = candidates
1366        .iter()
1367        .map(|candidate| {
1368            (
1369                candidate.record.id(),
1370                Some(candidate.record.effective_namespace()),
1371            )
1372        })
1373        .collect();
1374    let mut pair_edges = Vec::new();
1375    let mut visible_pairs = Vec::new();
1376
1377    let graph = db.graph_store();
1378    let visible_ids = visible_members.keys().copied().collect::<Vec<_>>();
1379    let contradiction_edges =
1380        get_relation_edges_best_effort(graph, &visible_ids, EdgeRelation::Contradicts).await;
1381
1382    for id in visible_ids {
1383        let Some(edges) = contradiction_edges.get(&id) else {
1384            continue;
1385        };
1386
1387        for edge in edges {
1388            let other_id = if edge.source == id {
1389                edge.target
1390            } else {
1391                edge.source
1392            };
1393            if !conflict_node_is_visible(graph, other_id, visible_namespaces, &mut namespace_cache)
1394                .await
1395            {
1396                continue;
1397            }
1398
1399            let pair = normalize_conflict_pair(id, other_id);
1400            if !seen_pairs.insert(pair) {
1401                continue;
1402            }
1403
1404            let confidence = edge.confidence().unwrap_or(edge.weight).clamp(0.0, 1.0);
1405            let evidence_count = edge
1406                .evidence_count()
1407                .unwrap_or(1)
1408                .max(1)
1409                .try_into()
1410                .unwrap_or(1);
1411
1412            push_conflict_pair_edge(
1413                pair,
1414                confidence,
1415                evidence_count,
1416                edge.resolved,
1417                &mut adjacency,
1418                &mut pair_edges,
1419            );
1420
1421            if let (Some(member_a), Some(member_b)) =
1422                (visible_members.get(&pair.0), visible_members.get(&pair.1))
1423            {
1424                visible_pairs.push(ConflictPair {
1425                    memory_a: pair.0,
1426                    memory_b: pair.1,
1427                    content_a: member_a.content.clone(),
1428                    content_b: member_b.content.clone(),
1429                    confidence,
1430                    source_reliability_a: member_a.source_reliability,
1431                    source_reliability_b: member_b.source_reliability,
1432                });
1433            }
1434        }
1435    }
1436
1437    for (source_id, contradiction_ids) in semantic_contradictions {
1438        for contradiction_id in contradiction_ids {
1439            let Some(target_record) = resolve_conflict_target_record(
1440                db,
1441                contradiction_id,
1442                visible_namespaces,
1443                snapshot,
1444                false,
1445            )
1446            .await
1447            else {
1448                continue;
1449            };
1450            let target_id = target_record.id();
1451            if !visible_members.contains_key(&target_id) {
1452                continue;
1453            }
1454            if let MemoryRecord::Semantic(record) = &target_record {
1455                if semantic_record_has_human_override(record) {
1456                    human_override_members.insert(record.id);
1457                }
1458            }
1459
1460            let pair = normalize_conflict_pair(source_id, target_id);
1461            if !seen_pairs.insert(pair) {
1462                continue;
1463            }
1464
1465            push_conflict_pair_edge(
1466                pair,
1467                FALLBACK_CONTRADICTION_CONFIDENCE,
1468                1,
1469                false,
1470                &mut adjacency,
1471                &mut pair_edges,
1472            );
1473
1474            if let (Some(member_a), Some(member_b)) =
1475                (visible_members.get(&pair.0), visible_members.get(&pair.1))
1476            {
1477                visible_pairs.push(ConflictPair {
1478                    memory_a: pair.0,
1479                    memory_b: pair.1,
1480                    content_a: member_a.content.clone(),
1481                    content_b: member_b.content.clone(),
1482                    confidence: FALLBACK_CONTRADICTION_CONFIDENCE,
1483                    source_reliability_a: member_a.source_reliability,
1484                    source_reliability_b: member_b.source_reliability,
1485                });
1486            }
1487        }
1488    }
1489
1490    visible_pairs.sort_by_key(|pair| (pair.memory_a, pair.memory_b));
1491    let policy = resolve_conflict_resolution_policy(db, visible_namespaces);
1492    let groups = build_conflict_groups(
1493        &visible_members,
1494        &adjacency,
1495        &pair_edges,
1496        &human_override_members,
1497        policy,
1498    );
1499
1500    ConflictSummary {
1501        pairs: visible_pairs,
1502        groups,
1503    }
1504}
1505
1506async fn collect_conflict_summary_for_record<R>(
1507    db: &R,
1508    record: &MemoryRecord,
1509    visible_namespaces: Option<&[Namespace]>,
1510    resolution: RecordConflictResolution,
1511) -> ConflictSummary
1512where
1513    R: ConflictReadRuntime + ?Sized,
1514{
1515    if let Some(visible_namespaces) = visible_namespaces {
1516        if !visible_namespaces.contains(&record.effective_namespace()) {
1517            return ConflictSummary::default();
1518        }
1519    }
1520
1521    let seed_id = record.id();
1522    let graph = db.graph_store();
1523    let mut visible_members = BTreeMap::new();
1524    visible_members.insert(seed_id, conflict_member_from_record(record, true));
1525
1526    let mut loaded_records = BTreeMap::from([(seed_id, record.clone())]);
1527    let mut human_override_members = HashSet::new();
1528    if let MemoryRecord::Semantic(record) = record {
1529        if semantic_record_has_human_override(record) {
1530            human_override_members.insert(record.id);
1531        }
1532    }
1533
1534    let mut seen_nodes: HashSet<MemoryId> = HashSet::from([seed_id]);
1535    let mut seen_pairs: HashSet<(MemoryId, MemoryId)> = HashSet::new();
1536    let mut adjacency: BTreeMap<MemoryId, Vec<MemoryId>> = BTreeMap::new();
1537    let mut pair_edges = Vec::new();
1538    let mut queue = VecDeque::from([seed_id]);
1539
1540    while let Some(current_id) = queue.pop_front() {
1541        let Some(current_record) = loaded_records.get(&current_id).cloned() else {
1542            continue;
1543        };
1544        let resolution_snapshot = match resolution {
1545            RecordConflictResolution::Live => {
1546                conflict_resolution_snapshot_for_record(db, &current_record).await
1547            }
1548            RecordConflictResolution::Exact => None,
1549        };
1550
1551        if !matches!(resolution, RecordConflictResolution::Exact) {
1552            let edges = graph
1553                .get_edges_of_type(current_id, EdgeRelation::Contradicts)
1554                .await
1555                .unwrap_or_default();
1556
1557            for edge in edges {
1558                let raw_other_id = if edge.source == current_id {
1559                    edge.target
1560                } else {
1561                    edge.source
1562                };
1563
1564                let Some(other_record) = resolve_conflict_target_record(
1565                    db,
1566                    raw_other_id,
1567                    visible_namespaces,
1568                    resolution_snapshot,
1569                    matches!(resolution, RecordConflictResolution::Exact),
1570                )
1571                .await
1572                else {
1573                    continue;
1574                };
1575
1576                let other_id = other_record.id();
1577                if other_id == current_id {
1578                    continue;
1579                }
1580
1581                let pair = normalize_conflict_pair(current_id, other_id);
1582                if seen_pairs.insert(pair) {
1583                    let confidence = edge.confidence().unwrap_or(edge.weight).clamp(0.0, 1.0);
1584                    let evidence_count = edge
1585                        .evidence_count()
1586                        .unwrap_or(1)
1587                        .max(1)
1588                        .try_into()
1589                        .unwrap_or(1);
1590                    push_conflict_pair_edge(
1591                        pair,
1592                        confidence,
1593                        evidence_count,
1594                        edge.resolved,
1595                        &mut adjacency,
1596                        &mut pair_edges,
1597                    );
1598                }
1599
1600                if seen_nodes.insert(other_id) {
1601                    if let MemoryRecord::Semantic(record) = &other_record {
1602                        if semantic_record_has_human_override(record) {
1603                            human_override_members.insert(record.id);
1604                        }
1605                    }
1606                    visible_members
1607                        .entry(other_id)
1608                        .or_insert_with(|| conflict_member_from_record(&other_record, false));
1609                    loaded_records.insert(other_id, other_record);
1610                    queue.push_back(other_id);
1611                }
1612            }
1613        }
1614
1615        if let MemoryRecord::Semantic(semantic) = &current_record {
1616            if matches!(resolution, RecordConflictResolution::Exact) && current_id != seed_id {
1617                continue;
1618            }
1619
1620            for contradiction_id in &semantic.contradiction_ids {
1621                let Some(target_record) = resolve_conflict_target_record(
1622                    db,
1623                    *contradiction_id,
1624                    visible_namespaces,
1625                    resolution_snapshot,
1626                    matches!(resolution, RecordConflictResolution::Exact),
1627                )
1628                .await
1629                else {
1630                    continue;
1631                };
1632                let target_id = target_record.id();
1633                if target_id == current_id {
1634                    continue;
1635                }
1636
1637                let pair = normalize_conflict_pair(current_id, target_id);
1638                if seen_pairs.insert(pair) {
1639                    push_conflict_pair_edge(
1640                        pair,
1641                        FALLBACK_CONTRADICTION_CONFIDENCE,
1642                        1,
1643                        false,
1644                        &mut adjacency,
1645                        &mut pair_edges,
1646                    );
1647                }
1648
1649                if seen_nodes.insert(target_id) {
1650                    if let MemoryRecord::Semantic(record) = &target_record {
1651                        if semantic_record_has_human_override(record) {
1652                            human_override_members.insert(record.id);
1653                        }
1654                    }
1655                    visible_members
1656                        .entry(target_id)
1657                        .or_insert_with(|| conflict_member_from_record(&target_record, false));
1658                    loaded_records.insert(target_id, target_record);
1659                    queue.push_back(target_id);
1660                }
1661            }
1662        }
1663    }
1664
1665    if pair_edges.is_empty() {
1666        return ConflictSummary::default();
1667    }
1668
1669    let mut visible_pairs = Vec::new();
1670    for edge in &pair_edges {
1671        if let (Some(member_a), Some(member_b)) =
1672            (visible_members.get(&edge.a), visible_members.get(&edge.b))
1673        {
1674            visible_pairs.push(ConflictPair {
1675                memory_a: edge.a,
1676                memory_b: edge.b,
1677                content_a: member_a.content.clone(),
1678                content_b: member_b.content.clone(),
1679                confidence: edge.confidence,
1680                source_reliability_a: member_a.source_reliability,
1681                source_reliability_b: member_b.source_reliability,
1682            });
1683        }
1684    }
1685    visible_pairs.sort_by_key(|pair| (pair.memory_a, pair.memory_b));
1686    let policy = resolve_conflict_resolution_policy(db, visible_namespaces);
1687
1688    ConflictSummary {
1689        pairs: visible_pairs,
1690        groups: build_conflict_groups(
1691            &visible_members,
1692            &adjacency,
1693            &pair_edges,
1694            &human_override_members,
1695            policy,
1696        ),
1697    }
1698}
1699
1700fn push_conflict_pair_edge(
1701    pair: (MemoryId, MemoryId),
1702    confidence: f32,
1703    evidence_count: usize,
1704    resolved: bool,
1705    adjacency: &mut BTreeMap<MemoryId, Vec<MemoryId>>,
1706    pair_edges: &mut Vec<ConflictEdgeMeta>,
1707) {
1708    adjacency.entry(pair.0).or_default().push(pair.1);
1709    adjacency.entry(pair.1).or_default().push(pair.0);
1710    pair_edges.push(ConflictEdgeMeta {
1711        a: pair.0,
1712        b: pair.1,
1713        confidence,
1714        evidence_count,
1715        resolved,
1716    });
1717}
1718
1719pub(crate) fn build_semantic_conflict_groups(
1720    records: &[hirn_core::semantic::SemanticRecord],
1721    policy: ConflictResolutionPolicy,
1722) -> Vec<ConflictGroup> {
1723    let mut visible_members = BTreeMap::new();
1724    let mut adjacency = BTreeMap::new();
1725    let mut pair_edges = Vec::new();
1726    let mut human_override_members = HashSet::new();
1727    let mut seen_pairs = HashSet::new();
1728    let records_by_id: BTreeMap<MemoryId, &hirn_core::semantic::SemanticRecord> =
1729        records.iter().map(|record| (record.id, record)).collect();
1730
1731    for record in records {
1732        let memory_record = MemoryRecord::Semantic(record.clone());
1733        visible_members.insert(record.id, conflict_member_from_record(&memory_record, true));
1734        if semantic_record_has_human_override(record) {
1735            human_override_members.insert(record.id);
1736        }
1737    }
1738
1739    for record in records {
1740        for contradiction_id in &record.contradiction_ids {
1741            let Some(other) = records_by_id.get(contradiction_id) else {
1742                continue;
1743            };
1744
1745            let pair = normalize_conflict_pair(record.id, *contradiction_id);
1746            if !seen_pairs.insert(pair) {
1747                continue;
1748            }
1749
1750            let resolved = !record.is_live() || !other.is_live();
1751            push_conflict_pair_edge(
1752                pair,
1753                FALLBACK_CONTRADICTION_CONFIDENCE,
1754                1,
1755                resolved,
1756                &mut adjacency,
1757                &mut pair_edges,
1758            );
1759        }
1760    }
1761
1762    build_conflict_groups(
1763        &visible_members,
1764        &adjacency,
1765        &pair_edges,
1766        &human_override_members,
1767        policy,
1768    )
1769}
1770
1771async fn resolve_conflict_target_record<R>(
1772    db: &R,
1773    target_id: MemoryId,
1774    visible_namespaces: Option<&[Namespace]>,
1775    snapshot: Option<RecallSnapshot>,
1776    preserve_exact_semantic_targets: bool,
1777) -> Option<MemoryRecord>
1778where
1779    R: ConflictReadRuntime + ?Sized,
1780{
1781    let record = db.get_memory(target_id).await.ok()?;
1782    let resolved = match record {
1783        MemoryRecord::Semantic(record) if preserve_exact_semantic_targets => {
1784            MemoryRecord::Semantic(record)
1785        }
1786        MemoryRecord::Semantic(record) => match snapshot {
1787            Some(snapshot) => match db
1788                .semantic_revision_for_logical_id_at_snapshot(record.logical_memory_id, snapshot)
1789                .await
1790            {
1791                Ok(Some(revision)) => MemoryRecord::Semantic(revision),
1792                Ok(None) => return None,
1793                Err(_) => MemoryRecord::Semantic(record),
1794            },
1795            None => match db
1796                .semantic_head_for_logical_id(record.logical_memory_id)
1797                .await
1798            {
1799                Ok(head) => MemoryRecord::Semantic(head),
1800                Err(_) => MemoryRecord::Semantic(record),
1801            },
1802        },
1803        other => other,
1804    };
1805
1806    if let Some(visible_namespaces) = visible_namespaces {
1807        if !visible_namespaces.contains(&resolved.effective_namespace()) {
1808            return None;
1809        }
1810    }
1811
1812    Some(resolved)
1813}
1814
1815async fn conflict_resolution_snapshot_for_record<R>(
1816    db: &R,
1817    record: &MemoryRecord,
1818) -> Option<RecallSnapshot>
1819where
1820    R: ConflictReadRuntime + ?Sized,
1821{
1822    let MemoryRecord::Semantic(record) = record else {
1823        return None;
1824    };
1825
1826    match db
1827        .semantic_head_for_logical_id(record.logical_memory_id)
1828        .await
1829    {
1830        Ok(head) if head.revision_id != record.revision_id => {
1831            Some(RecallSnapshot::revision(record.revision_id))
1832        }
1833        _ => None,
1834    }
1835}
1836
1837fn semantic_record_has_human_override(record: &hirn_core::semantic::SemanticRecord) -> bool {
1838    matches!(
1839        record.revision_operation,
1840        hirn_core::RevisionOperation::Override
1841    )
1842}
1843
1844fn resolve_conflict_resolution_policy<R>(
1845    db: &R,
1846    visible_namespaces: Option<&[Namespace]>,
1847) -> ConflictResolutionPolicy
1848where
1849    R: ConflictReadRuntime + ?Sized,
1850{
1851    let config = db.config();
1852    if let Some(namespaces) = visible_namespaces {
1853        if namespaces.len() == 1 {
1854            if let Some(policy) = config
1855                .conflict_resolution_overrides
1856                .by_namespace
1857                .get(namespaces[0].as_str())
1858            {
1859                return *policy;
1860            }
1861        }
1862    }
1863
1864    config
1865        .conflict_resolution_overrides
1866        .by_realm
1867        .get(&config.default_realm)
1868        .copied()
1869        .unwrap_or(config.conflict_resolution_policy)
1870}
1871
1872fn normalize_conflict_pair(a: MemoryId, b: MemoryId) -> (MemoryId, MemoryId) {
1873    if a < b { (a, b) } else { (b, a) }
1874}
1875
1876async fn conflict_node_is_visible(
1877    graph: &dyn crate::graph_store::GraphStore,
1878    node_id: MemoryId,
1879    visible_namespaces: Option<&[Namespace]>,
1880    namespace_cache: &mut BTreeMap<MemoryId, Option<Namespace>>,
1881) -> bool {
1882    let Some(visible_namespaces) = visible_namespaces else {
1883        return true;
1884    };
1885
1886    let namespace = match namespace_cache.get(&node_id) {
1887        Some(namespace) => *namespace,
1888        None => {
1889            let namespace = graph.node_namespace(node_id).await.ok().flatten();
1890            namespace_cache.insert(node_id, namespace);
1891            namespace
1892        }
1893    };
1894
1895    namespace.is_some_and(|namespace| visible_namespaces.contains(&namespace))
1896}
1897
1898fn conflict_member_from_scored(scored: &ScoredMemory, in_result_set: bool) -> ConflictMember {
1899    let (logical_memory_id, revision_id, status) = conflict_member_identity(scored);
1900    ConflictMember {
1901        memory_id: scored.record.id(),
1902        logical_memory_id,
1903        revision_id,
1904        status,
1905        layer: scored.record.layer(),
1906        content: extract_content_str(&scored.record).to_string(),
1907        in_result_set,
1908        source_reliability: crate::scoring::source_reliability_for_record(&scored.record),
1909        recency_basis_ms: conflict_member_recency_basis_ms(&scored.record),
1910    }
1911}
1912
1913fn conflict_member_from_record(record: &MemoryRecord, in_result_set: bool) -> ConflictMember {
1914    let (logical_memory_id, revision_id, status) = conflict_member_identity_from_record(record);
1915    ConflictMember {
1916        memory_id: record.id(),
1917        logical_memory_id,
1918        revision_id,
1919        status,
1920        layer: record.layer(),
1921        content: extract_content_str(record).to_string(),
1922        in_result_set,
1923        source_reliability: crate::scoring::source_reliability_for_record(record),
1924        recency_basis_ms: conflict_member_recency_basis_ms(record),
1925    }
1926}
1927
1928fn conflict_member_recency_basis_ms(record: &MemoryRecord) -> i64 {
1929    match record {
1930        MemoryRecord::Episodic(record) => record.timestamp.timestamp_ms(),
1931        MemoryRecord::Semantic(record) => record.valid_from.timestamp_ms(),
1932        MemoryRecord::Working(record) => record.observed_at.timestamp_ms(),
1933        MemoryRecord::Procedural(record) => record.observed_at.timestamp_ms(),
1934    }
1935}
1936
1937fn conflict_member_identity(
1938    scored: &ScoredMemory,
1939) -> (
1940    Option<LogicalMemoryId>,
1941    Option<RevisionId>,
1942    ConflictMemberStatus,
1943) {
1944    if let Some(revision) = scored.revision {
1945        return (
1946            Some(revision.logical_memory_id),
1947            Some(revision.revision_id),
1948            conflict_member_status_from_revision_state(revision.state),
1949        );
1950    }
1951
1952    conflict_member_identity_from_record(&scored.record)
1953}
1954
1955fn conflict_member_identity_from_record(
1956    record: &MemoryRecord,
1957) -> (
1958    Option<LogicalMemoryId>,
1959    Option<RevisionId>,
1960    ConflictMemberStatus,
1961) {
1962    match record {
1963        MemoryRecord::Semantic(record) => {
1964            let status = if record.is_retracted() {
1965                ConflictMemberStatus::Retracted
1966            } else if record.is_merged() {
1967                ConflictMemberStatus::Merged
1968            } else if record.superseded_by.is_some() {
1969                ConflictMemberStatus::Superseded
1970            } else {
1971                ConflictMemberStatus::Active
1972            };
1973            (
1974                Some(record.logical_memory_id),
1975                Some(record.revision_id),
1976                status,
1977            )
1978        }
1979        MemoryRecord::Episodic(record) => (
1980            None,
1981            None,
1982            if record.archived {
1983                ConflictMemberStatus::Superseded
1984            } else {
1985                ConflictMemberStatus::Active
1986            },
1987        ),
1988        MemoryRecord::Procedural(record) => (
1989            None,
1990            None,
1991            if record.archived {
1992                ConflictMemberStatus::Superseded
1993            } else {
1994                ConflictMemberStatus::Active
1995            },
1996        ),
1997        MemoryRecord::Working(_) => (None, None, ConflictMemberStatus::Active),
1998    }
1999}
2000
2001fn conflict_member_status_from_revision_state(state: RevisionState) -> ConflictMemberStatus {
2002    match state {
2003        RevisionState::Active => ConflictMemberStatus::Active,
2004        RevisionState::Superseded => ConflictMemberStatus::Superseded,
2005        RevisionState::Retracted => ConflictMemberStatus::Retracted,
2006        RevisionState::Quarantined => ConflictMemberStatus::Quarantined,
2007        RevisionState::Merged => ConflictMemberStatus::Merged,
2008    }
2009}
2010
2011fn build_conflict_groups(
2012    visible_members: &BTreeMap<MemoryId, ConflictMember>,
2013    adjacency: &BTreeMap<MemoryId, Vec<MemoryId>>,
2014    pair_edges: &[ConflictEdgeMeta],
2015    human_override_members: &HashSet<MemoryId>,
2016    policy: ConflictResolutionPolicy,
2017) -> Vec<ConflictGroup> {
2018    let mut visited: HashSet<MemoryId> = HashSet::new();
2019    let mut groups = Vec::new();
2020
2021    for start in visible_members.keys().copied() {
2022        if visited.contains(&start) || !adjacency.contains_key(&start) {
2023            continue;
2024        }
2025
2026        let mut queue = VecDeque::from([start]);
2027        let mut component = HashSet::new();
2028        component.insert(start);
2029        visited.insert(start);
2030
2031        while let Some(current) = queue.pop_front() {
2032            if let Some(neighbors) = adjacency.get(&current) {
2033                for &neighbor in neighbors {
2034                    if component.insert(neighbor) {
2035                        visited.insert(neighbor);
2036                        queue.push_back(neighbor);
2037                    }
2038                }
2039            }
2040        }
2041
2042        let mut members: Vec<ConflictMember> = component
2043            .iter()
2044            .filter_map(|id| visible_members.get(id).cloned())
2045            .collect();
2046        if members.is_empty() {
2047            continue;
2048        }
2049        members.sort_by_key(|member| member.memory_id);
2050
2051        let omitted_member_count = component.len().saturating_sub(members.len());
2052        let component_edges: Vec<&ConflictEdgeMeta> = pair_edges
2053            .iter()
2054            .filter(|edge| component.contains(&edge.a) && component.contains(&edge.b))
2055            .collect();
2056        let pair_count = component_edges.len();
2057        let confidence = if pair_count > 0 {
2058            component_edges
2059                .iter()
2060                .map(|edge| edge.confidence)
2061                .sum::<f32>()
2062                / pair_count as f32
2063        } else {
2064            0.0
2065        };
2066        let evidence_count = component_edges.iter().map(|edge| edge.evidence_count).sum();
2067        let source_reliability = members
2068            .iter()
2069            .map(|member| member.source_reliability)
2070            .sum::<f32>()
2071            / members.len() as f32;
2072        let arbitration_status =
2073            derive_conflict_arbitration_status(&members, &component_edges, omitted_member_count);
2074        let authoritative_memory_id =
2075            authoritative_conflict_memory_id(&members, omitted_member_count);
2076        let preferred_memory_id = if authoritative_memory_id.is_none() {
2077            select_conflict_preferred_memory_id(
2078                &members,
2079                &component_edges,
2080                omitted_member_count,
2081                human_override_members,
2082                policy,
2083            )
2084        } else {
2085            None
2086        };
2087
2088        groups.push(ConflictGroup {
2089            conflict_id: members
2090                .iter()
2091                .map(|member| member.memory_id.to_string())
2092                .collect::<Vec<_>>()
2093                .join(":"),
2094            members,
2095            omitted_member_count,
2096            pair_count,
2097            confidence,
2098            evidence_count,
2099            source_reliability,
2100            arbitration_status,
2101            authoritative_memory_id,
2102            preferred_memory_id,
2103        });
2104    }
2105
2106    groups.sort_by(|a, b| a.conflict_id.cmp(&b.conflict_id));
2107    groups
2108}
2109
2110fn derive_conflict_arbitration_status(
2111    members: &[ConflictMember],
2112    component_edges: &[&ConflictEdgeMeta],
2113    omitted_member_count: usize,
2114) -> ConflictArbitrationStatus {
2115    let active_member_count = members
2116        .iter()
2117        .filter(|member| member.status == ConflictMemberStatus::Active)
2118        .count();
2119    let has_resolved_loser = members.iter().any(|member| {
2120        matches!(
2121            member.status,
2122            ConflictMemberStatus::Retracted | ConflictMemberStatus::Merged
2123        )
2124    });
2125
2126    if !component_edges.is_empty() && component_edges.iter().all(|edge| edge.resolved) {
2127        ConflictArbitrationStatus::Resolved
2128    } else if members
2129        .iter()
2130        .any(|member| member.status == ConflictMemberStatus::Quarantined)
2131    {
2132        ConflictArbitrationStatus::Quarantined
2133    } else if omitted_member_count == 0
2134        && active_member_count == 1
2135        && members
2136            .iter()
2137            .any(|member| member.status != ConflictMemberStatus::Active)
2138    {
2139        if has_resolved_loser {
2140            ConflictArbitrationStatus::Resolved
2141        } else {
2142            ConflictArbitrationStatus::Superseded
2143        }
2144    } else if members
2145        .iter()
2146        .all(|member| member.status != ConflictMemberStatus::Active)
2147    {
2148        ConflictArbitrationStatus::Resolved
2149    } else {
2150        ConflictArbitrationStatus::Unresolved
2151    }
2152}
2153
2154fn authoritative_conflict_memory_id(
2155    members: &[ConflictMember],
2156    omitted_member_count: usize,
2157) -> Option<MemoryId> {
2158    if omitted_member_count > 0 {
2159        return None;
2160    }
2161
2162    let active_members: Vec<MemoryId> = members
2163        .iter()
2164        .filter(|member| member.status == ConflictMemberStatus::Active)
2165        .map(|member| member.memory_id)
2166        .collect();
2167
2168    (active_members.len() == 1).then_some(active_members[0])
2169}
2170
2171fn select_conflict_preferred_memory_id(
2172    members: &[ConflictMember],
2173    component_edges: &[&ConflictEdgeMeta],
2174    omitted_member_count: usize,
2175    human_override_members: &HashSet<MemoryId>,
2176    policy: ConflictResolutionPolicy,
2177) -> Option<MemoryId> {
2178    if omitted_member_count > 0 {
2179        return None;
2180    }
2181
2182    let mut active_members: Vec<&ConflictMember> = members
2183        .iter()
2184        .filter(|member| member.status == ConflictMemberStatus::Active)
2185        .collect();
2186    if active_members.is_empty() {
2187        return None;
2188    }
2189
2190    if policy.prefer_human_override
2191        && active_members
2192            .iter()
2193            .any(|member| human_override_members.contains(&member.memory_id))
2194    {
2195        active_members.retain(|member| human_override_members.contains(&member.memory_id));
2196    }
2197
2198    let supports: BTreeMap<MemoryId, (usize, f32)> = active_members
2199        .iter()
2200        .map(|member| {
2201            (
2202                member.memory_id,
2203                conflict_member_support(member.memory_id, component_edges),
2204            )
2205        })
2206        .collect();
2207    let max_evidence = supports
2208        .values()
2209        .map(|(evidence, _)| *evidence)
2210        .max()
2211        .unwrap_or(0);
2212    let max_confidence = supports
2213        .values()
2214        .map(|(_, confidence)| *confidence)
2215        .fold(0.0, f32::max);
2216
2217    let mut recency_order = active_members.clone();
2218    recency_order.sort_by_key(|member| {
2219        (
2220            member.recency_basis_ms,
2221            member.revision_id,
2222            member.memory_id,
2223        )
2224    });
2225    let recency_rank: BTreeMap<MemoryId, usize> = recency_order
2226        .iter()
2227        .enumerate()
2228        .map(|(index, member)| (member.memory_id, index))
2229        .collect();
2230
2231    active_members
2232        .into_iter()
2233        .max_by(|left, right| {
2234            compare_conflict_member_preference(
2235                left,
2236                right,
2237                &supports,
2238                &recency_rank,
2239                recency_order.len(),
2240                max_evidence,
2241                max_confidence,
2242                human_override_members,
2243                policy,
2244            )
2245        })
2246        .map(|member| member.memory_id)
2247}
2248
2249fn compare_conflict_member_preference(
2250    left: &ConflictMember,
2251    right: &ConflictMember,
2252    supports: &BTreeMap<MemoryId, (usize, f32)>,
2253    recency_rank: &BTreeMap<MemoryId, usize>,
2254    active_member_count: usize,
2255    max_evidence: usize,
2256    max_confidence: f32,
2257    human_override_members: &HashSet<MemoryId>,
2258    policy: ConflictResolutionPolicy,
2259) -> std::cmp::Ordering {
2260    let left_score = conflict_member_preference_score(
2261        left,
2262        supports,
2263        recency_rank,
2264        active_member_count,
2265        max_evidence,
2266        max_confidence,
2267        human_override_members,
2268        policy,
2269    );
2270    let right_score = conflict_member_preference_score(
2271        right,
2272        supports,
2273        recency_rank,
2274        active_member_count,
2275        max_evidence,
2276        max_confidence,
2277        human_override_members,
2278        policy,
2279    );
2280
2281    left_score
2282        .total_cmp(&right_score)
2283        .then_with(|| left.revision_id.cmp(&right.revision_id))
2284        .then_with(|| left.memory_id.cmp(&right.memory_id))
2285}
2286
2287fn conflict_member_preference_score(
2288    member: &ConflictMember,
2289    supports: &BTreeMap<MemoryId, (usize, f32)>,
2290    recency_rank: &BTreeMap<MemoryId, usize>,
2291    active_member_count: usize,
2292    max_evidence: usize,
2293    max_confidence: f32,
2294    human_override_members: &HashSet<MemoryId>,
2295    policy: ConflictResolutionPolicy,
2296) -> f32 {
2297    let (evidence_count, confidence_sum) =
2298        supports.get(&member.memory_id).copied().unwrap_or((0, 0.0));
2299    let recency_score = if active_member_count <= 1 {
2300        1.0
2301    } else {
2302        recency_rank.get(&member.memory_id).copied().unwrap_or(0) as f32
2303            / (active_member_count - 1) as f32
2304    };
2305    let evidence_score = if max_evidence == 0 {
2306        0.0
2307    } else {
2308        evidence_count as f32 / max_evidence as f32
2309    };
2310    let confidence_score = if max_confidence <= 0.0 {
2311        0.0
2312    } else {
2313        confidence_sum / max_confidence
2314    };
2315    let support_score = if evidence_count == 0 && confidence_sum <= 0.0 {
2316        0.0
2317    } else {
2318        f32::midpoint(evidence_score, confidence_score)
2319    };
2320    let human_override_score = if human_override_members.contains(&member.memory_id) {
2321        1.0
2322    } else {
2323        0.0
2324    };
2325
2326    recency_score * policy.recency_weight
2327        + member.source_reliability.clamp(0.0, 1.0) * policy.source_reliability_weight
2328        + support_score * policy.supporting_evidence_weight
2329        + human_override_score * policy.human_override_weight
2330}
2331
2332fn conflict_member_support(
2333    memory_id: MemoryId,
2334    component_edges: &[&ConflictEdgeMeta],
2335) -> (usize, f32) {
2336    component_edges
2337        .iter()
2338        .filter(|edge| edge.a == memory_id || edge.b == memory_id)
2339        .fold((0, 0.0), |(evidence_count, confidence), edge| {
2340            (
2341                evidence_count + edge.evidence_count,
2342                confidence + edge.confidence,
2343            )
2344        })
2345}
2346
2347fn format_conflict_group_line(group: &ConflictGroup) -> String {
2348    let mut line = format!(
2349        "⚠ CONFLICT {:?} (conf={:.2}, evidence={}): ",
2350        group.arbitration_status, group.confidence, group.evidence_count
2351    );
2352
2353    if let Some(authoritative_memory_id) = group.authoritative_memory_id {
2354        let _ = write!(line, "active=[{authoritative_memory_id}] ");
2355    } else if let Some(preferred_memory_id) = group.preferred_memory_id {
2356        let _ = write!(line, "preferred_visible=[{preferred_memory_id}] ");
2357    }
2358
2359    let member_summary = group
2360        .members
2361        .iter()
2362        .map(|member| {
2363            format!(
2364                "[{} {:?}] {}",
2365                member.memory_id, member.status, member.content
2366            )
2367        })
2368        .collect::<Vec<_>>()
2369        .join(" | ");
2370    line.push_str(&member_summary);
2371
2372    if group.omitted_member_count > 0 {
2373        let _ = write!(
2374            line,
2375            " | {} contradictory claim(s) omitted from this result set",
2376            group.omitted_member_count
2377        );
2378    }
2379
2380    line
2381}
2382
2383// ── Step 4: Budget allocation (F-43: tiered) ──────────────────────────
2384
2385/// F-43: Tiered budget allocation per CONCEPT §10.2.
2386///
2387/// Budget tiers (after mandatory WM + contradiction reserves):
2388/// 1. **Direct results** (50%): semantic + episodic + procedural
2389/// 2. **Graph-connected neighbors** (25%): memories linked by graph edges
2390/// 3. **Causally-upstream** (15%): memories reachable via CAUSED_BY / LED_TO
2391/// 4. **Filler / expansion** (10%): redistributed to direct if unused
2392fn allocate_budget(
2393    config: &ContextConfig,
2394    working_entries: &[WorkingMemoryEntry],
2395    contradictions: &[ConflictGroup],
2396    classified: &[Candidate],
2397    tokenizer: &dyn Tokenizer,
2398) -> BudgetAllocation {
2399    let total = config
2400        .token_budget
2401        .saturating_sub(estimate_context_format_overhead(
2402            config,
2403            working_entries,
2404            contradictions,
2405            classified,
2406            tokenizer,
2407        ));
2408
2409    // Working memory: mandatory reserve.
2410    let wm_needed: usize = working_entries
2411        .iter()
2412        .map(|w| tokenizer.count_tokens(&w.content) + 5)
2413        .sum();
2414    let wm_reserve = (total as f32 * config.working_memory_reserve) as usize;
2415    let wm_budget = if wm_needed == 0 { 0 } else { wm_reserve.max(1) };
2416
2417    // Contradiction overhead.
2418    let contra_budget = if contradictions.is_empty() {
2419        0
2420    } else {
2421        let actual: usize = contradictions
2422            .iter()
2423            .map(|group| tokenizer.count_tokens(&format_conflict_group_line(group)) + 2)
2424            .sum();
2425        actual.min(total / 4)
2426    };
2427
2428    // Remaining budget after WM + contradictions.
2429    let remaining = total.saturating_sub(wm_budget + contra_budget);
2430
2431    // F-43: Tiered allocation of the remaining budget.
2432    // Tier 1: Direct results (50%) — split among semantic/episodic/procedural
2433    // Tier 2: Graph-connected (25%) — only if graph context is enabled.
2434    // Tier 3: Causal-upstream (15%) — only if causal chains are enabled.
2435    // Tier 4: Filler (10%) — redistributed to Tier 1
2436    let graph_fraction = if config.features.include_graph_context() {
2437        0.25
2438    } else {
2439        0.0
2440    };
2441    let causal_fraction = if config.features.include_causal_chains() {
2442        0.15
2443    } else {
2444        0.0
2445    };
2446    let filler_fraction = 0.10_f32;
2447    let direct_fraction = 1.0 - graph_fraction - causal_fraction - filler_fraction;
2448
2449    let direct_budget = (remaining as f32 * (direct_fraction + filler_fraction)) as usize;
2450    let graph_budget = (remaining as f32 * graph_fraction) as usize;
2451    let causal_budget = (remaining as f32 * causal_fraction) as usize;
2452
2453    // Split direct budget among semantic, episodic, and procedural.
2454    let has_semantic = classified.iter().any(|c| c.layer == Layer::Semantic);
2455    let has_episodic = classified.iter().any(|c| c.layer == Layer::Episodic);
2456    let has_procedural = classified.iter().any(|c| c.layer == Layer::Procedural);
2457
2458    let active_layers = has_semantic as usize + has_episodic as usize + has_procedural as usize;
2459    let (sem_budget, ep_budget, proc_budget) = if active_layers == 0 {
2460        (0, 0, 0)
2461    } else {
2462        // Weighted split: semantic gets `semantic_weight`, episodic and procedural share the rest.
2463        let sw = config.semantic_weight;
2464        let sem = if has_semantic {
2465            (direct_budget as f32 * sw) as usize
2466        } else {
2467            0
2468        };
2469        let rest = direct_budget.saturating_sub(sem);
2470        let ep = if has_episodic && has_procedural {
2471            // 70/30 split between episodic and procedural
2472            (rest as f32 * 0.7) as usize
2473        } else if has_episodic {
2474            rest
2475        } else {
2476            0
2477        };
2478        let proc = rest.saturating_sub(ep);
2479        (sem, ep, proc)
2480    };
2481
2482    BudgetAllocation {
2483        working_memory: wm_budget,
2484        contradictions: contra_budget,
2485        semantic: sem_budget,
2486        episodic: ep_budget,
2487        procedural: proc_budget,
2488        graph_connected: graph_budget,
2489        causal_upstream: causal_budget,
2490    }
2491}
2492
2493fn estimate_context_format_overhead(
2494    config: &ContextConfig,
2495    working_entries: &[WorkingMemoryEntry],
2496    contradictions: &[ConflictGroup],
2497    classified: &[Candidate],
2498    tokenizer: &dyn Tokenizer,
2499) -> usize {
2500    let include_working = !working_entries.is_empty();
2501    let include_conflicts = !contradictions.is_empty();
2502    let include_semantic = classified.iter().any(|c| c.layer == Layer::Semantic);
2503    let include_episodic = classified.iter().any(|c| c.layer == Layer::Episodic);
2504    let include_procedural = classified.iter().any(|c| c.layer == Layer::Procedural);
2505    let include_graph = config.features.include_graph_context() && !classified.is_empty();
2506    let include_causal = config.features.include_causal_chains() && !classified.is_empty();
2507
2508    let placeholder_entry = ContextEntry {
2509        id: MemoryId::new(),
2510        content: "x".to_string(),
2511        token_cost: 0,
2512        resource_evidence: Vec::new(),
2513        resource_preview_packages: Vec::new(),
2514        resource_score_attribution: Vec::new(),
2515    };
2516    let placeholder_line = String::from("x");
2517
2518    let working = if include_working {
2519        vec![placeholder_entry.clone()]
2520    } else {
2521        vec![]
2522    };
2523    let conflicts = if include_conflicts {
2524        vec![placeholder_line.clone()]
2525    } else {
2526        vec![]
2527    };
2528    let semantic = if include_semantic {
2529        vec![placeholder_entry.clone()]
2530    } else {
2531        vec![]
2532    };
2533    let episodic = if include_episodic {
2534        vec![placeholder_entry.clone()]
2535    } else {
2536        vec![]
2537    };
2538    let procedural = if include_procedural {
2539        vec![placeholder_entry]
2540    } else {
2541        vec![]
2542    };
2543    let graph = if include_graph {
2544        vec![placeholder_line.clone()]
2545    } else {
2546        vec![]
2547    };
2548    let causal = if include_causal {
2549        vec![placeholder_line]
2550    } else {
2551        vec![]
2552    };
2553
2554    let placeholder_tokens = [
2555        include_working,
2556        include_conflicts,
2557        include_semantic,
2558        include_episodic,
2559        include_procedural,
2560        include_graph,
2561        include_causal,
2562    ]
2563    .into_iter()
2564    .filter(|included| *included)
2565    .count()
2566        * tokenizer.count_tokens("x");
2567
2568    let formatted = format_context(
2569        config.output_format,
2570        &working,
2571        &conflicts,
2572        &semantic,
2573        &episodic,
2574        &procedural,
2575        &graph,
2576        &causal,
2577    );
2578
2579    tokenizer
2580        .count_tokens(&formatted)
2581        .saturating_sub(placeholder_tokens)
2582}
2583
2584// ── Step 5: Build sections ─────────────────────────────────────────────
2585
2586fn build_working_memory_section(
2587    entries: &[WorkingMemoryEntry],
2588    budget_tokens: usize,
2589    tokenizer: &dyn Tokenizer,
2590) -> (Vec<ContextEntry>, usize) {
2591    let mut lines = Vec::new();
2592    let mut used = 0;
2593
2594    for entry in entries {
2595        let line = format!("• {}", entry.content);
2596        let tokens = tokenizer.count_tokens(&line);
2597        if used + tokens > budget_tokens {
2598            // Truncate last entry if partially fits.
2599            let remaining = budget_tokens.saturating_sub(used);
2600            if remaining > 5 {
2601                let truncated = truncate_to_budget(&line, remaining, tokenizer);
2602                used += tokenizer.count_tokens(&truncated);
2603                let content = truncated
2604                    .strip_prefix("• ")
2605                    .unwrap_or(truncated.as_str())
2606                    .to_string();
2607                let content_tokens = tokenizer.count_tokens(&content);
2608                lines.push(ContextEntry {
2609                    id: entry.id,
2610                    content,
2611                    token_cost: content_tokens,
2612                    resource_evidence: Vec::new(),
2613                    resource_preview_packages: Vec::new(),
2614                    resource_score_attribution: Vec::new(),
2615                });
2616            }
2617            break;
2618        }
2619        used += tokens;
2620        let content_tokens = tokenizer.count_tokens(&entry.content);
2621        lines.push(ContextEntry {
2622            id: entry.id,
2623            content: entry.content.clone(),
2624            token_cost: content_tokens,
2625            resource_evidence: Vec::new(),
2626            resource_preview_packages: Vec::new(),
2627            resource_score_attribution: Vec::new(),
2628        });
2629    }
2630
2631    (lines, used)
2632}
2633
2634fn build_contradiction_section(
2635    conflicts: &[ConflictGroup],
2636    budget_tokens: usize,
2637    tokenizer: &dyn Tokenizer,
2638) -> (Vec<String>, usize) {
2639    let mut lines = Vec::new();
2640    let mut used = 0;
2641
2642    for conflict in conflicts {
2643        let line = format_conflict_group_line(conflict);
2644        let tokens = tokenizer.count_tokens(&line);
2645        if used + tokens > budget_tokens {
2646            break;
2647        }
2648        used += tokens;
2649        lines.push(line);
2650    }
2651
2652    (lines, used)
2653}
2654
2655fn build_direct_sections(
2656    sorted: &SortedDirectCandidates<'_>,
2657    allocation: &BudgetAllocation,
2658    config: &ContextConfig,
2659    tokenizer: &dyn Tokenizer,
2660) -> (Vec<ContextEntry>, Vec<ContextEntry>, Vec<ContextEntry>) {
2661    let (semantic_section, _sem_tokens) = build_layer_section_from_sorted(
2662        &sorted.semantic,
2663        allocation.semantic,
2664        config.compression_threshold,
2665        None,
2666        tokenizer,
2667    );
2668
2669    let (episodic_section, _ep_tokens) = build_layer_section_from_sorted(
2670        &sorted.episodic,
2671        allocation.episodic,
2672        config.compression_threshold,
2673        Some(config.max_episodic_entries),
2674        tokenizer,
2675    );
2676
2677    let (procedural_section, _proc_tokens) = build_layer_section_from_sorted(
2678        &sorted.procedural,
2679        allocation.procedural,
2680        config.compression_threshold,
2681        None,
2682        tokenizer,
2683    );
2684
2685    (semantic_section, episodic_section, procedural_section)
2686}
2687
2688fn collect_direct_section_ids(
2689    semantic: &[ContextEntry],
2690    episodic: &[ContextEntry],
2691    procedural: &[ContextEntry],
2692) -> HashSet<MemoryId> {
2693    semantic
2694        .iter()
2695        .chain(episodic.iter())
2696        .chain(procedural.iter())
2697        .map(|entry| entry.id)
2698        .collect()
2699}
2700
2701/// Build a section for a specific layer, applying progressive compression.
2702/// Returns (entries as (id, text) pairs, total tokens used).
2703const MAX_CONTEXT_EVIDENCE_ITEMS: usize = 2;
2704
2705struct SortedDirectCandidates<'a> {
2706    semantic: Vec<&'a Candidate>,
2707    episodic: Vec<&'a Candidate>,
2708    procedural: Vec<&'a Candidate>,
2709}
2710
2711fn sort_candidates_by_weighted_score(candidates: &mut Vec<&Candidate>) {
2712    candidates.sort_by(|a, b| {
2713        let weighted_a = a.score * a.trust_score;
2714        let weighted_b = b.score * b.trust_score;
2715        weighted_b
2716            .partial_cmp(&weighted_a)
2717            .unwrap_or(std::cmp::Ordering::Equal)
2718    });
2719}
2720
2721fn prepare_sorted_direct_candidates(classified: &[Candidate]) -> SortedDirectCandidates<'_> {
2722    let mut semantic = classified
2723        .iter()
2724        .filter(|candidate| candidate.layer == Layer::Semantic)
2725        .collect::<Vec<_>>();
2726    sort_candidates_by_weighted_score(&mut semantic);
2727
2728    let mut episodic = classified
2729        .iter()
2730        .filter(|candidate| candidate.layer == Layer::Episodic)
2731        .collect::<Vec<_>>();
2732    sort_candidates_by_weighted_score(&mut episodic);
2733
2734    let mut procedural = classified
2735        .iter()
2736        .filter(|candidate| candidate.layer == Layer::Procedural)
2737        .collect::<Vec<_>>();
2738    sort_candidates_by_weighted_score(&mut procedural);
2739
2740    SortedDirectCandidates {
2741        semantic,
2742        episodic,
2743        procedural,
2744    }
2745}
2746
2747fn access_mode_label(summary: &ResourceEvidenceSummary) -> &'static str {
2748    if summary.can_hydrate_full {
2749        "full"
2750    } else if summary.can_hydrate_preview {
2751        "preview"
2752    } else {
2753        "metadata"
2754    }
2755}
2756
2757fn evidence_identity(summary: &ResourceEvidenceSummary) -> String {
2758    summary
2759        .display_name
2760        .clone()
2761        .or_else(|| summary.mime_type.clone())
2762        .unwrap_or_else(|| summary.resource_id.to_string())
2763}
2764
2765fn summarize_resource_evidence(summary: &ResourceEvidenceSummary) -> String {
2766    let mut qualifiers = Vec::new();
2767    qualifiers.push(summary.provenance.as_str().to_string());
2768    if let Some(modality) = summary.modality {
2769        qualifiers.push(modality.as_str().to_string());
2770    }
2771    if let Some(artifact_kind) = summary.artifact_kind {
2772        qualifiers.push(format!("artifact={}", artifact_kind.as_str()));
2773    }
2774    if summary.lifecycle_state != ResourceGovernanceState::Active {
2775        qualifiers.push(summary.lifecycle_state.as_str().to_string());
2776    }
2777    qualifiers.push(access_mode_label(summary).to_string());
2778    if summary.has_preview {
2779        qualifiers.push("preview".to_string());
2780    }
2781    if !summary.available_artifacts.is_empty() {
2782        let artifacts = summary
2783            .available_artifacts
2784            .iter()
2785            .map(|kind| kind.as_str())
2786            .collect::<Vec<_>>()
2787            .join("|");
2788        qualifiers.push(format!("artifacts={artifacts}"));
2789    }
2790
2791    format!(
2792        "{} {} [{}]",
2793        summary.role.as_str(),
2794        evidence_identity(summary),
2795        qualifiers.join(", ")
2796    )
2797}
2798
2799fn evidence_suffix(resource_evidence: &[ResourceEvidenceSummary]) -> String {
2800    if resource_evidence.is_empty() {
2801        return String::new();
2802    }
2803
2804    let mut parts = resource_evidence
2805        .iter()
2806        .take(MAX_CONTEXT_EVIDENCE_ITEMS)
2807        .map(summarize_resource_evidence)
2808        .collect::<Vec<_>>();
2809    if resource_evidence.len() > MAX_CONTEXT_EVIDENCE_ITEMS {
2810        parts.push(format!(
2811            "+{} more",
2812            resource_evidence.len() - MAX_CONTEXT_EVIDENCE_ITEMS
2813        ));
2814    }
2815
2816    format!(" Evidence: {}.", parts.join("; "))
2817}
2818
2819async fn hydrate_selected_resource_previews(
2820    db: &HirnDB,
2821    actor_id: &AgentId,
2822    sections: &mut ContextSections,
2823    config: &ContextConfig,
2824) -> HirnResult<()> {
2825    if !should_package_resource_previews(config) {
2826        return Ok(());
2827    }
2828
2829    let mut preview_cache = PreviewPackageCache::default();
2830    hydrate_resource_preview_packages_for_entries(
2831        db,
2832        actor_id,
2833        &mut sections.semantic,
2834        config,
2835        &mut preview_cache,
2836    )
2837    .await?;
2838    hydrate_resource_preview_packages_for_entries(
2839        db,
2840        actor_id,
2841        &mut sections.episodic,
2842        config,
2843        &mut preview_cache,
2844    )
2845    .await?;
2846    hydrate_resource_preview_packages_for_entries(
2847        db,
2848        actor_id,
2849        &mut sections.procedural,
2850        config,
2851        &mut preview_cache,
2852    )
2853    .await?;
2854
2855    Ok(())
2856}
2857
2858fn should_package_resource_previews(config: &ContextConfig) -> bool {
2859    config.output_format == ContextFormat::Json
2860        && config.features.package_resource_previews()
2861        && config.max_resource_previews_per_entry > 0
2862        && config.max_resource_preview_chars > 0
2863}
2864
2865async fn hydrate_resource_preview_packages_for_entries(
2866    db: &HirnDB,
2867    actor_id: &AgentId,
2868    entries: &mut [ContextEntry],
2869    config: &ContextConfig,
2870    preview_cache: &mut PreviewPackageCache,
2871) -> HirnResult<()> {
2872    for entry in entries {
2873        entry.resource_preview_packages = package_resource_preview_packages_for_evidence(
2874            db,
2875            actor_id,
2876            &entry.resource_evidence,
2877            &entry.resource_preview_packages,
2878            config.max_resource_previews_per_entry,
2879            config.max_resource_preview_chars,
2880            preview_cache,
2881            PreviewPackageSurface::Think,
2882        )
2883        .await;
2884    }
2885
2886    Ok(())
2887}
2888
2889fn compose_candidate_text(candidate: &Candidate, compression: CompressionLevel) -> String {
2890    let raw_text = match compression {
2891        CompressionLevel::Full => candidate.full_content.clone(),
2892        CompressionLevel::Summary => {
2893            if candidate.summary.is_empty() {
2894                candidate.full_content.clone()
2895            } else {
2896                candidate.summary.clone()
2897            }
2898        }
2899        CompressionLevel::EntityOnly => {
2900            if candidate.entities.is_empty() {
2901                format!("(record {}, score: {:.2})", candidate.id, candidate.score)
2902            } else {
2903                format!(
2904                    "Re: {} (score: {:.2})",
2905                    candidate.entities.join(", "),
2906                    candidate.score
2907                )
2908            }
2909        }
2910    };
2911
2912    let mut text = if candidate.trust_score < 0.5 {
2913        format!("[low trust: {:.2}] {}", candidate.trust_score, raw_text)
2914    } else {
2915        raw_text
2916    };
2917    text.push_str(&evidence_suffix(&candidate.resource_evidence));
2918    text
2919}
2920
2921#[cfg(test)]
2922fn build_layer_section(
2923    classified: &[Candidate],
2924    layer: Layer,
2925    budget_tokens: usize,
2926    compression_threshold: f32,
2927    max_entries: Option<usize>,
2928    tokenizer: &dyn Tokenizer,
2929) -> (Vec<ContextEntry>, usize) {
2930    let mut layer_candidates: Vec<&Candidate> =
2931        classified.iter().filter(|c| c.layer == layer).collect();
2932
2933    sort_candidates_by_weighted_score(&mut layer_candidates);
2934
2935    build_layer_section_from_sorted(
2936        &layer_candidates,
2937        budget_tokens,
2938        compression_threshold,
2939        max_entries,
2940        tokenizer,
2941    )
2942}
2943
2944fn build_layer_section_from_sorted(
2945    layer_candidates: &[&Candidate],
2946    budget_tokens: usize,
2947    compression_threshold: f32,
2948    max_entries: Option<usize>,
2949    tokenizer: &dyn Tokenizer,
2950) -> (Vec<ContextEntry>, usize) {
2951    let mut entries = Vec::new();
2952    let mut used = 0;
2953
2954    for candidate in layer_candidates {
2955        if max_entries.is_some_and(|limit| entries.len() >= limit) {
2956            break;
2957        }
2958
2959        let preferred = determine_compression(candidate, compression_threshold);
2960        let selected = match preferred {
2961            CompressionLevel::Full => select_candidate_render(
2962                candidate,
2963                &[
2964                    CompressionLevel::Full,
2965                    CompressionLevel::Summary,
2966                    CompressionLevel::EntityOnly,
2967                ],
2968                used,
2969                budget_tokens,
2970                tokenizer,
2971            ),
2972            CompressionLevel::Summary => select_candidate_render(
2973                candidate,
2974                &[CompressionLevel::Summary, CompressionLevel::EntityOnly],
2975                used,
2976                budget_tokens,
2977                tokenizer,
2978            ),
2979            CompressionLevel::EntityOnly => select_candidate_render(
2980                candidate,
2981                &[CompressionLevel::EntityOnly],
2982                used,
2983                budget_tokens,
2984                tokenizer,
2985            ),
2986        };
2987
2988        let Some((content, tokens)) = selected else {
2989            break;
2990        };
2991
2992        used += tokens;
2993        entries.push(ContextEntry {
2994            id: candidate.id,
2995            content,
2996            token_cost: tokens,
2997            resource_evidence: candidate.resource_evidence.clone(),
2998            resource_preview_packages: candidate.resource_preview_packages.clone(),
2999            resource_score_attribution: candidate.resource_score_attribution.clone(),
3000        });
3001    }
3002
3003    (entries, used)
3004}
3005
3006fn select_candidate_render(
3007    candidate: &Candidate,
3008    levels: &[CompressionLevel],
3009    used_tokens: usize,
3010    budget_tokens: usize,
3011    tokenizer: &dyn Tokenizer,
3012) -> Option<(String, usize)> {
3013    for &level in levels {
3014        // Use the pre-computed token cost when available (non-zero).  A zero
3015        // cost means finalize_candidate_render_tokens has not run yet (e.g.
3016        // in isolated tests), so we fall back to live counting in that branch.
3017        let precomputed = match level {
3018            CompressionLevel::Full => candidate.tokens_full,
3019            CompressionLevel::Summary => candidate.tokens_summary,
3020            CompressionLevel::EntityOnly => candidate.tokens_entity,
3021        };
3022        if precomputed > 0 {
3023            if used_tokens + precomputed <= budget_tokens {
3024                return Some((compose_candidate_text(candidate, level), precomputed));
3025            }
3026            // Pre-computed cost confirms this level doesn't fit; skip text alloc.
3027            continue;
3028        }
3029        // Fallback: compute text and measure on the fly.
3030        let text = compose_candidate_text(candidate, level);
3031        let tokens = tokenizer.count_tokens(&text);
3032        if used_tokens + tokens <= budget_tokens {
3033            return Some((text, tokens));
3034        }
3035    }
3036
3037    None
3038}
3039
3040/// Build graph-connected and causal-upstream sections in a **single** async
3041/// pass, sharing one batch hydration call for all neighbour IDs.
3042///
3043/// Collecting IDs from the hot graph is synchronous (zero I/O).  Hydrating
3044/// record content requires Lance reads; combining both sections into one
3045/// `hydrate_context_contents_batch` call reduces the number of Lance
3046/// round-trips from 2–4 sequential per-section chunks to a single batched
3047/// request, cutting the wall-clock contribution of this step roughly in half.
3048async fn build_graph_and_causal_sections(
3049    db: &HirnDB,
3050    candidates: &[ScoredMemory],
3051    content_pool: &[ScoredMemory],
3052    graph_budget: usize,
3053    causal_budget: usize,
3054    tokenizer: &dyn Tokenizer,
3055) -> (Vec<String>, Vec<String>) {
3056    let candidate_ids: HashSet<MemoryId> = candidates.iter().map(|c| c.record.id()).collect();
3057
3058    // ── Collect all needed IDs from the hot graph (sync, no I/O) ────────
3059    let (neighbor_ids, causal_ids) = {
3060        let graph = db.cached_graph().hot_graph();
3061
3062        let mut neighbor_ids: Vec<(MemoryId, String)> = Vec::new();
3063        if graph_budget > 0 {
3064            let mut seen: HashSet<MemoryId> = HashSet::new();
3065            for sm in candidates {
3066                for edge in graph.get_edges(sm.record.id()) {
3067                    let neighbor = if edge.source == sm.record.id() {
3068                        edge.target
3069                    } else {
3070                        edge.source
3071                    };
3072                    if !candidate_ids.contains(&neighbor) && seen.insert(neighbor) {
3073                        let rel_label = format!("{:?}", edge.relation);
3074                        neighbor_ids.push((neighbor, rel_label));
3075                    }
3076                }
3077            }
3078        }
3079
3080        let mut causal_ids: Vec<MemoryId> = Vec::new();
3081        if causal_budget > 0 {
3082            let mut visited = candidate_ids.clone();
3083            let mut frontier: Vec<MemoryId> = candidates.iter().map(|c| c.record.id()).collect();
3084            for _depth in 0..3 {
3085                let mut next_frontier = Vec::new();
3086                for id in &frontier {
3087                    for edge in graph.get_edges(*id) {
3088                        if edge.relation != EdgeRelation::CausedBy
3089                            && edge.relation != EdgeRelation::Causes
3090                        {
3091                            continue;
3092                        }
3093                        let upstream = if edge.source == *id {
3094                            edge.target
3095                        } else {
3096                            edge.source
3097                        };
3098                        if visited.insert(upstream) {
3099                            causal_ids.push(upstream);
3100                            next_frontier.push(upstream);
3101                        }
3102                    }
3103                }
3104                frontier = next_frontier;
3105                if frontier.is_empty() {
3106                    break;
3107                }
3108            }
3109        }
3110
3111        (neighbor_ids, causal_ids)
3112    };
3113
3114    // ── Single batch hydration for ALL IDs across both sections ─────────
3115    let all_ids: Vec<MemoryId> = {
3116        let mut seen = HashSet::new();
3117        neighbor_ids
3118            .iter()
3119            .map(|(id, _)| *id)
3120            .chain(causal_ids.iter().copied())
3121            .filter(|id| seen.insert(*id))
3122            .collect()
3123    };
3124
3125    if all_ids.is_empty() {
3126        return (Vec::new(), Vec::new());
3127    }
3128
3129    // ── Build preliminary cache from content_pool (zero extra I/O) ──────
3130    // Any neighbor that was already loaded in the initial DataFusion scan is
3131    // served from this borrow-only map, avoiding a second Lance round-trip.
3132    let pool_cache: HashMap<MemoryId, &str> = content_pool
3133        .iter()
3134        .map(|sm| (sm.record.id(), extract_content_str(&sm.record)))
3135        .collect();
3136
3137    let hydrated = hydrate_context_contents_batch(db, all_ids, &pool_cache).await;
3138
3139    // ── Build graph section from cached results ──────────────────────────
3140    let mut graph_lines: Vec<String> = Vec::new();
3141    let mut graph_used = 0usize;
3142    if graph_budget > 0 {
3143        for (nid, rel) in &neighbor_ids {
3144            let content = hydrated
3145                .get(nid)
3146                .cloned()
3147                .unwrap_or_else(|| format!("(record {})", nid));
3148            let line = format!("[via {}] {}", rel, content);
3149            let tokens = tokenizer.count_tokens(&line);
3150            if graph_used + tokens > graph_budget {
3151                break;
3152            }
3153            graph_used += tokens;
3154            graph_lines.push(line);
3155        }
3156    }
3157
3158    // ── Build causal section from cached results ─────────────────────────
3159    let mut causal_lines: Vec<String> = Vec::new();
3160    let mut causal_used = 0usize;
3161    if causal_budget > 0 {
3162        for cid in &causal_ids {
3163            let content = hydrated
3164                .get(cid)
3165                .cloned()
3166                .unwrap_or_else(|| format!("(record {})", cid));
3167            let line = format!("[causal] {}", content);
3168            let tokens = tokenizer.count_tokens(&line);
3169            if causal_used + tokens > causal_budget {
3170                break;
3171            }
3172            causal_used += tokens;
3173            causal_lines.push(line);
3174        }
3175    }
3176
3177    (graph_lines, causal_lines)
3178}
3179
3180async fn hydrate_context_contents_batch(
3181    db: &HirnDB,
3182    ids: impl IntoIterator<Item = MemoryId>,
3183    preliminary_cache: &HashMap<MemoryId, &str>,
3184) -> HashMap<MemoryId, String> {
3185    let unique_ids: Vec<MemoryId> = ids
3186        .into_iter()
3187        .collect::<HashSet<_>>()
3188        .into_iter()
3189        .collect();
3190    if unique_ids.is_empty() {
3191        return HashMap::new();
3192    }
3193
3194    // Serve any IDs already present in the preliminary cache (zero Lance I/O).
3195    let mut contents = HashMap::with_capacity(unique_ids.len());
3196    let mut cache_miss_ids: Vec<MemoryId> = Vec::new();
3197    for id in &unique_ids {
3198        if let Some(&content) = preliminary_cache.get(id) {
3199            contents.insert(*id, content.to_string());
3200        } else {
3201            cache_miss_ids.push(*id);
3202        }
3203    }
3204
3205    if cache_miss_ids.is_empty() {
3206        return contents;
3207    }
3208
3209    // Single cross-layer batch fetch. Eliminates the previous two-phase
3210    // (episodic-first then non-episodic) serial approach — one fewer Lance
3211    // round-trip on the hot path. `get_memories_batch` scans all relevant
3212    // datasets in a single DataFusion plan.
3213    let records = db
3214        .get_memories_batch(&cache_miss_ids)
3215        .await
3216        .unwrap_or_default();
3217    for (id, record) in records {
3218        contents.insert(id, extract_content_str(&record).to_string());
3219    }
3220
3221    contents
3222}
3223
3224fn determine_compression(candidate: &Candidate, threshold: f32) -> CompressionLevel {
3225    if candidate.score >= threshold {
3226        CompressionLevel::Full
3227    } else if candidate.score >= threshold * 0.5 {
3228        CompressionLevel::Summary
3229    } else {
3230        CompressionLevel::EntityOnly
3231    }
3232}
3233
3234// ── Step 6: Score distribution ─────────────────────────────────────────
3235
3236fn compute_score_distribution(
3237    candidates: &[ScoredMemory],
3238    included_ids: &[MemoryId],
3239) -> ScoreDistribution {
3240    let included_set: HashSet<MemoryId> = included_ids.iter().copied().collect();
3241    let scores: Vec<f32> = candidates
3242        .iter()
3243        .filter(|c| included_set.contains(&c.record.id()))
3244        .map(|c| c.score)
3245        .collect();
3246
3247    if scores.is_empty() {
3248        return ScoreDistribution::default();
3249    }
3250
3251    let min = scores.iter().copied().fold(f32::INFINITY, f32::min);
3252    let max = scores.iter().copied().fold(f32::NEG_INFINITY, f32::max);
3253    let mean = scores.iter().sum::<f32>() / scores.len() as f32;
3254
3255    ScoreDistribution { min, max, mean }
3256}
3257
3258// ── Step 8: Format output ──────────────────────────────────────────────
3259
3260/// Per-entry token overhead added by the formatter beyond the raw content
3261/// token cost stored in `ContextEntry::token_cost`.  Used only for the
3262/// greedy trim estimate; the exact binary search corrects any discrepancy.
3263fn per_entry_format_overhead(format: ContextFormat) -> usize {
3264    match format {
3265        // "• " prefix (1 token) + trailing newline token (1 token)
3266        ContextFormat::Structured => 2,
3267        // Separators like ". Also, " or ". Then, " average ~2 tokens
3268        ContextFormat::Narrative => 2,
3269        // JSON object wrapper: `{"id":"…","content":"…"}` adds ~5 tokens
3270        ContextFormat::Json => 5,
3271    }
3272}
3273
3274fn fit_context_to_budget(
3275    format: ContextFormat,
3276    sections: &mut ContextSections,
3277    token_budget: usize,
3278    tokenizer: &dyn Tokenizer,
3279) -> String {
3280    let context = format_context_for_lengths(format, sections, sections.section_lengths());
3281    let tokens = tokenizer.count_tokens(&context);
3282
3283    if tokens <= token_budget {
3284        return context;
3285    }
3286
3287    let max_trims = sections.trimmable_count();
3288    if max_trims == 0 {
3289        return match format {
3290            ContextFormat::Json => minimal_json_context(token_budget, tokenizer),
3291            ContextFormat::Structured | ContextFormat::Narrative => String::new(),
3292        };
3293    }
3294
3295    // ── Prefix-sum phase ────────────────────────────────────────────────
3296    // Pre-compute the exact formatted cost of every trimmable entry (O(N)
3297    // small-string tokeniser calls) and find the minimum trim count K via a
3298    // linear scan over the cumulative savings.
3299    //
3300    // For `Structured` format the per-entry cost is the exact rendered bullet
3301    // line, so the prefix sum is accurate and K is found on the first attempt.
3302    // For `Narrative`/`Json` a fixed overhead constant is used, which may
3303    // under-estimate (especially for JSON where serde_json pretty-print adds
3304    // ~50 tokens of field names and indentation per entry vs the 5-token
3305    // constant).  When the prefix-sum exhausts all entries without covering
3306    // the deficit (k == max_trims), the estimate is unreliable and we skip
3307    // directly to binary search.
3308    let entry_costs = sections.compute_formatted_entry_costs(format, tokenizer);
3309    let deficit = tokens.saturating_sub(token_budget);
3310    let mut cum_cost = 0usize;
3311    let mut k = 0usize;
3312    let mut prefix_sum_covered = false;
3313    for &cost in &entry_costs {
3314        cum_cost += cost;
3315        k += 1;
3316        if cum_cost >= deficit {
3317            prefix_sum_covered = true;
3318            break;
3319        }
3320    }
3321    let k = k.min(max_trims);
3322
3323    // ── Single-shot format at estimated K ───────────────────────────────
3324    // Only attempt the single-shot when the prefix-sum actually covered the
3325    // deficit (prefix_sum_covered == true).  When k == max_trims with
3326    // prefix_sum_covered == false the overhead was grossly underestimated;
3327    // trimming everything trivially fits but produces an empty context — skip
3328    // straight to binary search which finds the true minimum trim count.
3329    if prefix_sum_covered {
3330        let keep = sections.keep_lengths_after_trim(k);
3331        let trial = format_context_for_lengths(format, sections, keep);
3332        let trial_tokens = tokenizer.count_tokens(&trial);
3333        if trial_tokens <= token_budget {
3334            sections.truncate_to_lengths(keep);
3335            return trial;
3336        }
3337    }
3338
3339    // ── Binary-search fallback ───────────────────────────────────────────
3340    // Covers two cases:
3341    // 1. prefix_sum_covered && single-shot failed  → search [k+1, max_trims]
3342    //    (small residual from section-header token changes on boundary trims)
3343    // 2. !prefix_sum_covered (k == max_trims)       → search [1, max_trims]
3344    //    (large overhead underestimation, e.g. JSON format with many entries)
3345    // Binary search guarantees O(log max_trims) full-context renders.
3346    let mut best_fit: Option<(ContextSectionLengths, String)> = None;
3347    let mut low = if prefix_sum_covered { k + 1 } else { 1 };
3348    let mut high = max_trims;
3349
3350    while low <= high {
3351        let mid = low + (high - low) / 2;
3352        let keep = sections.keep_lengths_after_trim(mid);
3353        let trial = format_context_for_lengths(format, sections, keep);
3354        let trial_tokens = tokenizer.count_tokens(&trial);
3355
3356        if trial_tokens <= token_budget {
3357            best_fit = Some((keep, trial));
3358            if mid == low {
3359                break;
3360            }
3361            high = mid - 1;
3362        } else {
3363            low = mid + 1;
3364        }
3365    }
3366
3367    if let Some((best_lengths, best_context)) = best_fit {
3368        sections.truncate_to_lengths(best_lengths);
3369        return best_context;
3370    }
3371
3372    // Fully-trimmed fallback (only reachable when every entry alone exceeds
3373    // the token budget — essentially empty output).
3374    let fully_trimmed = sections.keep_lengths_after_trim(max_trims);
3375    sections.truncate_to_lengths(fully_trimmed);
3376
3377    match format {
3378        ContextFormat::Json => minimal_json_context(token_budget, tokenizer),
3379        ContextFormat::Structured | ContextFormat::Narrative => String::new(),
3380    }
3381}
3382
3383fn format_context_for_lengths(
3384    format: ContextFormat,
3385    sections: &ContextSections,
3386    lengths: ContextSectionLengths,
3387) -> String {
3388    format_context(
3389        format,
3390        &sections.working_memory[..lengths.working_memory],
3391        &sections.contradictions[..lengths.contradictions],
3392        &sections.semantic[..lengths.semantic],
3393        &sections.episodic[..lengths.episodic],
3394        &sections.procedural[..lengths.procedural],
3395        &sections.graph_connected[..lengths.graph_connected],
3396        &sections.causal_upstream[..lengths.causal_upstream],
3397    )
3398}
3399
3400fn minimal_json_context(token_budget: usize, tokenizer: &dyn Tokenizer) -> String {
3401    const FULL_SCHEMA: &str = concat!(
3402        "{",
3403        "\"working_memory\":[],",
3404        "\"conflicts\":[],",
3405        "\"semantic\":[],",
3406        "\"episodic\":[],",
3407        "\"procedural\":[],",
3408        "\"graph_connected\":[],",
3409        "\"causal_upstream\":[]",
3410        "}"
3411    );
3412
3413    for candidate in [FULL_SCHEMA, r#"{"truncated":true}"#, "{}"] {
3414        if tokenizer.count_tokens(candidate) <= token_budget {
3415            return candidate.to_string();
3416        }
3417    }
3418
3419    "{}".to_string()
3420}
3421
3422fn format_context(
3423    format: ContextFormat,
3424    working_section: &[ContextEntry],
3425    contradiction_section: &[String],
3426    semantic_section: &[ContextEntry],
3427    episodic_section: &[ContextEntry],
3428    procedural_section: &[ContextEntry],
3429    graph_section: &[String],
3430    causal_section: &[String],
3431) -> String {
3432    match format {
3433        ContextFormat::Structured => format_structured(
3434            working_section,
3435            contradiction_section,
3436            semantic_section,
3437            episodic_section,
3438            procedural_section,
3439            graph_section,
3440            causal_section,
3441        ),
3442        ContextFormat::Narrative => format_narrative(
3443            working_section,
3444            contradiction_section,
3445            semantic_section,
3446            episodic_section,
3447            procedural_section,
3448            graph_section,
3449            causal_section,
3450        ),
3451        ContextFormat::Json => format_json(
3452            working_section,
3453            contradiction_section,
3454            semantic_section,
3455            episodic_section,
3456            procedural_section,
3457            graph_section,
3458            causal_section,
3459        ),
3460    }
3461}
3462
3463fn format_structured(
3464    working_section: &[ContextEntry],
3465    contradiction_section: &[String],
3466    semantic_section: &[ContextEntry],
3467    episodic_section: &[ContextEntry],
3468    procedural_section: &[ContextEntry],
3469    graph_section: &[String],
3470    causal_section: &[String],
3471) -> String {
3472    // Pre-size the buffer: header bytes (~30 per section) + per-entry content.
3473    // "• " prefix + content + "\n" = 3 extra bytes per entry.
3474    let capacity = working_section
3475        .iter()
3476        .map(|e| e.content.len() + 3)
3477        .sum::<usize>()
3478        + contradiction_section
3479            .iter()
3480            .map(|s| s.len() + 1)
3481            .sum::<usize>()
3482        + semantic_section
3483            .iter()
3484            .map(|e| e.content.len() + 3)
3485            .sum::<usize>()
3486        + episodic_section
3487            .iter()
3488            .map(|e| e.content.len() + 3)
3489            .sum::<usize>()
3490        + procedural_section
3491            .iter()
3492            .map(|e| e.content.len() + 3)
3493            .sum::<usize>()
3494        + graph_section.iter().map(|s| s.len() + 3).sum::<usize>()
3495        + causal_section.iter().map(|s| s.len() + 3).sum::<usize>()
3496        + 240; // section headers: 7 × ~30 bytes + padding
3497    let mut out = String::with_capacity(capacity);
3498
3499    if !working_section.is_empty() {
3500        out.push_str("## Working Memory\n");
3501        for entry in working_section {
3502            out.push_str("• ");
3503            out.push_str(&entry.content);
3504            out.push('\n');
3505        }
3506        out.push('\n');
3507    }
3508
3509    if !contradiction_section.is_empty() {
3510        out.push_str("## Conflicts\n");
3511        for line in contradiction_section {
3512            out.push_str(line);
3513            out.push('\n');
3514        }
3515        out.push('\n');
3516    }
3517
3518    if !semantic_section.is_empty() {
3519        out.push_str("## Semantic Knowledge\n");
3520        for entry in semantic_section {
3521            out.push_str("• ");
3522            out.push_str(&entry.content);
3523            out.push('\n');
3524        }
3525        out.push('\n');
3526    }
3527
3528    if !episodic_section.is_empty() {
3529        out.push_str("## Episodic Records\n");
3530        for entry in episodic_section {
3531            out.push_str("• ");
3532            out.push_str(&entry.content);
3533            out.push('\n');
3534        }
3535        out.push('\n');
3536    }
3537
3538    if !procedural_section.is_empty() {
3539        out.push_str("## Procedural Knowledge\n");
3540        for entry in procedural_section {
3541            out.push_str("• ");
3542            out.push_str(&entry.content);
3543            out.push('\n');
3544        }
3545        out.push('\n');
3546    }
3547
3548    if !graph_section.is_empty() {
3549        out.push_str("## Related (Graph-Connected)\n");
3550        for line in graph_section {
3551            out.push_str("• ");
3552            out.push_str(line);
3553            out.push('\n');
3554        }
3555        out.push('\n');
3556    }
3557
3558    if !causal_section.is_empty() {
3559        out.push_str("## Causal Context\n");
3560        for line in causal_section {
3561            out.push_str("• ");
3562            out.push_str(line);
3563            out.push('\n');
3564        }
3565        out.push('\n');
3566    }
3567
3568    // Truncate trailing whitespace in-place — avoids a second full-string allocation.
3569    let trimmed_len = out.trim_end().len();
3570    out.truncate(trimmed_len);
3571    out
3572}
3573
3574fn format_narrative(
3575    working_section: &[ContextEntry],
3576    contradiction_section: &[String],
3577    semantic_section: &[ContextEntry],
3578    episodic_section: &[ContextEntry],
3579    procedural_section: &[ContextEntry],
3580    graph_section: &[String],
3581    causal_section: &[String],
3582) -> String {
3583    // Narrative format has connective phrases (~30 bytes per section) + content.
3584    let capacity = working_section
3585        .iter()
3586        .map(|e| e.content.len() + 8)
3587        .sum::<usize>()
3588        + contradiction_section
3589            .iter()
3590            .map(|s| s.len() + 2)
3591            .sum::<usize>()
3592        + semantic_section
3593            .iter()
3594            .map(|e| e.content.len() + 4)
3595            .sum::<usize>()
3596        + episodic_section
3597            .iter()
3598            .map(|e| e.content.len() + 8)
3599            .sum::<usize>()
3600        + procedural_section
3601            .iter()
3602            .map(|e| e.content.len() + 8)
3603            .sum::<usize>()
3604        + graph_section.iter().map(|s| s.len() + 4).sum::<usize>()
3605        + causal_section.iter().map(|s| s.len() + 4).sum::<usize>()
3606        + 200; // per-section lead phrases
3607    let mut out = String::with_capacity(capacity);
3608
3609    if !working_section.is_empty() {
3610        out.push_str("Currently in focus: ");
3611        for (i, entry) in working_section.iter().enumerate() {
3612            if i > 0 {
3613                out.push_str(". Also, ");
3614            }
3615            out.push_str(&entry.content);
3616        }
3617        out.push_str(".\n\n");
3618    }
3619
3620    if !contradiction_section.is_empty() {
3621        out.push_str("Note: There are conflicting memories. ");
3622        for line in contradiction_section {
3623            out.push_str(line);
3624            out.push_str(". ");
3625        }
3626        out.push('\n');
3627        out.push('\n');
3628    }
3629
3630    if !semantic_section.is_empty() {
3631        out.push_str("Known facts: ");
3632        for (i, entry) in semantic_section.iter().enumerate() {
3633            if i > 0 {
3634                out.push_str(". ");
3635            }
3636            out.push_str(&entry.content);
3637        }
3638        out.push_str(".\n\n");
3639    }
3640
3641    if !episodic_section.is_empty() {
3642        out.push_str("From recent experience: ");
3643        for (i, entry) in episodic_section.iter().enumerate() {
3644            if i > 0 {
3645                out.push_str(". Then, ");
3646            }
3647            out.push_str(&entry.content);
3648        }
3649        out.push_str(".\n\n");
3650    }
3651
3652    if !procedural_section.is_empty() {
3653        out.push_str("Known procedures: ");
3654        for (i, entry) in procedural_section.iter().enumerate() {
3655            if i > 0 {
3656                out.push_str(". Also, ");
3657            }
3658            out.push_str(&entry.content);
3659        }
3660        out.push_str(".\n\n");
3661    }
3662
3663    if !graph_section.is_empty() {
3664        out.push_str("Related context: ");
3665        for (i, line) in graph_section.iter().enumerate() {
3666            if i > 0 {
3667                out.push_str(". ");
3668            }
3669            out.push_str(line);
3670        }
3671        out.push_str(".\n\n");
3672    }
3673
3674    if !causal_section.is_empty() {
3675        out.push_str("Causal background: ");
3676        for (i, line) in causal_section.iter().enumerate() {
3677            if i > 0 {
3678                out.push_str(". ");
3679            }
3680            out.push_str(line);
3681        }
3682        out.push('.');
3683    }
3684
3685    // Truncate trailing whitespace in-place — avoids a second full-string allocation.
3686    let trimmed_len = out.trim_end().len();
3687    out.truncate(trimmed_len);
3688    out
3689}
3690
3691// ── JSON serialisation helpers ──────────────────────────────────────────────
3692// Typed structs replace per-entry `serde_json::json!` macro calls, eliminating
3693// one BTreeMap allocation per section entry (O(N) savings over the old path).
3694
3695#[derive(Serialize)]
3696struct JsonWmEntry<'a> {
3697    id: String,
3698    content: &'a str,
3699}
3700
3701#[derive(Serialize)]
3702struct JsonRichEntry<'a> {
3703    id: String,
3704    content: &'a str,
3705    resource_evidence: serde_json::Value,
3706    resource_hydration_available: serde_json::Value,
3707    resource_preview_packages: serde_json::Value,
3708    resource_score_attribution: serde_json::Value,
3709}
3710
3711#[derive(Serialize)]
3712struct JsonContextRoot<'a> {
3713    working_memory: Vec<JsonWmEntry<'a>>,
3714    conflicts: &'a [String],
3715    semantic: Vec<JsonRichEntry<'a>>,
3716    episodic: Vec<JsonRichEntry<'a>>,
3717    procedural: Vec<JsonRichEntry<'a>>,
3718    graph_connected: &'a [String],
3719    causal_upstream: &'a [String],
3720}
3721
3722fn format_json(
3723    working_section: &[ContextEntry],
3724    contradiction_section: &[String],
3725    semantic_section: &[ContextEntry],
3726    episodic_section: &[ContextEntry],
3727    procedural_section: &[ContextEntry],
3728    graph_section: &[String],
3729    causal_section: &[String],
3730) -> String {
3731    fn rich_entry(entry: &ContextEntry) -> JsonRichEntry<'_> {
3732        JsonRichEntry {
3733            id: entry.id.to_string(),
3734            content: &entry.content,
3735            resource_evidence: resource_evidence_to_json(&entry.resource_evidence),
3736            resource_hydration_available: resource_hydration_to_json(&entry.resource_evidence),
3737            resource_preview_packages: resource_preview_packages_to_json(
3738                &entry.resource_preview_packages,
3739            ),
3740            resource_score_attribution: resource_score_attribution_to_json(
3741                &entry.resource_score_attribution,
3742            ),
3743        }
3744    }
3745
3746    let root = JsonContextRoot {
3747        working_memory: working_section
3748            .iter()
3749            .map(|e| JsonWmEntry {
3750                id: e.id.to_string(),
3751                content: &e.content,
3752            })
3753            .collect(),
3754        conflicts: contradiction_section,
3755        semantic: semantic_section.iter().map(rich_entry).collect(),
3756        episodic: episodic_section.iter().map(rich_entry).collect(),
3757        procedural: procedural_section.iter().map(rich_entry).collect(),
3758        graph_connected: graph_section,
3759        causal_upstream: causal_section,
3760    };
3761
3762    serde_json::to_string_pretty(&root).unwrap_or_else(|_| "{}".to_string())
3763}
3764
3765// ── Truncation ─────────────────────────────────────────────────────────
3766
3767fn truncate_to_budget(text: &str, max_tokens: usize, tokenizer: &dyn Tokenizer) -> String {
3768    if tokenizer.count_tokens(text) <= max_tokens {
3769        return text.to_string();
3770    }
3771
3772    // Binary search for the right character position.
3773    let chars: Vec<char> = text.chars().collect();
3774    let mut lo = 0;
3775    let mut hi = chars.len();
3776
3777    while lo < hi {
3778        let mid = (lo + hi).div_ceil(2);
3779        let slice: String = chars[..mid].iter().collect();
3780        if tokenizer.count_tokens(&slice) <= max_tokens {
3781            lo = mid;
3782        } else {
3783            hi = mid - 1;
3784        }
3785    }
3786
3787    let result: String = chars[..lo].iter().collect();
3788    result
3789}
3790
3791// ── AS clause formatters for RECALL ────────────────────────────────────
3792
3793/// Format RECALL results as a narrative.
3794pub async fn format_as_narrative(db: &HirnDB, records: &[ScoredMemory]) -> String {
3795    if records.is_empty() {
3796        return String::new();
3797    }
3798
3799    let mut out = String::new();
3800
3801    // Pre-collect contradiction edges for all records so we can show evolution.
3802    let mut contradictions: Vec<Vec<MemoryId>> = Vec::new();
3803    for sm in records {
3804        let edges = db
3805            .cached_graph()
3806            .get_edges_of_type(sm.record.id(), EdgeRelation::Contradicts)
3807            .await
3808            .unwrap_or_default();
3809        let ids = edges
3810            .iter()
3811            .map(|e| {
3812                if e.source == sm.record.id() {
3813                    e.target
3814                } else {
3815                    e.source
3816                }
3817            })
3818            .collect();
3819        contradictions.push(ids);
3820    }
3821
3822    // Track which record IDs are in the set for contradiction annotation.
3823    let record_ids: HashSet<MemoryId> = records.iter().map(|sm| sm.record.id()).collect();
3824
3825    for (i, sm) in records.iter().enumerate() {
3826        let content = extract_content_str(&sm.record);
3827        let transition = if i == 0 {
3828            String::new()
3829        } else {
3830            // Check if there's a causal link from the previous record.
3831            let prev_id = records[i - 1].record.id();
3832            let curr_id = sm.record.id();
3833            let causal = db
3834                .cached_graph()
3835                .get_edges_of_type(prev_id, EdgeRelation::Causes)
3836                .await
3837                .unwrap_or_default();
3838            let is_causal = causal
3839                .iter()
3840                .any(|e| e.target == curr_id || e.source == curr_id);
3841
3842            // Check if this record contradicts an earlier one in the sequence.
3843            let contradicts_earlier = contradictions[i]
3844                .iter()
3845                .any(|cid| records[..i].iter().any(|r| r.record.id() == *cid));
3846
3847            if contradicts_earlier {
3848                "However, this was later revised: ".to_string()
3849            } else if is_causal {
3850                "As a result of this, ".to_string()
3851            } else {
3852                // Check for temporal gap.
3853                let prev_ts = extract_timestamp(&records[i - 1].record);
3854                let curr_ts = extract_timestamp(&sm.record);
3855                let gap_hours = (curr_ts.timestamp_ms() as f64 - prev_ts.timestamp_ms() as f64)
3856                    / (3600.0 * 1000.0);
3857                if gap_hours > 24.0 {
3858                    format!("After a gap of {gap_hours:.0} hours, ")
3859                } else {
3860                    "Then, ".to_string()
3861                }
3862            }
3863        };
3864
3865        // Add timestamp context.
3866        let ts = extract_timestamp(&sm.record);
3867        let dt = ts.as_datetime();
3868        let ts_str = dt.format("%Y-%m-%d %H:%M").to_string();
3869
3870        if !transition.is_empty() {
3871            out.push_str(&transition);
3872        }
3873
3874        // Annotate superseded records (contradicted by a later record in sequence).
3875        let is_superseded = contradictions[i].iter().any(|cid| {
3876            records[i + 1..]
3877                .iter()
3878                .any(|r| r.record.id() == *cid && record_ids.contains(cid))
3879        });
3880
3881        if is_superseded {
3882            write!(out, "[{ts_str}] [superseded] {content}").ok();
3883        } else {
3884            write!(out, "[{ts_str}] {content}").ok();
3885        }
3886        if i < records.len() - 1 {
3887            out.push_str(". ");
3888        }
3889    }
3890
3891    out
3892}
3893
3894/// Format RECALL results as a causal chain.
3895pub async fn format_as_causal_chain(
3896    db: &HirnDB,
3897    records: &[ScoredMemory],
3898    depth: Option<usize>,
3899) -> String {
3900    if records.is_empty() {
3901        return String::new();
3902    }
3903
3904    let max_depth = depth.unwrap_or(usize::MAX);
3905    let record_ids: HashSet<MemoryId> = records.iter().map(|sm| sm.record.id()).collect();
3906
3907    // Build causal chains starting from each record.
3908    let mut chains: Vec<Vec<(MemoryId, String, f32)>> = Vec::new();
3909    let mut visited: HashSet<MemoryId> = HashSet::new();
3910
3911    for sm in records {
3912        let id = sm.record.id();
3913        if visited.contains(&id) {
3914            continue;
3915        }
3916
3917        let mut chain = vec![(id, extract_content_str(&sm.record).to_string(), 0.0f32)];
3918        visited.insert(id);
3919        let mut current = id;
3920        let mut hops = 0;
3921
3922        // Follow causes edges forward.
3923        while hops < max_depth {
3924            let causal_edges = db
3925                .cached_graph()
3926                .get_edges_of_type(current, EdgeRelation::Causes)
3927                .await
3928                .unwrap_or_default();
3929            let next_info = causal_edges
3930                .iter()
3931                .find(|e| {
3932                    let target = if e.source == current {
3933                        e.target
3934                    } else {
3935                        e.source
3936                    };
3937                    record_ids.contains(&target) && !visited.contains(&target)
3938                })
3939                .map(|e| {
3940                    let target = if e.source == current {
3941                        e.target
3942                    } else {
3943                        e.source
3944                    };
3945                    (target, e.weight)
3946                });
3947
3948            if let Some((target, weight)) = next_info {
3949                if let Some(sm) = records.iter().find(|r| r.record.id() == target) {
3950                    chain.push((target, extract_content_str(&sm.record).to_string(), weight));
3951                    visited.insert(target);
3952                    current = target;
3953                    hops += 1;
3954                } else {
3955                    break;
3956                }
3957            } else {
3958                break;
3959            }
3960        }
3961
3962        if chain.len() > 1 || chains.is_empty() {
3963            chains.push(chain);
3964        }
3965    }
3966
3967    // Format chains.
3968    let mut out = String::new();
3969    for (ci, chain) in chains.iter().enumerate() {
3970        if ci > 0 {
3971            out.push_str("\n\n");
3972        }
3973        for (i, (_, content, weight)) in chain.iter().enumerate() {
3974            if i > 0 {
3975                write!(out, " → [caused, w={weight:.2}] → ").ok();
3976            }
3977            out.push_str(content);
3978        }
3979    }
3980
3981    out
3982}
3983
3984/// Format RECALL results as a graph structure.
3985pub async fn format_as_graph(db: &HirnDB, records: &[ScoredMemory]) -> String {
3986    if records.is_empty() {
3987        return "{}".to_string();
3988    }
3989
3990    let record_ids: HashSet<MemoryId> = records.iter().map(|sm| sm.record.id()).collect();
3991
3992    // Build nodes.
3993    let nodes: Vec<serde_json::Value> = records
3994        .iter()
3995        .map(|sm| {
3996            let (importance, _) = extract_record_stats_context(&sm.record);
3997            serde_json::json!({
3998                "id": sm.record.id().to_string(),
3999                "content": extract_content_str(&sm.record),
4000                "layer": format!("{:?}", sm.record.layer()),
4001                "importance": importance,
4002                "activation": sm.score_breakdown.activation,
4003                "score": sm.score,
4004            })
4005        })
4006        .collect();
4007
4008    // Build edges (only between result nodes).
4009    let mut edges_out: Vec<serde_json::Value> = Vec::new();
4010    let mut seen_edges: HashSet<MemoryId> = HashSet::new();
4011
4012    for sm in records {
4013        let id = sm.record.id();
4014        let all_edges = db.cached_graph().get_edges(id).await.unwrap_or_default();
4015        for edge in all_edges {
4016            if seen_edges.contains(&edge.id) {
4017                continue;
4018            }
4019            let other = if edge.source == id {
4020                edge.target
4021            } else {
4022                edge.source
4023            };
4024            if record_ids.contains(&other) {
4025                seen_edges.insert(edge.id);
4026                edges_out.push(serde_json::json!({
4027                    "source": edge.source.to_string(),
4028                    "target": edge.target.to_string(),
4029                    "relation": format!("{:?}", edge.relation),
4030                    "weight": edge.weight,
4031                }));
4032            }
4033        }
4034    }
4035
4036    let graph_json = serde_json::json!({
4037        "nodes": nodes,
4038        "edges": edges_out,
4039    });
4040
4041    serde_json::to_string_pretty(&graph_json).unwrap_or_else(|_| "{}".to_string())
4042}
4043
4044// ── Helpers ────────────────────────────────────────────────────────────
4045
4046const fn extract_timestamp(record: &MemoryRecord) -> hirn_core::Timestamp {
4047    match record {
4048        MemoryRecord::Episodic(e) => e.timestamp,
4049        MemoryRecord::Semantic(s) => s.created_at,
4050        MemoryRecord::Working(w) => w.created_at,
4051        MemoryRecord::Procedural(p) => p.created_at,
4052    }
4053}
4054
4055const fn extract_record_stats_context(record: &MemoryRecord) -> (f32, hirn_core::Timestamp) {
4056    match record {
4057        MemoryRecord::Episodic(e) => (e.importance, e.timestamp),
4058        MemoryRecord::Semantic(s) => (s.confidence, s.created_at),
4059        MemoryRecord::Working(w) => (w.relevance_score, w.created_at),
4060        MemoryRecord::Procedural(p) => (p.success_rate, p.created_at),
4061    }
4062}
4063
4064// ── ScopedContextAssemblyRuntime ──────────────────────────────────────────
4065
4066/// Engine-side implementation of [`hirn_exec::extensions::ContextAssemblyRuntime`].
4067///
4068/// Registered once per THINK query execution, capturing the `HirnDB` reference,
4069/// actor identity, scored candidates, and context config. `ContextAssemblyExec`
4070/// (pipeline mode) calls `assemble_from_batches`, passing the raw Arrow output
4071/// from `ContextBudgetExec`.  This makes THINK context assembly a true
4072/// `SendableRecordBatchStream` operator rather than an imperative post-step.
4073///
4074/// # Design Notes
4075/// - The `candidates` and `content_pool` are pre-computed before plan execution;
4076///   they are used as the hydration pool for graph/causal neighbor resolution.
4077/// - Raw Arrow batches from `ContextBudgetExec` bypass the secondary Lance scan
4078///   for the top-k candidate set (the Arrow fast path in `assemble_think_context`).
4079/// - The runtime is RAII-managed via `RegisteredContextAssemblyRuntime`; the
4080///   caller must hold the guard until the plan output is decoded.
4081pub struct ScopedContextAssemblyRuntime {
4082    pub db: std::sync::Arc<HirnDB>,
4083    pub actor_id: AgentId,
4084    pub candidates: Vec<super::results::ScoredMemory>,
4085    pub content_pool: Vec<super::results::ScoredMemory>,
4086    pub config: ContextConfig,
4087    pub allowed_namespaces: Option<Vec<Namespace>>,
4088}
4089
4090#[async_trait]
4091impl hirn_exec::extensions::ContextAssemblyRuntime for ScopedContextAssemblyRuntime {
4092    async fn assemble_from_batches(
4093        &self,
4094        raw_batches: Vec<arrow_array::RecordBatch>,
4095    ) -> HirnResult<Vec<u8>> {
4096        let result = assemble_think_context(
4097            &self.db,
4098            &self.actor_id,
4099            &self.candidates,
4100            &self.config,
4101            self.allowed_namespaces.as_deref(),
4102            Some(&self.content_pool),
4103            Some(&raw_batches),
4104        )
4105        .await?;
4106
4107        let output = ThinkAssemblyOutput {
4108            context: result.context,
4109            token_count: result.token_count,
4110            records: self.candidates.clone(),
4111            records_included: result.records_included,
4112            records_excluded_count: result.records_excluded_count,
4113            contradictions: result.contradictions,
4114            conflict_groups: result.conflict_groups,
4115            score_distribution: result.score_distribution,
4116        };
4117
4118        serde_json::to_vec(&output).map_err(|e| {
4119            hirn_core::HirnError::InvalidInput(format!(
4120                "ScopedContextAssemblyRuntime: ThinkAssemblyOutput serialization failed: {e}"
4121            ))
4122        })
4123    }
4124}
4125
4126// ── Tests ──────────────────────────────────────────────────────────────
4127
4128#[cfg(test)]
4129mod tests {
4130    use std::sync::atomic::{AtomicUsize, Ordering};
4131
4132    use super::*;
4133    use std::sync::Arc;
4134
4135    use hirn_core::HirnConfig;
4136    use hirn_core::Timestamp;
4137    use hirn_core::episodic::EpisodicRecord;
4138    use hirn_core::metadata::Metadata;
4139    use hirn_core::resource::{
4140        DerivedArtifactKind, EvidenceProvenance, EvidenceRole, ModalityProfile, ResourceId,
4141    };
4142    use hirn_core::semantic::SemanticRecord;
4143    use hirn_core::types::{AgentId, EdgeRelation, EventType, KnowledgeType};
4144    use hirn_storage::memory_store::MemoryStore;
4145
4146    use super::super::results::ScoreBreakdown;
4147
4148    fn make_episodic(content: &str, summary: &str, importance: f32) -> ScoredMemory {
4149        ScoredMemory {
4150            record: MemoryRecord::Episodic(
4151                EpisodicRecord::builder()
4152                    .event_type(EventType::Observation)
4153                    .content(content)
4154                    .summary(summary)
4155                    .importance(importance)
4156                    .agent_id(AgentId::new("test").unwrap())
4157                    .build()
4158                    .unwrap(),
4159            ),
4160            revision: None,
4161            score: importance,
4162            score_breakdown: ScoreBreakdown {
4163                similarity: importance,
4164                importance,
4165                recency: 0.5,
4166                activation: 0.0,
4167                causal_relevance: 0.0,
4168                surprise: 0.0,
4169                source_reliability: 0.0,
4170            },
4171            resource_evidence: Vec::new(),
4172            resource_preview_packages: Vec::new(),
4173            resource_score_attribution: Vec::new(),
4174        }
4175    }
4176
4177    fn make_semantic(concept: &str, description: &str, confidence: f32) -> ScoredMemory {
4178        ScoredMemory {
4179            record: MemoryRecord::Semantic(
4180                SemanticRecord::builder()
4181                    .concept(concept)
4182                    .knowledge_type(KnowledgeType::Propositional)
4183                    .description(description)
4184                    .confidence(confidence)
4185                    .agent_id(AgentId::new("test").unwrap())
4186                    .build()
4187                    .unwrap(),
4188            ),
4189            revision: None,
4190            score: confidence,
4191            score_breakdown: ScoreBreakdown {
4192                similarity: confidence,
4193                importance: confidence,
4194                recency: 0.5,
4195                activation: 0.0,
4196                causal_relevance: 0.0,
4197                surprise: 0.0,
4198                source_reliability: 0.0,
4199            },
4200            resource_evidence: Vec::new(),
4201            resource_preview_packages: Vec::new(),
4202            resource_score_attribution: Vec::new(),
4203        }
4204    }
4205
4206    fn make_context_entry(content: &str) -> ContextEntry {
4207        ContextEntry {
4208            id: MemoryId::new(),
4209            content: content.to_string(),
4210            token_cost: 0,
4211            resource_evidence: Vec::new(),
4212            resource_preview_packages: Vec::new(),
4213            resource_score_attribution: Vec::new(),
4214        }
4215    }
4216
4217    fn make_resource_evidence_summary() -> ResourceEvidenceSummary {
4218        ResourceEvidenceSummary {
4219            resource_id: ResourceId::new(),
4220            role: EvidenceRole::Source,
4221            provenance: EvidenceProvenance::ObservedResource,
4222            artifact_id: None,
4223            artifact_kind: None,
4224            lifecycle_state: ResourceGovernanceState::Active,
4225            modality: Some(ModalityProfile::Document),
4226            mime_type: Some("application/pdf".into()),
4227            display_name: Some("architecture.pdf".into()),
4228            available_artifacts: vec![DerivedArtifactKind::Preview],
4229            has_preview: true,
4230            can_hydrate_preview: true,
4231            can_hydrate_full: false,
4232        }
4233    }
4234
4235    fn test_tokenizer() -> std::sync::Arc<dyn Tokenizer> {
4236        hirn_provider::default_tokenizer()
4237    }
4238
4239    struct CountingTokenizer {
4240        count_calls: AtomicUsize,
4241    }
4242
4243    impl hirn_core::embed::TokenCounter for CountingTokenizer {
4244        fn count_tokens(&self, text: &str) -> usize {
4245            self.count_calls.fetch_add(1, Ordering::SeqCst);
4246            text.split_whitespace().count()
4247        }
4248    }
4249
4250    impl Tokenizer for CountingTokenizer {
4251        fn truncate(&self, text: &str, max_tokens: usize) -> String {
4252            text.split_whitespace()
4253                .take(max_tokens)
4254                .collect::<Vec<_>>()
4255                .join(" ")
4256        }
4257
4258        fn encode(&self, text: &str) -> Vec<usize> {
4259            (0..text.split_whitespace().count()).collect()
4260        }
4261
4262        fn decode(&self, tokens: &[usize]) -> HirnResult<String> {
4263            Ok(tokens
4264                .iter()
4265                .map(|token| token.to_string())
4266                .collect::<Vec<_>>()
4267                .join(" "))
4268        }
4269
4270        fn model_id(&self) -> &str {
4271            "counting"
4272        }
4273
4274        fn max_tokens(&self) -> usize {
4275            4096
4276        }
4277    }
4278
4279    async fn temp_db() -> (HirnDB, tempfile::TempDir) {
4280        let dir = tempfile::tempdir().unwrap();
4281        let path = dir.path().join("ql-context-tests");
4282        let config = HirnConfig::builder()
4283            .db_path(&path)
4284            .embedding_dimensions(4)
4285            .working_memory_token_limit(1000)
4286            .build()
4287            .unwrap();
4288        let db = HirnDB::open_with_config(config, Arc::new(MemoryStore::new()))
4289            .await
4290            .unwrap();
4291        (db, dir)
4292    }
4293
4294    fn scored_record(record: MemoryRecord, score: f32) -> ScoredMemory {
4295        ScoredMemory {
4296            record,
4297            revision: None,
4298            score,
4299            score_breakdown: ScoreBreakdown {
4300                similarity: score,
4301                importance: score,
4302                recency: 0.5,
4303                activation: 0.0,
4304                causal_relevance: 0.0,
4305                surprise: 0.0,
4306                source_reliability: 0.0,
4307            },
4308            resource_evidence: Vec::new(),
4309            resource_preview_packages: Vec::new(),
4310            resource_score_attribution: Vec::new(),
4311        }
4312    }
4313
4314    fn make_conflict_member(memory_id: MemoryId, status: ConflictMemberStatus) -> ConflictMember {
4315        ConflictMember {
4316            memory_id,
4317            logical_memory_id: None,
4318            revision_id: None,
4319            status,
4320            layer: Layer::Semantic,
4321            content: format!("claim {memory_id}"),
4322            in_result_set: true,
4323            source_reliability: 1.0,
4324            recency_basis_ms: memory_id.timestamp_ms() as i64,
4325        }
4326    }
4327
4328    fn default_conflict_policy() -> ConflictResolutionPolicy {
4329        ConflictResolutionPolicy::default()
4330    }
4331
4332    #[test]
4333    fn classify_candidates_layers() {
4334        let tok = test_tokenizer();
4335        let candidates = vec![
4336            make_episodic("episode content", "ep summary", 0.9),
4337            make_semantic("concept_a", "semantic description", 0.8),
4338        ];
4339
4340        let classified = classify_candidates(&candidates, tok.as_ref());
4341
4342        assert_eq!(classified.len(), 2);
4343        assert_eq!(classified[0].layer, Layer::Episodic);
4344        assert_eq!(classified[1].layer, Layer::Semantic);
4345        assert!(classified[0].token_count_full > 0);
4346        assert!(classified[1].token_count_full > 0);
4347    }
4348
4349    #[test]
4350    fn classify_candidates_token_counts() {
4351        let tok = test_tokenizer();
4352        let candidates = vec![make_episodic(
4353            "This is a longer piece of content for testing token counting",
4354            "short summary",
4355            0.7,
4356        )];
4357
4358        let classified = classify_candidates(&candidates, tok.as_ref());
4359
4360        assert!(classified[0].token_count_full > classified[0].token_count_summary);
4361    }
4362
4363    #[test]
4364    fn classify_and_build_layer_preserve_seeded_preview_packages() {
4365        let tok = test_tokenizer();
4366        let mut candidate = make_episodic("episode content", "ep summary", 0.9);
4367        candidate
4368            .resource_preview_packages
4369            .push(ResourcePreviewPackage {
4370                resource_id: ResourceId::new(),
4371                role: hirn_core::EvidenceRole::Source,
4372                display_name: Some("diagram.png".into()),
4373                modality: Some(ModalityProfile::Image),
4374                artifact_kind: DerivedArtifactKind::Preview,
4375                artifact_modality: ModalityProfile::Text,
4376                text_content: "seeded topology preview".into(),
4377                truncated: false,
4378            });
4379
4380        let classified = classify_candidates(&[candidate], tok.as_ref());
4381        assert_eq!(classified[0].resource_preview_packages.len(), 1);
4382        assert_eq!(
4383            classified[0].resource_preview_packages[0].text_content,
4384            "seeded topology preview"
4385        );
4386
4387        let (entries, _used) =
4388            build_layer_section(&classified, Layer::Episodic, 256, 0.4, None, tok.as_ref());
4389        assert_eq!(entries.len(), 1);
4390        assert_eq!(entries[0].resource_preview_packages.len(), 1);
4391        assert_eq!(
4392            entries[0].resource_preview_packages[0].text_content,
4393            "seeded topology preview"
4394        );
4395    }
4396
4397    #[test]
4398    fn budget_allocation_with_no_working_memory() {
4399        let config = ContextConfig {
4400            token_budget: 1000,
4401            working_memory_reserve: 0.2,
4402            ..Default::default()
4403        };
4404        let tok = test_tokenizer();
4405        let wm: Vec<WorkingMemoryEntry> = vec![];
4406        let conflicts: Vec<ConflictGroup> = vec![];
4407        let classified = vec![
4408            Candidate {
4409                id: MemoryId::new(),
4410                layer: Layer::Semantic,
4411                full_content: "sem".into(),
4412                summary: "s".into(),
4413                score: 0.9,
4414                trust_score: 1.0,
4415                token_count_full: 10,
4416                token_count_summary: 2,
4417                tokens_full: 0,
4418                tokens_summary: 0,
4419                tokens_entity: 0,
4420                is_contradiction: false,
4421                entities: vec![],
4422                resource_evidence: Vec::new(),
4423                resource_preview_packages: Vec::new(),
4424                resource_score_attribution: Vec::new(),
4425            },
4426            Candidate {
4427                id: MemoryId::new(),
4428                layer: Layer::Episodic,
4429                full_content: "ep".into(),
4430                summary: "e".into(),
4431                score: 0.8,
4432                trust_score: 1.0,
4433                token_count_full: 10,
4434                token_count_summary: 2,
4435                tokens_full: 0,
4436                tokens_summary: 0,
4437                tokens_entity: 0,
4438                is_contradiction: false,
4439                entities: vec![],
4440                resource_evidence: Vec::new(),
4441                resource_preview_packages: Vec::new(),
4442                resource_score_attribution: Vec::new(),
4443            },
4444        ];
4445
4446        let alloc = allocate_budget(&config, &wm, &conflicts, &classified, tok.as_ref());
4447
4448        // No WM → no WM budget.
4449        assert_eq!(alloc.working_memory, 0);
4450        // Full budget distributed across tiers (direct + graph + causal).
4451        let total_alloc = alloc.semantic
4452            + alloc.episodic
4453            + alloc.procedural
4454            + alloc.graph_connected
4455            + alloc.causal_upstream;
4456        assert!(total_alloc > 0);
4457        assert!(total_alloc <= 1000);
4458    }
4459
4460    #[test]
4461    fn budget_allocation_only_semantic() {
4462        let config = ContextConfig {
4463            token_budget: 1000,
4464            ..Default::default()
4465        };
4466        let tok = test_tokenizer();
4467        let classified = vec![Candidate {
4468            id: MemoryId::new(),
4469            layer: Layer::Semantic,
4470            full_content: "sem".into(),
4471            summary: "s".into(),
4472            score: 0.9,
4473            trust_score: 1.0,
4474            token_count_full: 10,
4475            token_count_summary: 2,
4476            tokens_full: 0,
4477            tokens_summary: 0,
4478            tokens_entity: 0,
4479            is_contradiction: false,
4480            entities: vec![],
4481            resource_evidence: Vec::new(),
4482            resource_preview_packages: Vec::new(),
4483            resource_score_attribution: Vec::new(),
4484        }];
4485
4486        let alloc = allocate_budget(&config, &[], &[], &classified, tok.as_ref());
4487
4488        // Only semantic → gets all remaining budget.
4489        assert_eq!(alloc.episodic, 0);
4490        assert!(alloc.semantic > 0);
4491    }
4492
4493    #[test]
4494    fn budget_allocation_reserves_output_format_overhead() {
4495        let tok = std::sync::Arc::new(CountingTokenizer {
4496            count_calls: AtomicUsize::new(0),
4497        });
4498        let classified = vec![
4499            Candidate {
4500                id: MemoryId::new(),
4501                layer: Layer::Semantic,
4502                full_content: "semantic".into(),
4503                summary: "semantic".into(),
4504                score: 0.9,
4505                trust_score: 1.0,
4506                token_count_full: 1,
4507                token_count_summary: 1,
4508                tokens_full: 0,
4509                tokens_summary: 0,
4510                tokens_entity: 0,
4511                is_contradiction: false,
4512                entities: vec![],
4513                resource_evidence: Vec::new(),
4514                resource_preview_packages: Vec::new(),
4515                resource_score_attribution: Vec::new(),
4516            },
4517            Candidate {
4518                id: MemoryId::new(),
4519                layer: Layer::Episodic,
4520                full_content: "episodic".into(),
4521                summary: "episodic".into(),
4522                score: 0.8,
4523                trust_score: 1.0,
4524                token_count_full: 1,
4525                token_count_summary: 1,
4526                tokens_full: 0,
4527                tokens_summary: 0,
4528                tokens_entity: 0,
4529                is_contradiction: false,
4530                entities: vec![],
4531                resource_evidence: Vec::new(),
4532                resource_preview_packages: Vec::new(),
4533                resource_score_attribution: Vec::new(),
4534            },
4535            Candidate {
4536                id: MemoryId::new(),
4537                layer: Layer::Procedural,
4538                full_content: "procedural".into(),
4539                summary: "procedural".into(),
4540                score: 0.7,
4541                trust_score: 1.0,
4542                token_count_full: 1,
4543                token_count_summary: 1,
4544                tokens_full: 0,
4545                tokens_summary: 0,
4546                tokens_entity: 0,
4547                is_contradiction: false,
4548                entities: vec![],
4549                resource_evidence: Vec::new(),
4550                resource_preview_packages: Vec::new(),
4551                resource_score_attribution: Vec::new(),
4552            },
4553        ];
4554        let structured = ContextConfig {
4555            token_budget: 120,
4556            output_format: ContextFormat::Structured,
4557            features: ContextFeatures::empty(),
4558            ..Default::default()
4559        };
4560        let json = ContextConfig {
4561            token_budget: 120,
4562            output_format: ContextFormat::Json,
4563            features: ContextFeatures::empty(),
4564            ..Default::default()
4565        };
4566
4567        let structured_alloc = allocate_budget(&structured, &[], &[], &classified, tok.as_ref());
4568        let json_alloc = allocate_budget(&json, &[], &[], &classified, tok.as_ref());
4569        let structured_total = structured_alloc.working_memory
4570            + structured_alloc.contradictions
4571            + structured_alloc.semantic
4572            + structured_alloc.episodic
4573            + structured_alloc.procedural
4574            + structured_alloc.graph_connected
4575            + structured_alloc.causal_upstream;
4576        let json_total = json_alloc.working_memory
4577            + json_alloc.contradictions
4578            + json_alloc.semantic
4579            + json_alloc.episodic
4580            + json_alloc.procedural
4581            + json_alloc.graph_connected
4582            + json_alloc.causal_upstream;
4583
4584        assert!(structured_total < structured.token_budget);
4585        assert!(json_total < json.token_budget);
4586        assert!(json_total < structured_total);
4587    }
4588
4589    #[test]
4590    fn progressive_compression_full_content() {
4591        let c = Candidate {
4592            id: MemoryId::new(),
4593            layer: Layer::Episodic,
4594            full_content: "full".into(),
4595            summary: "sum".into(),
4596            score: 0.9,
4597            trust_score: 1.0,
4598            token_count_full: 5,
4599            token_count_summary: 2,
4600            tokens_full: 0,
4601            tokens_summary: 0,
4602            tokens_entity: 0,
4603            is_contradiction: false,
4604            entities: vec![],
4605            resource_evidence: Vec::new(),
4606            resource_preview_packages: Vec::new(),
4607            resource_score_attribution: Vec::new(),
4608        };
4609
4610        assert_eq!(determine_compression(&c, 0.4), CompressionLevel::Full);
4611    }
4612
4613    #[test]
4614    fn progressive_compression_summary() {
4615        let mut c = Candidate {
4616            id: MemoryId::new(),
4617            layer: Layer::Episodic,
4618            full_content: "full".into(),
4619            summary: "sum".into(),
4620            score: 0.3,
4621            trust_score: 1.0,
4622            token_count_full: 5,
4623            token_count_summary: 2,
4624            tokens_full: 0,
4625            tokens_summary: 0,
4626            tokens_entity: 0,
4627            is_contradiction: false,
4628            entities: vec![],
4629            resource_evidence: Vec::new(),
4630            resource_preview_packages: Vec::new(),
4631            resource_score_attribution: Vec::new(),
4632        };
4633
4634        assert_eq!(determine_compression(&c, 0.4), CompressionLevel::Summary);
4635
4636        c.score = 0.1;
4637        assert_eq!(determine_compression(&c, 0.4), CompressionLevel::EntityOnly);
4638    }
4639
4640    #[test]
4641    fn truncate_to_budget_exact() {
4642        let tok = test_tokenizer();
4643        let text = "hello world this is a test of truncation to budget limits";
4644        let truncated = truncate_to_budget(text, 5, tok.as_ref());
4645        let count = tok.count_tokens(&truncated);
4646        assert!(count <= 5);
4647    }
4648
4649    #[test]
4650    fn format_structured_sections() {
4651        let wm = vec![make_context_entry("active task")];
4652        let conflicts = vec![];
4653        let sem = vec![make_context_entry("semantic fact")];
4654        let ep = vec![make_context_entry("episode event")];
4655
4656        let output = format_structured(&wm, &conflicts, &sem, &ep, &[], &[], &[]);
4657
4658        assert!(output.contains("## Working Memory"));
4659        assert!(output.contains("## Semantic Knowledge"));
4660        assert!(output.contains("## Episodic Records"));
4661        assert!(!output.contains("## Conflicts"));
4662    }
4663
4664    #[test]
4665    fn format_structured_empty_sections_omitted() {
4666        let output = format_structured(&[], &[], &[], &[], &[], &[], &[]);
4667        assert!(output.is_empty());
4668    }
4669
4670    #[test]
4671    fn format_json_valid() {
4672        let wm = vec![make_context_entry("context item")];
4673        let sem = vec![make_context_entry("fact")];
4674        let ep = vec![make_context_entry("event")];
4675
4676        let output = format_json(&wm, &[], &sem, &ep, &[], &[], &[]);
4677        let parsed: serde_json::Value = serde_json::from_str(&output).unwrap();
4678
4679        assert!(parsed.get("working_memory").is_some());
4680        assert_eq!(parsed["working_memory"][0]["content"], "context item");
4681        assert!(parsed["working_memory"][0].get("id").is_some());
4682        assert!(parsed.get("semantic").is_some());
4683        assert!(parsed.get("episodic").is_some());
4684        assert!(parsed.get("conflicts").is_some());
4685        assert!(parsed.get("procedural").is_some());
4686        assert!(parsed.get("graph_connected").is_some());
4687        assert!(parsed.get("causal_upstream").is_some());
4688    }
4689
4690    #[test]
4691    fn format_json_includes_resource_evidence() {
4692        let entry = ContextEntry {
4693            id: MemoryId::new(),
4694            content: "fact Evidence: source architecture.pdf [document, preview, preview, artifacts=preview].".to_string(),
4695            token_cost: 0,
4696            resource_evidence: vec![make_resource_evidence_summary()],
4697            resource_preview_packages: vec![ResourcePreviewPackage {
4698                resource_id: ResourceId::new(),
4699                role: hirn_core::EvidenceRole::Source,
4700                display_name: Some("architecture.pdf".into()),
4701                modality: Some(ModalityProfile::Document),
4702                artifact_kind: DerivedArtifactKind::Preview,
4703                artifact_modality: ModalityProfile::Text,
4704                text_content: "deployment preview".into(),
4705                truncated: false,
4706            }],
4707            resource_score_attribution: Vec::new(),
4708        };
4709
4710        let output = format_json(&[], &[], &[entry], &[], &[], &[], &[]);
4711        let parsed: serde_json::Value = serde_json::from_str(&output).unwrap();
4712
4713        assert_eq!(
4714            parsed["semantic"][0]["resource_evidence"][0]["display_name"],
4715            "architecture.pdf"
4716        );
4717        assert_eq!(
4718            parsed["semantic"][0]["resource_evidence"][0]["role"],
4719            "source"
4720        );
4721        assert_eq!(
4722            parsed["semantic"][0]["resource_evidence"][0]["has_preview"],
4723            true
4724        );
4725        assert_eq!(
4726            parsed["semantic"][0]["resource_hydration_available"]["preview"][0]["display_name"],
4727            "architecture.pdf"
4728        );
4729        assert!(
4730            parsed["semantic"][0]["resource_hydration_available"]["full"]
4731                .as_array()
4732                .unwrap()
4733                .is_empty()
4734        );
4735        assert_eq!(
4736            parsed["semantic"][0]["resource_preview_packages"][0]["text_content"],
4737            "deployment preview"
4738        );
4739        assert_eq!(
4740            parsed["semantic"][0]["resource_preview_packages"][0]["artifact_kind"],
4741            "preview"
4742        );
4743    }
4744
4745    #[test]
4746    fn format_narrative_flowing() {
4747        let wm = vec![make_context_entry("current task")];
4748        let sem = vec![make_context_entry("known fact about caching")];
4749        let ep = vec![make_context_entry("observed deployment")];
4750
4751        let output = format_narrative(&wm, &[], &sem, &ep, &[], &[], &[]);
4752
4753        assert!(output.contains("Currently in focus:"));
4754        assert!(output.contains("Known facts:"));
4755        assert!(output.contains("From recent experience:"));
4756    }
4757
4758    #[test]
4759    fn fit_context_to_budget_keeps_json_valid() {
4760        let tok = test_tokenizer();
4761        let mut sections = ContextSections {
4762            semantic: vec![ContextEntry {
4763                id: MemoryId::new(),
4764                content: "fact ".repeat(64),
4765                token_cost: 0,
4766                resource_evidence: vec![make_resource_evidence_summary()],
4767                resource_preview_packages: Vec::new(),
4768                resource_score_attribution: Vec::new(),
4769            }],
4770            episodic: vec![make_context_entry(&"event ".repeat(64))],
4771            ..Default::default()
4772        };
4773
4774        let output = fit_context_to_budget(ContextFormat::Json, &mut sections, 64, tok.as_ref());
4775        let parsed: serde_json::Value = serde_json::from_str(&output).unwrap();
4776
4777        assert!(parsed.is_object());
4778        assert!(tok.count_tokens(&output) <= 64);
4779    }
4780
4781    #[test]
4782    fn fit_context_to_budget_avoids_linear_full_rerenders() {
4783        let tok = std::sync::Arc::new(CountingTokenizer {
4784            count_calls: AtomicUsize::new(0),
4785        });
4786        let n = 16usize;
4787        let mut sections = ContextSections {
4788            semantic: (0..n)
4789                .map(|index| make_context_entry(&format!("fact {index} payload")))
4790                .collect(),
4791            ..Default::default()
4792        };
4793
4794        let output =
4795            fit_context_to_budget(ContextFormat::Structured, &mut sections, 10, tok.as_ref());
4796
4797        assert!(output.split_whitespace().count() <= 10);
4798        assert!(sections.semantic.len() < n);
4799        // New algorithm: O(N) individual entry tokenisations plus O(1) full-context
4800        // calls, instead of the previous O(log N) full-context binary search.
4801        // Upper bound: N (compute_formatted_entry_costs) + a small constant for the
4802        // initial and final full-context calls.
4803        assert!(tok.count_calls.load(Ordering::SeqCst) <= n + 10);
4804    }
4805
4806    #[tokio::test(flavor = "multi_thread")]
4807    async fn assemble_think_context_json_preserves_preview_packages() {
4808        let (db, _dir) = temp_db().await;
4809        let actor_id = AgentId::new("test").unwrap();
4810        let mut candidate = make_semantic("architecture", "deployment architecture", 0.9);
4811        candidate.resource_evidence = vec![make_resource_evidence_summary()];
4812        candidate
4813            .resource_preview_packages
4814            .push(ResourcePreviewPackage {
4815                resource_id: ResourceId::new(),
4816                role: hirn_core::EvidenceRole::Source,
4817                display_name: Some("architecture.pdf".into()),
4818                modality: Some(ModalityProfile::Document),
4819                artifact_kind: DerivedArtifactKind::Preview,
4820                artifact_modality: ModalityProfile::Text,
4821                text_content: "deployment preview".into(),
4822                truncated: false,
4823            });
4824        let config = ContextConfig {
4825            token_budget: 1024,
4826            output_format: ContextFormat::Json,
4827            features: ContextFeatures::empty()
4828                .with_resource_previews(true)
4829                .with_graph_context(false)
4830                .with_causal_chains(false)
4831                .with_surface_contradictions(false),
4832            ..Default::default()
4833        };
4834
4835        let result =
4836            assemble_think_context(&db, &actor_id, &[candidate], &config, None, None, None)
4837                .await
4838                .unwrap();
4839        let parsed: serde_json::Value = serde_json::from_str(&result.context).unwrap();
4840
4841        assert!(result.token_count <= config.token_budget);
4842        assert_eq!(result.records_included.len(), 1);
4843        assert!(
4844            parsed["semantic"][0]["content"]
4845                .as_str()
4846                .unwrap()
4847                .contains("deployment architecture")
4848        );
4849        assert_eq!(
4850            parsed["semantic"][0]["resource_preview_packages"][0]["text_content"],
4851            "deployment preview"
4852        );
4853    }
4854
4855    #[tokio::test(flavor = "multi_thread")]
4856    async fn assemble_think_context_surfaces_conflicts_after_preselection() {
4857        let (db, _dir) = temp_db().await;
4858        let actor_id = AgentId::new("test").unwrap();
4859
4860        let older_id = db
4861            .store_semantic(
4862                SemanticRecord::builder()
4863                    .concept("database-postgres")
4864                    .knowledge_type(KnowledgeType::Propositional)
4865                    .description("service uses postgres")
4866                    .confidence(0.92)
4867                    .agent_id(actor_id)
4868                    .build()
4869                    .unwrap(),
4870            )
4871            .await
4872            .unwrap();
4873        let newer_id = db
4874            .store_semantic(
4875                SemanticRecord::builder()
4876                    .concept("database-mysql")
4877                    .knowledge_type(KnowledgeType::Propositional)
4878                    .description("service uses mysql")
4879                    .confidence(0.91)
4880                    .contradiction(older_id)
4881                    .agent_id(actor_id)
4882                    .build()
4883                    .unwrap(),
4884            )
4885            .await
4886            .unwrap();
4887        let filler_id = db
4888            .store_semantic(
4889                SemanticRecord::builder()
4890                    .concept("cache")
4891                    .knowledge_type(KnowledgeType::Propositional)
4892                    .description("service uses redis")
4893                    .confidence(0.20)
4894                    .agent_id(actor_id)
4895                    .build()
4896                    .unwrap(),
4897            )
4898            .await
4899            .unwrap();
4900
4901        let candidates = vec![
4902            scored_record(db.get_memory(older_id).await.unwrap(), 0.92),
4903            scored_record(db.get_memory(newer_id).await.unwrap(), 0.91),
4904            scored_record(db.get_memory(filler_id).await.unwrap(), 0.20),
4905        ];
4906        let config = ContextConfig {
4907            token_budget: 256,
4908            features: ContextFeatures::empty()
4909                .with_surface_contradictions(true)
4910                .with_graph_context(false)
4911                .with_causal_chains(false)
4912                .with_resource_previews(false),
4913            ..Default::default()
4914        };
4915
4916        let result = assemble_think_context(&db, &actor_id, &candidates, &config, None, None, None)
4917            .await
4918            .unwrap();
4919
4920        assert_eq!(result.conflict_groups.len(), 1);
4921        let member_contents = result.conflict_groups[0]
4922            .members
4923            .iter()
4924            .map(|member| member.content.as_str())
4925            .collect::<Vec<_>>();
4926        assert!(
4927            member_contents
4928                .iter()
4929                .any(|content| content.contains("postgres"))
4930        );
4931        assert!(
4932            member_contents
4933                .iter()
4934                .any(|content| content.contains("mysql"))
4935        );
4936    }
4937
4938    #[tokio::test(flavor = "multi_thread")]
4939    async fn assemble_think_context_graph_context_ignores_excluded_tail_candidates() {
4940        let (db, _dir) = temp_db().await;
4941        let tokenizer = test_tokenizer();
4942        let source_id = db
4943            .remember(
4944                EpisodicRecord::builder()
4945                    .event_type(EventType::Observation)
4946                    .content("included seed")
4947                    .summary("included seed")
4948                    .embedding(vec![1.0, 0.0, 0.0, 0.0])
4949                    .importance(0.9)
4950                    .agent_id(AgentId::new("test").unwrap())
4951                    .build()
4952                    .unwrap(),
4953            )
4954            .await
4955            .unwrap();
4956        let excluded_seed_id = db
4957            .remember(
4958                EpisodicRecord::builder()
4959                    .event_type(EventType::Observation)
4960                    .content("excluded tail seed")
4961                    .summary("excluded tail seed")
4962                    .embedding(vec![0.0, 1.0, 0.0, 0.0])
4963                    .importance(0.1)
4964                    .agent_id(AgentId::new("test").unwrap())
4965                    .build()
4966                    .unwrap(),
4967            )
4968            .await
4969            .unwrap();
4970        let hidden_neighbor_id = db
4971            .remember(
4972                EpisodicRecord::builder()
4973                    .event_type(EventType::Observation)
4974                    .content("hidden graph neighbor")
4975                    .summary("hidden graph neighbor")
4976                    .embedding(vec![0.0, 0.0, 1.0, 0.0])
4977                    .importance(0.8)
4978                    .agent_id(AgentId::new("test").unwrap())
4979                    .build()
4980                    .unwrap(),
4981            )
4982            .await
4983            .unwrap();
4984
4985        {
4986            let mut hot_graph = db.cached_graph().hot_graph_mut();
4987            hot_graph
4988                .add_edge(
4989                    excluded_seed_id,
4990                    hidden_neighbor_id,
4991                    EdgeRelation::RelatedTo,
4992                    0.7,
4993                    Metadata::new(),
4994                )
4995                .unwrap();
4996        }
4997
4998        let config = ContextConfig {
4999            token_budget: 256,
5000            max_episodic_entries: 1,
5001            features: ContextFeatures::empty()
5002                .with_graph_context(true)
5003                .with_causal_chains(true)
5004                .with_surface_contradictions(false)
5005                .with_resource_previews(false),
5006            ..Default::default()
5007        };
5008        let candidates = vec![
5009            scored_record(db.get_memory(source_id).await.unwrap(), 0.9),
5010            scored_record(db.get_memory(excluded_seed_id).await.unwrap(), 0.1),
5011        ];
5012
5013        let result = assemble_think_context(
5014            &db,
5015            &AgentId::new("test").unwrap(),
5016            &candidates,
5017            &config,
5018            None,
5019            None,
5020            None,
5021        )
5022        .await
5023        .unwrap();
5024
5025        assert_eq!(result.records_included, vec![source_id]);
5026        assert!(!result.context.contains("hidden graph neighbor"));
5027
5028        let direct_ids = collect_direct_section_ids(&[make_context_entry("included")], &[], &[]);
5029        assert_eq!(direct_ids.len(), 1);
5030        assert!(tokenizer.count_tokens(&result.context) <= config.token_budget);
5031    }
5032
5033    #[test]
5034    fn build_layer_section_includes_resource_evidence_summary() {
5035        let tok = test_tokenizer();
5036        let candidate = Candidate {
5037            id: MemoryId::new(),
5038            layer: Layer::Semantic,
5039            full_content: "deployment architecture".into(),
5040            summary: "architecture summary".into(),
5041            score: 0.9,
5042            trust_score: 1.0,
5043            token_count_full: 5,
5044            token_count_summary: 2,
5045            tokens_full: 0,
5046            tokens_summary: 0,
5047            tokens_entity: 0,
5048            is_contradiction: false,
5049            entities: vec![],
5050            resource_evidence: vec![make_resource_evidence_summary()],
5051            resource_preview_packages: Vec::new(),
5052            resource_score_attribution: Vec::new(),
5053        };
5054
5055        let (entries, _used) =
5056            build_layer_section(&[candidate], Layer::Semantic, 256, 0.4, None, tok.as_ref());
5057
5058        assert_eq!(entries.len(), 1);
5059        assert!(entries[0].content.contains("Evidence:"));
5060        assert!(entries[0].content.contains("architecture.pdf"));
5061        assert_eq!(entries[0].resource_evidence.len(), 1);
5062    }
5063
5064    #[test]
5065    fn build_layer_section_stops_rendering_after_budget_exhausted() {
5066        let tok = std::sync::Arc::new(CountingTokenizer {
5067            count_calls: AtomicUsize::new(0),
5068        });
5069        let candidates = (0..3)
5070            .map(|index| Candidate {
5071                id: MemoryId::new(),
5072                layer: Layer::Semantic,
5073                full_content: format!("full {index} uses five tokens"),
5074                summary: format!("summary {index}"),
5075                score: 0.9 - (index as f32 * 0.1),
5076                trust_score: 1.0,
5077                token_count_full: 5,
5078                token_count_summary: 2,
5079                tokens_full: 0,
5080                tokens_summary: 0,
5081                tokens_entity: 0,
5082                is_contradiction: false,
5083                entities: vec![format!("entity-{index}")],
5084                resource_evidence: Vec::new(),
5085                resource_preview_packages: Vec::new(),
5086                resource_score_attribution: Vec::new(),
5087            })
5088            .collect::<Vec<_>>();
5089
5090        let (entries, _used) =
5091            build_layer_section(&candidates, Layer::Semantic, 5, 0.4, None, tok.as_ref());
5092
5093        assert_eq!(entries.len(), 1);
5094        assert_eq!(tok.count_calls.load(Ordering::SeqCst), 4);
5095    }
5096
5097    #[test]
5098    fn build_layer_section_stops_after_max_entries() {
5099        let tok = std::sync::Arc::new(CountingTokenizer {
5100            count_calls: AtomicUsize::new(0),
5101        });
5102        let candidates = (0..4)
5103            .map(|index| Candidate {
5104                id: MemoryId::new(),
5105                layer: Layer::Episodic,
5106                full_content: format!("full {index} uses five tokens"),
5107                summary: format!("summary {index}"),
5108                score: 0.9 - (index as f32 * 0.1),
5109                trust_score: 1.0,
5110                token_count_full: 5,
5111                token_count_summary: 2,
5112                tokens_full: 0,
5113                tokens_summary: 0,
5114                tokens_entity: 0,
5115                is_contradiction: false,
5116                entities: vec![format!("entity-{index}")],
5117                resource_evidence: Vec::new(),
5118                resource_preview_packages: Vec::new(),
5119                resource_score_attribution: Vec::new(),
5120            })
5121            .collect::<Vec<_>>();
5122
5123        let (entries, _used) = build_layer_section(
5124            &candidates,
5125            Layer::Episodic,
5126            100,
5127            0.4,
5128            Some(2),
5129            tok.as_ref(),
5130        );
5131
5132        assert_eq!(entries.len(), 2);
5133        assert_eq!(tok.count_calls.load(Ordering::SeqCst), 2);
5134    }
5135
5136    #[test]
5137    fn build_conflict_groups_tracks_omitted_visible_members() {
5138        let visible_a = MemoryId::new();
5139        let visible_b = MemoryId::new();
5140        let hidden = MemoryId::new();
5141
5142        let visible_members = BTreeMap::from([
5143            (
5144                visible_a,
5145                make_conflict_member(visible_a, ConflictMemberStatus::Active),
5146            ),
5147            (
5148                visible_b,
5149                make_conflict_member(visible_b, ConflictMemberStatus::Active),
5150            ),
5151        ]);
5152        let adjacency = BTreeMap::from([
5153            (visible_a, vec![hidden]),
5154            (visible_b, vec![hidden]),
5155            (hidden, vec![visible_a, visible_b]),
5156        ]);
5157        let pair_edges = vec![
5158            ConflictEdgeMeta {
5159                a: normalize_conflict_pair(visible_a, hidden).0,
5160                b: normalize_conflict_pair(visible_a, hidden).1,
5161                confidence: 0.9,
5162                evidence_count: 1,
5163                resolved: false,
5164            },
5165            ConflictEdgeMeta {
5166                a: normalize_conflict_pair(visible_b, hidden).0,
5167                b: normalize_conflict_pair(visible_b, hidden).1,
5168                confidence: 0.8,
5169                evidence_count: 2,
5170                resolved: false,
5171            },
5172        ];
5173
5174        let groups = build_conflict_groups(
5175            &visible_members,
5176            &adjacency,
5177            &pair_edges,
5178            &HashSet::new(),
5179            default_conflict_policy(),
5180        );
5181
5182        assert_eq!(groups.len(), 1);
5183        assert_eq!(groups[0].members.len(), 2);
5184        assert_eq!(groups[0].omitted_member_count, 1);
5185        assert_eq!(groups[0].pair_count, 2);
5186        assert_eq!(
5187            groups[0].arbitration_status,
5188            ConflictArbitrationStatus::Unresolved
5189        );
5190        assert!(groups[0].authoritative_memory_id.is_none());
5191        assert!(groups[0].preferred_memory_id.is_none());
5192    }
5193
5194    #[test]
5195    fn derive_conflict_arbitration_status_marks_superseded_components() {
5196        let active = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5197        let superseded = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Superseded);
5198        let edges = [ConflictEdgeMeta {
5199            a: normalize_conflict_pair(active.memory_id, superseded.memory_id).0,
5200            b: normalize_conflict_pair(active.memory_id, superseded.memory_id).1,
5201            confidence: 0.95,
5202            evidence_count: 1,
5203            resolved: false,
5204        }];
5205
5206        let status = derive_conflict_arbitration_status(&[active, superseded], &[&edges[0]], 0);
5207
5208        assert_eq!(status, ConflictArbitrationStatus::Superseded);
5209    }
5210
5211    #[test]
5212    fn derive_conflict_arbitration_status_marks_retracted_components_resolved() {
5213        let active = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5214        let retracted = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Retracted);
5215
5216        let status = derive_conflict_arbitration_status(&[active, retracted], &[], 0);
5217
5218        assert_eq!(status, ConflictArbitrationStatus::Resolved);
5219    }
5220
5221    #[test]
5222    fn authoritative_conflict_memory_id_requires_single_visible_active_member() {
5223        let active = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5224        let expected = active.memory_id;
5225        let superseded = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Superseded);
5226
5227        let selected = authoritative_conflict_memory_id(&[active, superseded], 0);
5228
5229        assert_eq!(selected, Some(expected));
5230    }
5231
5232    #[test]
5233    fn select_conflict_preferred_memory_id_prefers_reliable_supported_claims() {
5234        let mut less_reliable = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5235        less_reliable.source_reliability = 0.4;
5236
5237        let mut more_reliable = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5238        more_reliable.source_reliability = 0.9;
5239        let expected = more_reliable.memory_id;
5240
5241        let supporting = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Superseded);
5242
5243        let edges = [
5244            ConflictEdgeMeta {
5245                a: normalize_conflict_pair(less_reliable.memory_id, supporting.memory_id).0,
5246                b: normalize_conflict_pair(less_reliable.memory_id, supporting.memory_id).1,
5247                confidence: 0.6,
5248                evidence_count: 1,
5249                resolved: false,
5250            },
5251            ConflictEdgeMeta {
5252                a: normalize_conflict_pair(more_reliable.memory_id, supporting.memory_id).0,
5253                b: normalize_conflict_pair(more_reliable.memory_id, supporting.memory_id).1,
5254                confidence: 0.95,
5255                evidence_count: 3,
5256                resolved: false,
5257            },
5258        ];
5259
5260        let selected = select_conflict_preferred_memory_id(
5261            &[less_reliable, more_reliable, supporting],
5262            &[&edges[0], &edges[1]],
5263            0,
5264            &HashSet::new(),
5265            default_conflict_policy(),
5266        );
5267
5268        assert_eq!(selected, Some(expected));
5269    }
5270
5271    #[test]
5272    fn select_conflict_preferred_memory_id_refuses_partial_visibility() {
5273        let active = make_conflict_member(MemoryId::new(), ConflictMemberStatus::Active);
5274
5275        let selected = select_conflict_preferred_memory_id(
5276            &[active],
5277            &[],
5278            1,
5279            &HashSet::new(),
5280            default_conflict_policy(),
5281        );
5282
5283        assert_eq!(selected, None);
5284    }
5285
5286    #[test]
5287    fn select_conflict_preferred_memory_id_can_prioritize_recency() {
5288        let older_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FAV").unwrap();
5289        let newer_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FB0").unwrap();
5290
5291        let mut older = make_conflict_member(older_id, ConflictMemberStatus::Active);
5292        older.source_reliability = 0.95;
5293        older.revision_id = Some(RevisionId::from_memory_id(older_id));
5294
5295        let mut newer = make_conflict_member(newer_id, ConflictMemberStatus::Active);
5296        newer.source_reliability = 0.55;
5297        newer.revision_id = Some(RevisionId::from_memory_id(newer_id));
5298
5299        let policy = ConflictResolutionPolicy {
5300            recency_weight: 0.80,
5301            source_reliability_weight: 0.10,
5302            supporting_evidence_weight: 0.10,
5303            human_override_weight: 0.0,
5304            prefer_human_override: true,
5305        };
5306
5307        let selected =
5308            select_conflict_preferred_memory_id(&[older, newer], &[], 0, &HashSet::new(), policy);
5309
5310        assert_eq!(selected, Some(newer_id));
5311    }
5312
5313    #[test]
5314    fn select_conflict_preferred_memory_id_can_prioritize_reliability_over_recency() {
5315        let older_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FB1").unwrap();
5316        let newer_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FB2").unwrap();
5317
5318        let mut older = make_conflict_member(older_id, ConflictMemberStatus::Active);
5319        older.source_reliability = 0.95;
5320        older.revision_id = Some(RevisionId::from_memory_id(older_id));
5321
5322        let mut newer = make_conflict_member(newer_id, ConflictMemberStatus::Active);
5323        newer.source_reliability = 0.35;
5324        newer.revision_id = Some(RevisionId::from_memory_id(newer_id));
5325
5326        let policy = ConflictResolutionPolicy {
5327            recency_weight: 0.10,
5328            source_reliability_weight: 0.80,
5329            supporting_evidence_weight: 0.10,
5330            human_override_weight: 0.0,
5331            prefer_human_override: true,
5332        };
5333
5334        let selected =
5335            select_conflict_preferred_memory_id(&[older, newer], &[], 0, &HashSet::new(), policy);
5336
5337        assert_eq!(selected, Some(older_id));
5338    }
5339
5340    #[test]
5341    fn select_conflict_preferred_memory_id_is_stable_across_input_order() {
5342        let older_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FB3").unwrap();
5343        let newer_id = MemoryId::parse("01ARZ3NDEKTSV4RRFFQ69G5FB4").unwrap();
5344
5345        let mut older = make_conflict_member(older_id, ConflictMemberStatus::Active);
5346        older.source_reliability = 0.95;
5347        older.revision_id = Some(RevisionId::from_memory_id(older_id));
5348
5349        let mut newer = make_conflict_member(newer_id, ConflictMemberStatus::Active);
5350        newer.source_reliability = 0.55;
5351        newer.revision_id = Some(RevisionId::from_memory_id(newer_id));
5352
5353        let policy = ConflictResolutionPolicy {
5354            recency_weight: 0.15,
5355            source_reliability_weight: 0.75,
5356            supporting_evidence_weight: 0.10,
5357            human_override_weight: 0.0,
5358            prefer_human_override: true,
5359        };
5360
5361        let forward = select_conflict_preferred_memory_id(
5362            &[older.clone(), newer.clone()],
5363            &[],
5364            0,
5365            &HashSet::new(),
5366            policy,
5367        );
5368        let reverse =
5369            select_conflict_preferred_memory_id(&[newer, older], &[], 0, &HashSet::new(), policy);
5370
5371        assert_eq!(forward, Some(older_id));
5372        assert_eq!(reverse, Some(older_id));
5373    }
5374
5375    #[test]
5376    fn select_conflict_preferred_memory_id_prefers_explicit_override() {
5377        let regular_id = MemoryId::new();
5378        let override_id = MemoryId::new();
5379
5380        let mut regular = make_conflict_member(regular_id, ConflictMemberStatus::Active);
5381        regular.source_reliability = 0.95;
5382
5383        let mut overridden = make_conflict_member(override_id, ConflictMemberStatus::Active);
5384        overridden.source_reliability = 0.30;
5385
5386        let override_members = HashSet::from([override_id]);
5387        let selected = select_conflict_preferred_memory_id(
5388            &[regular, overridden],
5389            &[],
5390            0,
5391            &override_members,
5392            default_conflict_policy(),
5393        );
5394
5395        assert_eq!(selected, Some(override_id));
5396    }
5397
5398    #[tokio::test(flavor = "multi_thread")]
5399    async fn context_graph_helpers_use_authoritative_cached_graph_edges() {
5400        let (db, _dir) = temp_db().await;
5401        let _tokenizer = test_tokenizer();
5402        // Both records must share the same namespace — the hot-graph enforces
5403        // that edges are intra-namespace only.
5404        let ns = Namespace::new("graph_helper_ns").unwrap();
5405
5406        let source_id = db
5407            .remember(
5408                EpisodicRecord::builder()
5409                    .event_type(EventType::Observation)
5410                    .content("source event")
5411                    .summary("source event")
5412                    .embedding(vec![1.0, 0.0, 0.0, 0.0])
5413                    .importance(0.9)
5414                    .namespace(ns)
5415                    .agent_id(AgentId::new("test").unwrap())
5416                    .build()
5417                    .unwrap(),
5418            )
5419            .await
5420            .unwrap();
5421        let target_id = db
5422            .remember(
5423                EpisodicRecord::builder()
5424                    .event_type(EventType::Observation)
5425                    .content("hot only neighbor")
5426                    .summary("hot only neighbor")
5427                    .embedding(vec![0.0, 1.0, 0.0, 0.0])
5428                    .importance(0.8)
5429                    .namespace(ns)
5430                    .agent_id(AgentId::new("test").unwrap())
5431                    .build()
5432                    .unwrap(),
5433            )
5434            .await
5435            .unwrap();
5436
5437        {
5438            let mut hot_graph = db.cached_graph().hot_graph_mut();
5439            hot_graph
5440                .add_edge(
5441                    source_id,
5442                    target_id,
5443                    EdgeRelation::Causes,
5444                    0.8,
5445                    Metadata::new(),
5446                )
5447                .unwrap();
5448        }
5449
5450        let source_record = scored_record(
5451            MemoryRecord::Episodic(db.get_episode(source_id).await.unwrap()),
5452            0.9,
5453        );
5454        let target_record = scored_record(
5455            MemoryRecord::Episodic(db.get_episode(target_id).await.unwrap()),
5456            0.8,
5457        );
5458
5459        let narrative =
5460            format_as_narrative(&db, &[source_record.clone(), target_record.clone()]).await;
5461        assert!(narrative.contains("As a result of this"));
5462
5463        let causal_chain = format_as_causal_chain(
5464            &db,
5465            &[source_record.clone(), target_record.clone()],
5466            Some(1),
5467        )
5468        .await;
5469        assert!(causal_chain.contains("source event"));
5470        assert!(causal_chain.contains("hot only neighbor"));
5471        assert!(causal_chain.contains("[caused, w=0.80]"));
5472
5473        let graph_json = format_as_graph(&db, &[source_record, target_record]).await;
5474        let parsed: serde_json::Value = serde_json::from_str(&graph_json).unwrap();
5475        let edges = parsed["edges"].as_array().unwrap();
5476        assert!(edges.iter().any(|edge| {
5477            edge["source"] == source_id.to_string()
5478                && edge["target"] == target_id.to_string()
5479                && edge["relation"] == "Causes"
5480        }));
5481    }
5482
5483    #[test]
5484    fn build_semantic_conflict_groups_surfaces_preferred_and_authoritative_members() {
5485        let mut older = SemanticRecord::builder()
5486            .concept("policy")
5487            .knowledge_type(KnowledgeType::Propositional)
5488            .description("Policy remains disabled")
5489            .confidence(0.55)
5490            .agent_id(AgentId::new("test").unwrap())
5491            .build()
5492            .unwrap();
5493        let mut newer = SemanticRecord::builder()
5494            .concept("policy")
5495            .knowledge_type(KnowledgeType::Propositional)
5496            .description("Policy is enabled")
5497            .confidence(0.55)
5498            .agent_id(AgentId::new("test").unwrap())
5499            .build()
5500            .unwrap();
5501
5502        older.valid_from = Timestamp::from_millis(10);
5503        newer.valid_from = Timestamp::from_millis(20);
5504        older.revision_id = RevisionId::from_memory_id(older.id);
5505        newer.revision_id = RevisionId::from_memory_id(newer.id);
5506        older.contradiction_ids.push(newer.id);
5507        newer.contradiction_ids.push(older.id);
5508
5509        let groups = build_semantic_conflict_groups(
5510            &[older.clone(), newer.clone()],
5511            default_conflict_policy(),
5512        );
5513        assert_eq!(groups.len(), 1);
5514        assert_eq!(groups[0].preferred_memory_id, Some(newer.id));
5515        assert_eq!(groups[0].authoritative_memory_id, None);
5516        assert_eq!(
5517            groups[0].arbitration_status,
5518            ConflictArbitrationStatus::Unresolved
5519        );
5520
5521        older.superseded_by = Some(MemoryId::new());
5522        let groups =
5523            build_semantic_conflict_groups(&[older, newer.clone()], default_conflict_policy());
5524        assert_eq!(groups.len(), 1);
5525        assert_eq!(groups[0].authoritative_memory_id, Some(newer.id));
5526        assert_eq!(
5527            groups[0].arbitration_status,
5528            ConflictArbitrationStatus::Superseded
5529        );
5530    }
5531
5532    #[test]
5533    fn score_distribution_computed() {
5534        let candidates = vec![
5535            make_episodic("a", "s", 0.9),
5536            make_episodic("b", "s", 0.5),
5537            make_episodic("c", "s", 0.3),
5538        ];
5539        let ids: Vec<MemoryId> = candidates.iter().map(|c| c.record.id()).collect();
5540
5541        let dist = compute_score_distribution(&candidates, &ids);
5542
5543        assert!((dist.min - 0.3).abs() < 0.01);
5544        assert!((dist.max - 0.9).abs() < 0.01);
5545    }
5546}