Skip to main content

kernex_memory/store/
context.rs

1//! Context building and user profile formatting.
2//!
3//! Helper functions for onboarding, system prompt composition, language
4//! detection, and relative time formatting live in `context_helpers`.
5
6use super::Store;
7use crate::error::MemoryError;
8use kernex_core::{
9    config::SYSTEM_FACT_KEYS,
10    context::{CompactionStrategy, Context, ContextEntry, ContextNeeds},
11    message::Request,
12    traits::Summarizer,
13};
14
15// Re-export helpers so existing `super::context::*` paths in tests keep working.
16pub use super::context_helpers::detect_language;
17#[cfg(test)]
18pub(super) use super::context_helpers::onboarding_hint_text;
19pub(super) use super::context_helpers::{
20    build_system_prompt, compute_onboarding_stage, SystemPromptContext,
21};
22
/// Identity fact keys — shown first in the user profile.
///
/// `format_user_profile` walks this list in order, so the array order is
/// also the order in which these facts appear in the rendered profile.
const IDENTITY_KEYS: &[&str] = &["name", "preferred_name", "pronouns"];

/// Context fact keys — shown second in the user profile.
///
/// Rendered after every identity key and before any remaining fact, again
/// in array order.
const CONTEXT_KEYS: &[&str] = &["timezone", "location", "occupation"];
28
29impl Store {
30    /// Build a conversation context from memory for the provider.
31    ///
32    /// The `channel` parameter identifies the communication channel since
33    /// `Request` is channel-agnostic.
34    ///
35    /// When `needs.compact` is [`CompactionStrategy::Summarize`] and a
36    /// `summarizer` is provided, overflow messages (those beyond
37    /// `max_context_messages`) are summarized and prepended to the system
38    /// prompt instead of being silently dropped.
39    pub async fn build_context(
40        &self,
41        channel: &str,
42        incoming: &Request,
43        base_system_prompt: &str,
44        needs: &ContextNeeds,
45        active_project: Option<&str>,
46        summarizer: Option<&dyn Summarizer>,
47    ) -> Result<Context, MemoryError> {
48        let project_key = active_project.unwrap_or("");
49        let conv_id = self
50            .get_or_create_conversation(channel, &incoming.sender_id, project_key)
51            .await?;
52
53        let history_fut = async {
54            let rows: Vec<(String, String)> = sqlx::query_as(
55                "SELECT role, content FROM (\
56                     SELECT role, content, timestamp FROM messages \
57                     WHERE conversation_id = ? ORDER BY timestamp DESC LIMIT ?\
58                 ) ORDER BY timestamp ASC",
59            )
60            .bind(&conv_id)
61            .bind(self.max_context_messages as i64)
62            .fetch_all(&self.pool)
63            .await
64            .map_err(|e| MemoryError::sqlite("query failed", e))?;
65
66            Ok::<Vec<ContextEntry>, MemoryError>(
67                rows.into_iter()
68                    .map(|(role, content)| ContextEntry { role, content })
69                    .collect(),
70            )
71        };
72
73        let facts_fut = async {
74            self.get_facts(&incoming.sender_id)
75                .await
76                .unwrap_or_default()
77        };
78
79        let summaries_fut = async {
80            if needs.summaries {
81                self.get_recent_summaries(channel, &incoming.sender_id, 3)
82                    .await
83                    .unwrap_or_default()
84            } else {
85                vec![]
86            }
87        };
88
89        let recall_fut = async {
90            if needs.recall {
91                self.search_messages(&incoming.text, &conv_id, &incoming.sender_id, 5, None)
92                    .await
93                    .unwrap_or_default()
94            } else {
95                vec![]
96            }
97        };
98
99        let tasks_fut = async {
100            if needs.pending_tasks {
101                self.get_tasks_for_sender(&incoming.sender_id)
102                    .await
103                    .unwrap_or_default()
104            } else {
105                vec![]
106            }
107        };
108
109        let outcomes_fut = async {
110            if needs.outcomes {
111                self.get_recent_outcomes(&incoming.sender_id, 15, active_project)
112                    .await
113                    .unwrap_or_default()
114            } else {
115                vec![]
116            }
117        };
118
119        let lessons_fut = async {
120            self.get_lessons(&incoming.sender_id, active_project)
121                .await
122                .unwrap_or_default()
123        };
124
125        let (history_res, facts, summaries, recall, pending_tasks, outcomes, lessons) = tokio::join!(
126            history_fut,
127            facts_fut,
128            summaries_fut,
129            recall_fut,
130            tasks_fut,
131            outcomes_fut,
132            lessons_fut,
133        );
134
135        let history = history_res?;
136
137        // Detect history overflow once. We only run the COUNT when the
138        // history loader hit its LIMIT, so short conversations pay nothing.
139        // Whichever branch handles overflow (summarize or warn) reuses the
140        // same count.
141        let overflow_count = if history.len() >= self.max_context_messages {
142            let total: (i64,) =
143                sqlx::query_as("SELECT COUNT(*) FROM messages WHERE conversation_id = ?")
144                    .bind(&conv_id)
145                    .fetch_one(&self.pool)
146                    .await
147                    .map_err(|e| MemoryError::sqlite("count failed", e))?;
148            (total.0 as usize).saturating_sub(self.max_context_messages)
149        } else {
150            0
151        };
152
153        // Auto-compact: summarize overflow messages instead of silently dropping.
154        let compact_summary = if overflow_count > 0 {
155            if let (CompactionStrategy::Summarize, Some(s)) = (&needs.compact, summarizer) {
156                let overflow_rows: Vec<(String, String)> = sqlx::query_as(
157                    "SELECT role, content FROM messages \
158                     WHERE conversation_id = ? ORDER BY timestamp ASC LIMIT ?",
159                )
160                .bind(&conv_id)
161                .bind(overflow_count as i64)
162                .fetch_all(&self.pool)
163                .await
164                .map_err(|e| MemoryError::sqlite("query failed", e))?;
165
166                if overflow_rows.is_empty() {
167                    None
168                } else {
169                    let text = overflow_rows
170                        .iter()
171                        .map(|(role, content)| format!("{role}: {content}"))
172                        .collect::<Vec<_>>()
173                        .join("\n");
174
175                    match s.summarize(&text).await {
176                        Ok(summary) if !summary.is_empty() => Some(summary),
177                        Ok(_) => {
178                            tracing::warn!(
179                                conversation_id = %conv_id,
180                                overflow = overflow_count,
181                                "summarizer returned empty output; dropping {overflow_count} oldest messages",
182                            );
183                            None
184                        }
185                        Err(e) => {
186                            tracing::warn!(
187                                conversation_id = %conv_id,
188                                overflow = overflow_count,
189                                error = %e,
190                                "summarizer failed; falling back to silent drop of {overflow_count} oldest messages",
191                            );
192                            None
193                        }
194                    }
195                }
196            } else {
197                // Drop strategy or no summarizer wired in. Surface this as a
198                // warn so operators running with default tracing see that
199                // history is being lost and have a path to the fix
200                // (RuntimeBuilder::auto_compact). One log per overflow event.
201                tracing::warn!(
202                    conversation_id = %conv_id,
203                    overflow = overflow_count,
204                    max = self.max_context_messages,
205                    "history overflow: silently dropping {overflow_count} oldest messages. \
206                     Enable RuntimeBuilder::auto_compact for summarization.",
207                );
208                None
209            }
210        } else {
211            None
212        };
213
214        // Resolve language: stored preference > auto-detect > English.
215        let language =
216            if let Some((_, lang)) = facts.iter().find(|(k, _)| k == "preferred_language") {
217                lang.clone()
218            } else {
219                let detected = detect_language(&incoming.text).to_string();
220                let _ = self
221                    .store_fact(&incoming.sender_id, "preferred_language", &detected)
222                    .await;
223                detected
224            };
225
226        // Progressive onboarding: compute stage and inject hint on transitions.
227        let real_fact_count = facts
228            .iter()
229            .filter(|(k, _)| !SYSTEM_FACT_KEYS.contains(&k.as_str()))
230            .count();
231        let has_tasks = !pending_tasks.is_empty();
232
233        let current_stage: u8 = facts
234            .iter()
235            .find(|(k, _)| k == "onboarding_stage")
236            .and_then(|(_, v)| v.parse().ok())
237            .unwrap_or(0);
238
239        let new_stage = compute_onboarding_stage(current_stage, real_fact_count, has_tasks);
240
241        let onboarding_hint = if new_stage != current_stage {
242            let _ = self
243                .store_fact(
244                    &incoming.sender_id,
245                    "onboarding_stage",
246                    &new_stage.to_string(),
247                )
248                .await;
249            Some(new_stage)
250        } else if current_stage == 0 && real_fact_count == 0 {
251            Some(0u8)
252        } else {
253            if facts.iter().all(|(k, _)| k != "onboarding_stage") && current_stage == 0 {
254                let bootstrapped = compute_onboarding_stage(0, real_fact_count, has_tasks);
255                let final_stage = (0..=4).fold(0u8, |s, _| {
256                    compute_onboarding_stage(s, real_fact_count, has_tasks)
257                });
258                if final_stage > 0 {
259                    let _ = self
260                        .store_fact(
261                            &incoming.sender_id,
262                            "onboarding_stage",
263                            &final_stage.to_string(),
264                        )
265                        .await;
266                }
267                let _ = bootstrapped;
268                None
269            } else {
270                None
271            }
272        };
273
274        let facts_for_prompt: &[(String, String)] = if needs.profile { &facts } else { &[] };
275        let built_prompt = build_system_prompt(&SystemPromptContext {
276            base_rules: base_system_prompt,
277            facts: facts_for_prompt,
278            summaries: &summaries,
279            recall: &recall,
280            pending_tasks: &pending_tasks,
281            outcomes: &outcomes,
282            lessons: &lessons,
283            language: &language,
284            onboarding_hint,
285        });
286
287        let system_prompt = if let Some(summary) = compact_summary {
288            format!("[Earlier conversation summary]\n{summary}\n\n{built_prompt}")
289        } else {
290            built_prompt
291        };
292
293        Ok(Context {
294            system_prompt,
295            history,
296            current_message: incoming.text.clone(),
297            mcp_servers: Vec::new(),
298            toolboxes: Vec::new(),
299            max_turns: None,
300            allowed_tools: None,
301            model: None,
302            session_id: None,
303            agent_name: None,
304            hook_runner: None,
305            permission_rules: None,
306            extended_thinking: false,
307        })
308    }
309}
310
311/// Format user facts into a structured profile, filtering system keys
312/// and grouping identity facts first, then context, then the rest.
313pub fn format_user_profile(facts: &[(String, String)]) -> String {
314    let user_facts: Vec<&(String, String)> = facts
315        .iter()
316        .filter(|(k, _)| !SYSTEM_FACT_KEYS.contains(&k.as_str()))
317        .collect();
318
319    if user_facts.is_empty() {
320        return String::new();
321    }
322
323    let mut lines = vec!["User profile:".to_string()];
324
325    for key in IDENTITY_KEYS {
326        if let Some((_, v)) = user_facts.iter().find(|(k, _)| k == key) {
327            lines.push(format!("- {key}: {v}"));
328        }
329    }
330
331    for key in CONTEXT_KEYS {
332        if let Some((_, v)) = user_facts.iter().find(|(k, _)| k == key) {
333            lines.push(format!("- {key}: {v}"));
334        }
335    }
336
337    let known_keys: Vec<&str> = IDENTITY_KEYS
338        .iter()
339        .chain(CONTEXT_KEYS.iter())
340        .copied()
341        .collect();
342    for (k, v) in &user_facts {
343        if !known_keys.contains(&k.as_str()) {
344            lines.push(format!("- {k}: {v}"));
345        }
346    }
347
348    lines.join("\n")
349}