// zeph_context/assembler.rs
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Stateless context assembler.
//!
//! [`ContextAssembler`] gathers all memory-sourced context for a single agent turn by running
//! all async fetch operations concurrently. It takes only borrowed references via
//! [`ContextAssemblyInput`] and returns a [`PreparedContext`] ready for injection.
//!
//! Invariants:
//! - No `Agent` field mutations inside `gather()`.
//! - No channel communication inside `gather()`.
//! - All `send_status` calls remain in `Agent::prepare_context`.
//! - `session_digest` is cached (not async) and stays in `Agent::apply_prepared_context`.
16use std::future::Future;
17use std::pin::Pin;
18
19use futures::StreamExt as _;
20use futures::stream::FuturesUnordered;
21
22use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
23use zeph_memory::TokenCounter;
24
25use crate::error::ContextError;
26use crate::input::ContextAssemblyInput;
27use crate::slot::ContextSlot;
28
/// Prefix for past-session summary injections.
pub const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Prefix for cross-session context injections.
pub const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
/// Prefix for semantic recall injections.
pub const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Prefix for past-correction injections.
pub const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Prefix for document RAG injections (markdown heading, unlike the bracketed prefixes above).
pub const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
/// Prefix for knowledge graph fact injections.
pub const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
41
/// Result of one context-assembly pass.
///
/// All source fields are `Option` — `None` means disabled, empty, or budget-exhausted.
/// `session_digest` is excluded: it is a cached value injected by `Agent::apply_prepared_context`.
pub struct PreparedContext {
    /// Knowledge graph fact recall.
    pub graph_facts: Option<Message>,
    /// Document RAG context.
    pub doc_rag: Option<Message>,
    /// Past user corrections (never budget-gated; see `ContextAssembler::gather`).
    pub corrections: Option<Message>,
    /// Semantic recall results.
    pub recall: Option<Message>,
    /// Top-1 similarity score from semantic recall.
    pub recall_confidence: Option<f32>,
    /// Cross-session memory context.
    pub cross_session: Option<Message>,
    /// Past-conversation summaries.
    pub summaries: Option<Message>,
    /// Code-index RAG context (repo map or file context).
    pub code_context: Option<String>,
    /// Persona memory facts.
    pub persona_facts: Option<Message>,
    /// Trajectory hints.
    pub trajectory_hints: Option<Message>,
    /// `TiMem` tree memory summary.
    pub tree_memory: Option<Message>,
    /// Distilled reasoning strategies from the `ReasoningBank` (#3343).
    pub reasoning_hints: Option<Message>,
    /// Whether the memory-first context strategy is active for this turn.
    pub memory_first: bool,
    /// Token budget for recent conversation history (passed to trim step in apply).
    pub recent_history_budget: usize,
}
76
/// Stateless coordinator for parallel context fetching.
///
/// All logic is in [`ContextAssembler::gather`]. No state is stored on this type
/// (zero-sized unit struct).
pub struct ContextAssembler;
81
82impl ContextAssembler {
83    /// Gather all context sources concurrently and return a [`PreparedContext`].
84    ///
85    /// Returns an empty `PreparedContext` immediately when `context_manager.budget` is `None`.
86    ///
87    /// # Errors
88    ///
89    /// Propagates errors from any async fetch operation.
90    #[allow(clippy::too_many_lines)]
91    pub async fn gather(input: &ContextAssemblyInput<'_>) -> Result<PreparedContext, ContextError> {
92        type CtxFuture<'a> =
93            Pin<Box<dyn Future<Output = Result<ContextSlot, ContextError>> + Send + 'a>>;
94
95        let Some(ref budget) = input.context_manager.budget else {
96            return Ok(PreparedContext {
97                graph_facts: None,
98                doc_rag: None,
99                corrections: None,
100                recall: None,
101                recall_confidence: None,
102                cross_session: None,
103                summaries: None,
104                code_context: None,
105                persona_facts: None,
106                trajectory_hints: None,
107                tree_memory: None,
108                reasoning_hints: None,
109                memory_first: false,
110                recent_history_budget: 0,
111            });
112        };
113
114        let memory = input.memory;
115        let tc = input.token_counter;
116
117        let effective_strategy = match memory.context_strategy {
118            zeph_config::ContextStrategy::FullHistory => zeph_config::ContextStrategy::FullHistory,
119            zeph_config::ContextStrategy::MemoryFirst => zeph_config::ContextStrategy::MemoryFirst,
120            zeph_config::ContextStrategy::Adaptive => {
121                if input.sidequest_turn_counter >= u64::from(memory.crossover_turn_threshold) {
122                    zeph_config::ContextStrategy::MemoryFirst
123                } else {
124                    zeph_config::ContextStrategy::FullHistory
125                }
126            }
127        };
128        let memory_first = effective_strategy == zeph_config::ContextStrategy::MemoryFirst;
129
130        let system_prompt = input
131            .messages
132            .first()
133            .filter(|m| m.role == Role::System)
134            .map_or("", |m| m.content.as_str());
135
136        let digest_tokens = memory
137            .cached_session_digest
138            .as_ref()
139            .map_or(0, |(_, tokens)| *tokens);
140
141        let graph_enabled = memory.graph_config.enabled;
142
143        let alloc = budget.allocate_with_opts(
144            system_prompt,
145            input.skills_prompt,
146            tc,
147            graph_enabled,
148            digest_tokens,
149            memory_first,
150        );
151
152        let correction_params = input
153            .correction_config
154            .filter(|c| c.correction_detection)
155            .map(|c| {
156                (
157                    c.correction_recall_limit as usize,
158                    c.correction_min_similarity,
159                )
160            });
161        let (recall_limit, min_sim) = correction_params.unwrap_or((3, 0.75));
162
163        let router = input.context_manager.build_router();
164        let router_ref: &dyn zeph_memory::AsyncMemoryRouter = router.as_ref();
165        let query = input.query;
166        let scrub = input.scrub;
167
168        let mut fetchers: FuturesUnordered<CtxFuture<'_>> = FuturesUnordered::new();
169
170        tracing::debug!(
171            active_sources = alloc.active_sources(),
172            "context budget allocated"
173        );
174
175        if alloc.summaries > 0 {
176            fetchers.push(Box::pin(async {
177                fetch_summaries(memory, alloc.summaries, tc)
178                    .await
179                    .map(ContextSlot::Summaries)
180            }));
181        }
182        if alloc.cross_session > 0 {
183            fetchers.push(Box::pin(async {
184                fetch_cross_session(memory, query, alloc.cross_session, tc)
185                    .await
186                    .map(ContextSlot::CrossSession)
187            }));
188        }
189        if alloc.semantic_recall > 0 {
190            fetchers.push(Box::pin(async {
191                fetch_semantic_recall(memory, query, alloc.semantic_recall, tc, Some(router_ref))
192                    .await
193                    .map(|(msg, score)| ContextSlot::SemanticRecall(msg, score))
194            }));
195            fetchers.push(Box::pin(async {
196                fetch_document_rag(memory, query, alloc.semantic_recall, tc)
197                    .await
198                    .map(ContextSlot::DocumentRag)
199            }));
200        }
201        // Corrections are safety-critical and never budget-gated.
202        fetchers.push(Box::pin(async {
203            fetch_corrections(memory, query, recall_limit, min_sim, scrub)
204                .await
205                .map(ContextSlot::Corrections)
206        }));
207        if alloc.code_context > 0
208            && let Some(index) = input.index
209        {
210            let budget = alloc.code_context;
211            fetchers.push(Box::pin(async move {
212                let result: Result<Option<String>, ContextError> =
213                    index.fetch_code_rag(query, budget).await;
214                result.map(ContextSlot::CodeContext)
215            }));
216        }
217        if alloc.graph_facts > 0 {
218            fetchers.push(Box::pin(async {
219                fetch_graph_facts(memory, query, alloc.graph_facts, tc)
220                    .await
221                    .map(ContextSlot::GraphFacts)
222            }));
223        }
224        if memory.persona_config.context_budget_tokens > 0 {
225            fetchers.push(Box::pin(async {
226                let persona_budget = memory.persona_config.context_budget_tokens;
227                fetch_persona_facts(memory, persona_budget, tc)
228                    .await
229                    .map(ContextSlot::PersonaFacts)
230            }));
231        }
232        if memory.trajectory_config.context_budget_tokens > 0 {
233            fetchers.push(Box::pin(async {
234                let tbudget = memory.trajectory_config.context_budget_tokens;
235                fetch_trajectory_hints(memory, tbudget, tc)
236                    .await
237                    .map(ContextSlot::TrajectoryHints)
238            }));
239        }
240        if memory.tree_config.context_budget_tokens > 0 {
241            fetchers.push(Box::pin(async {
242                let tbudget = memory.tree_config.context_budget_tokens;
243                fetch_tree_memory(memory, tbudget, tc)
244                    .await
245                    .map(ContextSlot::TreeMemory)
246            }));
247        }
248        if memory.reasoning_config.enabled && memory.reasoning_config.context_budget_tokens > 0 {
249            fetchers.push(Box::pin(async {
250                let rbudget = memory.reasoning_config.context_budget_tokens;
251                let top_k = memory.reasoning_config.top_k;
252                fetch_reasoning_strategies(memory, query, rbudget, top_k, tc)
253                    .await
254                    .map(ContextSlot::ReasoningStrategies)
255            }));
256        }
257
258        let mut prepared = PreparedContext {
259            graph_facts: None,
260            doc_rag: None,
261            corrections: None,
262            recall: None,
263            recall_confidence: None,
264            cross_session: None,
265            summaries: None,
266            code_context: None,
267            persona_facts: None,
268            trajectory_hints: None,
269            tree_memory: None,
270            reasoning_hints: None,
271            memory_first,
272            recent_history_budget: alloc.recent_history,
273        };
274
275        while let Some(result) = fetchers.next().await {
276            match result {
277                Ok(slot) => match slot {
278                    ContextSlot::Summaries(msg) => prepared.summaries = msg,
279                    ContextSlot::CrossSession(msg) => prepared.cross_session = msg,
280                    ContextSlot::SemanticRecall(msg, score) => {
281                        prepared.recall = msg;
282                        prepared.recall_confidence = score;
283                    }
284                    ContextSlot::DocumentRag(msg) => prepared.doc_rag = msg,
285                    ContextSlot::Corrections(msg) => prepared.corrections = msg,
286                    ContextSlot::CodeContext(text) => prepared.code_context = text,
287                    ContextSlot::GraphFacts(msg) => prepared.graph_facts = msg,
288                    ContextSlot::PersonaFacts(msg) => prepared.persona_facts = msg,
289                    ContextSlot::TrajectoryHints(msg) => prepared.trajectory_hints = msg,
290                    ContextSlot::TreeMemory(msg) => prepared.tree_memory = msg,
291                    ContextSlot::ReasoningStrategies(msg) => prepared.reasoning_hints = msg,
292                },
293                Err(e) => return Err(e),
294            }
295        }
296
297        Ok(prepared)
298    }
299}
300
301/// Clamp recall timeout to a safe minimum.
302///
303/// A configured value of 0 would disable spreading activation recall entirely;
304/// clamping to 100ms preserves the user's intent while preventing a silent no-op.
305pub fn effective_recall_timeout_ms(configured: u64) -> u64 {
306    if configured == 0 {
307        tracing::warn!(
308            "recall_timeout_ms is 0, which would disable spreading activation recall; \
309             clamping to 100ms"
310        );
311        100
312    } else {
313        configured
314    }
315}
316
317use crate::input::ContextMemoryView;
318
/// Fetch knowledge-graph facts relevant to `query`, truncated to `budget_tokens`.
///
/// Two recall paths:
/// - Spreading activation (when `graph_config.spreading_activation.enabled`):
///   best-effort — recall failures and timeouts degrade to "no facts" with a warning.
/// - Plain graph recall otherwise: failures propagate as [`ContextError::Memory`].
///
/// Returns `Ok(None)` when the feature is disabled, the budget is zero, memory is
/// absent, or every fact was truncated away by the budget.
pub(crate) async fn fetch_graph_facts(
    memory: &ContextMemoryView,
    query: &str,
    budget_tokens: usize,
    tc: &TokenCounter,
) -> Result<Option<Message>, ContextError> {
    if budget_tokens == 0 || !memory.graph_config.enabled {
        return Ok(None);
    }
    let Some(ref mem) = memory.memory else {
        return Ok(None);
    };
    let recall_limit = memory.graph_config.recall_limit;
    let temporal_decay_rate = memory.graph_config.temporal_decay_rate;
    // Classify the query into the graph edge types worth traversing.
    let edge_types = zeph_memory::classify_graph_subgraph(query);
    let sa_config = &memory.graph_config.spreading_activation;

    let mut body = String::from(GRAPH_FACTS_PREFIX);
    let mut tokens_so_far = tc.count_tokens(&body);

    if sa_config.enabled {
        let sa_params = zeph_memory::graph::SpreadingActivationParams {
            decay_lambda: sa_config.decay_lambda,
            max_hops: sa_config.max_hops,
            activation_threshold: sa_config.activation_threshold,
            inhibition_threshold: sa_config.inhibition_threshold,
            max_activated_nodes: sa_config.max_activated_nodes,
            temporal_decay_rate,
            seed_structural_weight: sa_config.seed_structural_weight,
            seed_community_cap: sa_config.seed_community_cap,
        };
        // Clamped to ≥100ms so a configured 0 cannot silently disable recall.
        let timeout_ms = effective_recall_timeout_ms(sa_config.recall_timeout_ms);
        let recall_fut = mem.recall_graph_activated(query, recall_limit, sa_params, &edge_types);
        // Best-effort: both recall errors and timeouts degrade to an empty fact list.
        let activated_facts =
            match tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), recall_fut)
                .await
            {
                Ok(Ok(facts)) => facts,
                Ok(Err(e)) => {
                    tracing::warn!("spreading activation recall failed: {e:#}");
                    Vec::new()
                }
                Err(_) => {
                    tracing::warn!("spreading activation recall timed out ({timeout_ms}ms)");
                    Vec::new()
                }
            };

        if activated_facts.is_empty() {
            return Ok(None);
        }

        for f in &activated_facts {
            // Sanitize stored fact text so it cannot inject newlines/markup into the prompt.
            let fact_text = f.edge.fact.replace(['\n', '\r', '<', '>'], " ");
            let line = format!(
                "- {} (confidence: {:.2}, activation: {:.2})\n",
                fact_text, f.edge.confidence, f.activation_score
            );
            let line_tokens = tc.count_tokens(&line);
            if tokens_so_far + line_tokens > budget_tokens {
                break;
            }
            body.push_str(&line);
            tokens_so_far += line_tokens;
        }
    } else {
        let max_hops = memory.graph_config.max_hops;
        // Unlike the spreading-activation path, plain recall errors propagate to the caller.
        let facts = mem
            .recall_graph(
                query,
                recall_limit,
                max_hops,
                None,
                temporal_decay_rate,
                &edge_types,
            )
            .await
            .map_err(|e| {
                tracing::warn!("graph recall failed: {e:#}");
                ContextError::Memory(e)
            })?;

        if facts.is_empty() {
            return Ok(None);
        }

        for f in &facts {
            // Same sanitization as the spreading-activation path.
            let fact_text = f.fact.replace(['\n', '\r', '<', '>'], " ");
            let line = format!("- {} (confidence: {:.2})\n", fact_text, f.confidence);
            let line_tokens = tc.count_tokens(&line);
            if tokens_so_far + line_tokens > budget_tokens {
                break;
            }
            body.push_str(&line);
            tokens_so_far += line_tokens;
        }
    }

    // Only the prefix means every fact was truncated by the budget.
    if body == GRAPH_FACTS_PREFIX {
        return Ok(None);
    }

    Ok(Some(Message::from_legacy(Role::System, body)))
}
423
424pub(crate) async fn fetch_persona_facts(
425    memory: &ContextMemoryView,
426    budget_tokens: usize,
427    tc: &TokenCounter,
428) -> Result<Option<Message>, ContextError> {
429    if budget_tokens == 0 || !memory.persona_config.enabled {
430        return Ok(None);
431    }
432    let Some(ref mem) = memory.memory else {
433        return Ok(None);
434    };
435
436    let min_confidence = memory.persona_config.min_confidence;
437    let facts = mem.sqlite().load_persona_facts(min_confidence).await?;
438
439    if facts.is_empty() {
440        return Ok(None);
441    }
442
443    let mut body = String::from(crate::slot::PERSONA_PREFIX);
444    let mut tokens_so_far = tc.count_tokens(&body);
445
446    for fact in &facts {
447        let line = format!("[{}] {}\n", fact.category, fact.content);
448        let line_tokens = tc.count_tokens(&line);
449        if tokens_so_far + line_tokens > budget_tokens {
450            break;
451        }
452        body.push_str(&line);
453        tokens_so_far += line_tokens;
454    }
455
456    if body == crate::slot::PERSONA_PREFIX {
457        return Ok(None);
458    }
459
460    Ok(Some(Message::from_legacy(Role::System, body)))
461}
462
463pub(crate) async fn fetch_trajectory_hints(
464    memory: &ContextMemoryView,
465    budget_tokens: usize,
466    tc: &TokenCounter,
467) -> Result<Option<Message>, ContextError> {
468    if budget_tokens == 0 || !memory.trajectory_config.enabled {
469        return Ok(None);
470    }
471    let Some(ref mem) = memory.memory else {
472        return Ok(None);
473    };
474
475    let top_k = memory.trajectory_config.recall_top_k;
476    let min_conf = memory.trajectory_config.min_confidence;
477    let entries = mem
478        .sqlite()
479        .load_trajectory_entries(Some("procedural"), top_k)
480        .await?;
481
482    if entries.is_empty() {
483        return Ok(None);
484    }
485
486    let mut body = String::from(crate::slot::TRAJECTORY_PREFIX);
487    let mut tokens_so_far = tc.count_tokens(&body);
488
489    for entry in entries
490        .iter()
491        .filter(|e| e.confidence >= min_conf)
492        .take(top_k)
493    {
494        let line = format!("- {}: {}\n", entry.intent, entry.outcome);
495        let line_tokens = tc.count_tokens(&line);
496        if tokens_so_far + line_tokens > budget_tokens {
497            break;
498        }
499        body.push_str(&line);
500        tokens_so_far += line_tokens;
501    }
502
503    if body == crate::slot::TRAJECTORY_PREFIX {
504        return Ok(None);
505    }
506
507    Ok(Some(Message::from_legacy(Role::System, body)))
508}
509
510pub(crate) async fn fetch_tree_memory(
511    memory: &ContextMemoryView,
512    budget_tokens: usize,
513    tc: &TokenCounter,
514) -> Result<Option<Message>, ContextError> {
515    if budget_tokens == 0 || !memory.tree_config.enabled {
516        return Ok(None);
517    }
518    let Some(ref mem) = memory.memory else {
519        return Ok(None);
520    };
521
522    let top_k = memory.tree_config.recall_top_k;
523    let nodes = mem.sqlite().load_tree_level(1, top_k).await?;
524
525    if nodes.is_empty() {
526        return Ok(None);
527    }
528
529    let mut body = String::from(crate::slot::TREE_MEMORY_PREFIX);
530    let mut tokens_so_far = tc.count_tokens(&body);
531
532    for node in nodes.iter().take(top_k) {
533        let line = format!("- {}\n", node.content);
534        let line_tokens = tc.count_tokens(&line);
535        if tokens_so_far + line_tokens > budget_tokens {
536            break;
537        }
538        body.push_str(&line);
539        tokens_so_far += line_tokens;
540    }
541
542    if body == crate::slot::TREE_MEMORY_PREFIX {
543        return Ok(None);
544    }
545
546    Ok(Some(Message::from_legacy(Role::System, body)))
547}
548
549pub(crate) async fn fetch_reasoning_strategies(
550    memory: &ContextMemoryView,
551    query: &str,
552    budget_tokens: usize,
553    top_k: usize,
554    tc: &TokenCounter,
555) -> Result<Option<Message>, ContextError> {
556    // S1: enforce the ≤500-token spec cap documented in ReasoningConfig.
557    let budget_tokens = budget_tokens.min(500);
558    if budget_tokens == 0 {
559        return Ok(None);
560    }
561    let Some(ref mem) = memory.memory else {
562        return Ok(None);
563    };
564
565    let strategies = mem
566        .retrieve_reasoning_strategies(query, top_k)
567        .await
568        .map_err(ContextError::Memory)?;
569
570    if strategies.is_empty() {
571        return Ok(None);
572    }
573
574    let mut body = String::from(crate::slot::REASONING_PREFIX);
575    let mut tokens_so_far = tc.count_tokens(&body);
576    let mut injected_ids: Vec<String> = Vec::new();
577
578    for s in strategies.iter().take(top_k) {
579        // S-Med1: sanitize distilled summaries to prevent stored injection payloads
580        // from reaching the system prompt (mirrors fetch_graph_facts scrub pattern).
581        let safe_summary = s.summary.replace(['\n', '\r', '<', '>'], " ");
582        let line = format!("- [{}] {}\n", s.outcome.as_str(), safe_summary);
583        let line_tokens = tc.count_tokens(&line);
584        if tokens_so_far + line_tokens > budget_tokens {
585            break;
586        }
587        body.push_str(&line);
588        tokens_so_far += line_tokens;
589        injected_ids.push(s.id.clone());
590    }
591
592    if body == crate::slot::REASONING_PREFIX {
593        return Ok(None);
594    }
595
596    // C4 split: mark_used only for strategies that made it past budget truncation.
597    // P2-1: fire-and-forget — mark_used does not need to block the context build path.
598    if let Some(ref reasoning) = mem.reasoning {
599        let reasoning = reasoning.clone();
600        tokio::spawn(async move {
601            if let Err(e) = reasoning.mark_used(&injected_ids).await {
602                tracing::warn!(error = %e, "reasoning: mark_used failed");
603            }
604        });
605    }
606
607    Ok(Some(Message::from_legacy(Role::System, body)))
608}
609
610pub(crate) async fn fetch_corrections(
611    memory: &ContextMemoryView,
612    query: &str,
613    limit: usize,
614    min_score: f32,
615    scrub: fn(&str) -> std::borrow::Cow<'_, str>,
616) -> Result<Option<Message>, ContextError> {
617    let Some(ref mem) = memory.memory else {
618        return Ok(None);
619    };
620    let corrections = mem
621        .retrieve_similar_corrections(query, limit, min_score)
622        .await
623        .unwrap_or_default();
624    if corrections.is_empty() {
625        return Ok(None);
626    }
627    let mut text = String::from(CORRECTIONS_PREFIX);
628    for c in &corrections {
629        text.push_str("- Past user correction: \"");
630        text.push_str(&scrub(&c.correction_text));
631        text.push_str("\"\n");
632    }
633    Ok(Some(Message::from_legacy(Role::System, text)))
634}
635
636pub(crate) async fn fetch_semantic_recall(
637    memory: &ContextMemoryView,
638    query: &str,
639    token_budget: usize,
640    tc: &TokenCounter,
641    router: Option<&dyn zeph_memory::AsyncMemoryRouter>,
642) -> Result<(Option<Message>, Option<f32>), ContextError> {
643    let Some(ref mem) = memory.memory else {
644        return Ok((None, None));
645    };
646    if memory.recall_limit == 0 || token_budget == 0 {
647        return Ok((None, None));
648    }
649
650    let recalled = if let Some(r) = router {
651        mem.recall_routed_async(query, memory.recall_limit, None, r)
652            .await?
653    } else {
654        mem.recall(query, memory.recall_limit, None).await?
655    };
656    if recalled.is_empty() {
657        return Ok((None, None));
658    }
659
660    let top_score = recalled.first().map(|r| r.score);
661
662    let mut recall_text = String::with_capacity(token_budget * 3);
663    recall_text.push_str(RECALL_PREFIX);
664    let mut tokens_used = tc.count_tokens(&recall_text);
665
666    for item in &recalled {
667        if item.message.content.starts_with("[skipped]")
668            || item.message.content.starts_with("[stopped]")
669        {
670            continue;
671        }
672        let role_label = match item.message.role {
673            Role::User => "user",
674            Role::Assistant => "assistant",
675            Role::System => "system",
676        };
677        let entry = format!("- [{}] {}\n", role_label, item.message.content);
678        let entry_tokens = tc.count_tokens(&entry);
679        if tokens_used + entry_tokens > token_budget {
680            break;
681        }
682        recall_text.push_str(&entry);
683        tokens_used += entry_tokens;
684    }
685
686    if tokens_used > tc.count_tokens(RECALL_PREFIX) {
687        Ok((
688            Some(Message::from_parts(
689                Role::System,
690                vec![MessagePart::Recall { text: recall_text }],
691            )),
692            top_score,
693        ))
694    } else {
695        Ok((None, None))
696    }
697}
698
699pub(crate) async fn fetch_document_rag(
700    memory: &ContextMemoryView,
701    query: &str,
702    token_budget: usize,
703    tc: &TokenCounter,
704) -> Result<Option<Message>, ContextError> {
705    if !memory.document_config.rag_enabled || token_budget == 0 {
706        return Ok(None);
707    }
708    let Some(ref mem) = memory.memory else {
709        return Ok(None);
710    };
711
712    let collection = &memory.document_config.collection;
713    let top_k = memory.document_config.top_k;
714    let points = mem
715        .search_document_collection(collection, query, top_k)
716        .await?;
717    if points.is_empty() {
718        return Ok(None);
719    }
720
721    let mut text = String::from(DOCUMENT_RAG_PREFIX);
722    let mut tokens_used = tc.count_tokens(&text);
723
724    for point in &points {
725        let chunk = point
726            .payload
727            .get("text")
728            .and_then(|v| v.as_str())
729            .unwrap_or_default();
730        if chunk.is_empty() {
731            continue;
732        }
733        let entry = format!("{chunk}\n");
734        let cost = tc.count_tokens(&entry);
735        if tokens_used + cost > token_budget {
736            break;
737        }
738        text.push_str(&entry);
739        tokens_used += cost;
740    }
741
742    if tokens_used > tc.count_tokens(DOCUMENT_RAG_PREFIX) {
743        Ok(Some(Message {
744            role: Role::System,
745            content: text,
746            parts: vec![],
747            metadata: MessageMetadata::default(),
748        }))
749    } else {
750        Ok(None)
751    }
752}
753
754pub(crate) async fn fetch_summaries(
755    memory: &ContextMemoryView,
756    token_budget: usize,
757    tc: &TokenCounter,
758) -> Result<Option<Message>, ContextError> {
759    let (Some(mem), Some(cid)) = (&memory.memory, memory.conversation_id) else {
760        return Ok(None);
761    };
762    if token_budget == 0 {
763        return Ok(None);
764    }
765
766    let summaries = mem.load_summaries(cid).await?;
767    if summaries.is_empty() {
768        return Ok(None);
769    }
770
771    let mut summary_text = String::from(SUMMARY_PREFIX);
772    let mut tokens_used = tc.count_tokens(&summary_text);
773
774    for summary in summaries.iter().rev() {
775        let first = summary.first_message_id.map_or(0, |m| m.0);
776        let last = summary.last_message_id.map_or(0, |m| m.0);
777        let entry = format!("- Messages {first}-{last}: {}\n", summary.content);
778        let cost = tc.count_tokens(&entry);
779        if tokens_used + cost > token_budget {
780            break;
781        }
782        summary_text.push_str(&entry);
783        tokens_used += cost;
784    }
785
786    if tokens_used > tc.count_tokens(SUMMARY_PREFIX) {
787        Ok(Some(Message::from_parts(
788            Role::System,
789            vec![MessagePart::Summary { text: summary_text }],
790        )))
791    } else {
792        Ok(None)
793    }
794}
795
796pub(crate) async fn fetch_cross_session(
797    memory: &ContextMemoryView,
798    query: &str,
799    token_budget: usize,
800    tc: &TokenCounter,
801) -> Result<Option<Message>, ContextError> {
802    let (Some(mem), Some(cid)) = (&memory.memory, memory.conversation_id) else {
803        return Ok(None);
804    };
805    if token_budget == 0 {
806        return Ok(None);
807    }
808
809    let threshold = memory.cross_session_score_threshold;
810    let results: Vec<_> = mem
811        .search_session_summaries(query, 5, Some(cid))
812        .await?
813        .into_iter()
814        .filter(|r| r.score >= threshold)
815        .collect();
816    if results.is_empty() {
817        return Ok(None);
818    }
819
820    let mut text = String::from(CROSS_SESSION_PREFIX);
821    let mut tokens_used = tc.count_tokens(&text);
822
823    for item in &results {
824        let entry = format!("- {}\n", item.summary_text);
825        let cost = tc.count_tokens(&entry);
826        if tokens_used + cost > token_budget {
827            break;
828        }
829        text.push_str(&entry);
830        tokens_used += cost;
831    }
832
833    if tokens_used > tc.count_tokens(CROSS_SESSION_PREFIX) {
834        Ok(Some(Message::from_parts(
835            Role::System,
836            vec![MessagePart::CrossSession { text }],
837        )))
838    } else {
839        Ok(None)
840    }
841}
842
843/// Maximum number of messages scanned backward by [`memory_first_keep_tail`] before
844/// stopping at the next non-`ToolResult` boundary, to avoid O(N) scans on long sessions.
845pub const MAX_KEEP_TAIL_SCAN: usize = 50;
846
847/// Compute how many tail messages to keep when the `MemoryFirst` strategy is active.
848///
849/// Always keeps at least 2 messages. Extends the tail as long as the boundary message is
850/// a `ToolResult` (user message with a `ToolResult` part) to avoid splitting a tool-call
851/// round-trip. Capped at `MAX_KEEP_TAIL_SCAN` to prevent O(N) scans on long sessions.
852///
853/// `history_start` is the index of the first non-system message (typically 1).
854#[must_use]
855pub fn memory_first_keep_tail(messages: &[Message], history_start: usize) -> usize {
856    use zeph_llm::provider::MessagePart;
857
858    let mut keep_tail = 2usize;
859    let len = messages.len();
860    let max = len.saturating_sub(history_start);
861
862    while keep_tail < max {
863        let first_retained = &messages[len - keep_tail];
864        let is_tool_result = first_retained.role == Role::User
865            && first_retained
866                .parts
867                .iter()
868                .any(|p| matches!(p, MessagePart::ToolResult { .. }));
869
870        if is_tool_result {
871            keep_tail += 1;
872        } else {
873            break;
874        }
875
876        if keep_tail >= MAX_KEEP_TAIL_SCAN {
877            let preceding_idx = len.saturating_sub(keep_tail + 1);
878            if preceding_idx >= history_start {
879                let preceding = &messages[preceding_idx];
880                let is_tool_use = preceding.role == Role::Assistant
881                    && preceding
882                        .parts
883                        .iter()
884                        .any(|p| matches!(p, MessagePart::ToolUse { .. }));
885                if is_tool_use {
886                    keep_tail += 1;
887                }
888            }
889            break;
890        }
891    }
892
893    keep_tail
894}
895
#[cfg(test)]
mod tests {
    use super::*;
    use crate::input::ContextMemoryView;
    use zeph_config::{
        ContextStrategy, DocumentConfig, GraphConfig, PersonaConfig, ReasoningConfig,
        TrajectoryConfig, TreeConfig,
    };
    use zeph_memory::TokenCounter;

    /// Baseline view with no memory backend attached. Each test flips only the
    /// single feature flag it exercises.
    fn base_view() -> ContextMemoryView {
        ContextMemoryView {
            memory: None,
            conversation_id: None,
            recall_limit: 10,
            cross_session_score_threshold: 0.5,
            context_strategy: ContextStrategy::default(),
            crossover_turn_threshold: 5,
            cached_session_digest: None,
            graph_config: GraphConfig::default(),
            document_config: DocumentConfig::default(),
            persona_config: PersonaConfig::default(),
            trajectory_config: TrajectoryConfig::default(),
            reasoning_config: ReasoningConfig::default(),
            tree_config: TreeConfig::default(),
        }
    }

    // ── fetch_graph_facts ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_graph_facts_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_graph_facts(&base_view(), "test", 1000, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_graph_facts_returns_none_when_budget_zero() {
        let mut view = base_view();
        view.graph_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_graph_facts(&view, "test", 0, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_graph_facts_returns_none_when_graph_disabled() {
        let mut view = base_view();
        view.graph_config.enabled = false;
        let counter = TokenCounter::new();
        let outcome = fetch_graph_facts(&view, "test", 1000, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_persona_facts ───────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_persona_facts_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_persona_facts(&base_view(), 1000, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_persona_facts_returns_none_when_budget_zero() {
        let mut view = base_view();
        view.persona_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_persona_facts(&view, 0, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_trajectory_hints ────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_trajectory_hints_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_trajectory_hints(&base_view(), 1000, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_trajectory_hints_returns_none_when_budget_zero() {
        let mut view = base_view();
        view.trajectory_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_trajectory_hints(&view, 0, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_tree_memory ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_tree_memory_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_tree_memory(&base_view(), 1000, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_tree_memory_returns_none_when_budget_zero() {
        let mut view = base_view();
        view.tree_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_tree_memory(&view, 0, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_corrections ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_corrections_returns_none_when_memory_is_none() {
        let outcome = fetch_corrections(&base_view(), "test", 10, 0.5, |text| text.into())
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_semantic_recall ─────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_semantic_recall_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let (recall, aux) = fetch_semantic_recall(&base_view(), "test", 1000, &counter, None)
            .await
            .unwrap();
        assert!(recall.is_none());
        assert!(aux.is_none());
    }

    #[tokio::test]
    async fn fetch_semantic_recall_returns_none_when_budget_zero() {
        let counter = TokenCounter::new();
        let (recall, aux) = fetch_semantic_recall(&base_view(), "test", 0, &counter, None)
            .await
            .unwrap();
        assert!(recall.is_none());
        assert!(aux.is_none());
    }

    // ── fetch_document_rag ────────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_document_rag_returns_none_when_memory_is_none() {
        let mut view = base_view();
        view.document_config.rag_enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_document_rag(&view, "test", 1000, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_document_rag_returns_none_when_rag_disabled() {
        let counter = TokenCounter::new();
        let outcome = fetch_document_rag(&base_view(), "test", 1000, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_summaries ───────────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_summaries_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_summaries(&base_view(), 1000, &counter).await.unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_cross_session ───────────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_cross_session_returns_none_when_memory_is_none() {
        let counter = TokenCounter::new();
        let outcome = fetch_cross_session(&base_view(), "test", 1000, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    // ── fetch_reasoning_strategies ────────────────────────────────────────────

    #[tokio::test]
    async fn fetch_reasoning_strategies_returns_none_when_memory_is_none() {
        let mut view = base_view();
        view.reasoning_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_reasoning_strategies(&view, "query", 1000, 3, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn fetch_reasoning_strategies_returns_none_when_budget_zero() {
        let mut view = base_view();
        view.reasoning_config.enabled = true;
        let counter = TokenCounter::new();
        let outcome = fetch_reasoning_strategies(&view, "query", 0, 3, &counter)
            .await
            .unwrap();
        assert!(outcome.is_none());
    }
}