1use super::Store;
7use kernex_core::{
8 config::SYSTEM_FACT_KEYS,
9 context::{CompactionStrategy, Context, ContextEntry, ContextNeeds},
10 error::KernexError,
11 message::Request,
12 traits::Summarizer,
13};
14
15pub use super::context_helpers::detect_language;
17#[cfg(test)]
18pub(super) use super::context_helpers::onboarding_hint_text;
19pub(super) use super::context_helpers::{
20 build_system_prompt, compute_onboarding_stage, SystemPromptContext,
21};
22
23const IDENTITY_KEYS: &[&str] = &["name", "preferred_name", "pronouns"];
25
26const CONTEXT_KEYS: &[&str] = &["timezone", "location", "occupation"];
28
29impl Store {
30 pub async fn build_context(
40 &self,
41 channel: &str,
42 incoming: &Request,
43 base_system_prompt: &str,
44 needs: &ContextNeeds,
45 active_project: Option<&str>,
46 summarizer: Option<&dyn Summarizer>,
47 ) -> Result<Context, KernexError> {
48 let project_key = active_project.unwrap_or("");
49 let conv_id = self
50 .get_or_create_conversation(channel, &incoming.sender_id, project_key)
51 .await?;
52
53 let history_fut = async {
54 let rows: Vec<(String, String)> = sqlx::query_as(
55 "SELECT role, content FROM (\
56 SELECT role, content, timestamp FROM messages \
57 WHERE conversation_id = ? ORDER BY timestamp DESC LIMIT ?\
58 ) ORDER BY timestamp ASC",
59 )
60 .bind(&conv_id)
61 .bind(self.max_context_messages as i64)
62 .fetch_all(&self.pool)
63 .await
64 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
65
66 Ok::<Vec<ContextEntry>, KernexError>(
67 rows.into_iter()
68 .map(|(role, content)| ContextEntry { role, content })
69 .collect(),
70 )
71 };
72
73 let facts_fut = async {
74 self.get_facts(&incoming.sender_id)
75 .await
76 .unwrap_or_default()
77 };
78
79 let summaries_fut = async {
80 if needs.summaries {
81 self.get_recent_summaries(channel, &incoming.sender_id, 3)
82 .await
83 .unwrap_or_default()
84 } else {
85 vec![]
86 }
87 };
88
89 let recall_fut = async {
90 if needs.recall {
91 self.search_messages(&incoming.text, &conv_id, &incoming.sender_id, 5)
92 .await
93 .unwrap_or_default()
94 } else {
95 vec![]
96 }
97 };
98
99 let tasks_fut = async {
100 if needs.pending_tasks {
101 self.get_tasks_for_sender(&incoming.sender_id)
102 .await
103 .unwrap_or_default()
104 } else {
105 vec![]
106 }
107 };
108
109 let outcomes_fut = async {
110 if needs.outcomes {
111 self.get_recent_outcomes(&incoming.sender_id, 15, active_project)
112 .await
113 .unwrap_or_default()
114 } else {
115 vec![]
116 }
117 };
118
119 let lessons_fut = async {
120 self.get_lessons(&incoming.sender_id, active_project)
121 .await
122 .unwrap_or_default()
123 };
124
125 let (history_res, facts, summaries, recall, pending_tasks, outcomes, lessons) = tokio::join!(
126 history_fut,
127 facts_fut,
128 summaries_fut,
129 recall_fut,
130 tasks_fut,
131 outcomes_fut,
132 lessons_fut,
133 );
134
135 let history = history_res?;
136
137 let overflow_count = if history.len() >= self.max_context_messages {
142 let total: (i64,) =
143 sqlx::query_as("SELECT COUNT(*) FROM messages WHERE conversation_id = ?")
144 .bind(&conv_id)
145 .fetch_one(&self.pool)
146 .await
147 .map_err(|e| KernexError::Store(format!("count failed: {e}")))?;
148 (total.0 as usize).saturating_sub(self.max_context_messages)
149 } else {
150 0
151 };
152
153 let compact_summary = if overflow_count > 0 {
155 if let (CompactionStrategy::Summarize, Some(s)) = (&needs.compact, summarizer) {
156 let overflow_rows: Vec<(String, String)> = sqlx::query_as(
157 "SELECT role, content FROM messages \
158 WHERE conversation_id = ? ORDER BY timestamp ASC LIMIT ?",
159 )
160 .bind(&conv_id)
161 .bind(overflow_count as i64)
162 .fetch_all(&self.pool)
163 .await
164 .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
165
166 if overflow_rows.is_empty() {
167 None
168 } else {
169 let text = overflow_rows
170 .iter()
171 .map(|(role, content)| format!("{role}: {content}"))
172 .collect::<Vec<_>>()
173 .join("\n");
174
175 match s.summarize(&text).await {
176 Ok(summary) if !summary.is_empty() => Some(summary),
177 Ok(_) => {
178 tracing::warn!(
179 conversation_id = %conv_id,
180 overflow = overflow_count,
181 "summarizer returned empty output; dropping {overflow_count} oldest messages",
182 );
183 None
184 }
185 Err(e) => {
186 tracing::warn!(
187 conversation_id = %conv_id,
188 overflow = overflow_count,
189 error = %e,
190 "summarizer failed; falling back to silent drop of {overflow_count} oldest messages",
191 );
192 None
193 }
194 }
195 }
196 } else {
197 tracing::warn!(
202 conversation_id = %conv_id,
203 overflow = overflow_count,
204 max = self.max_context_messages,
205 "history overflow: silently dropping {overflow_count} oldest messages. \
206 Enable RuntimeBuilder::auto_compact for summarization.",
207 );
208 None
209 }
210 } else {
211 None
212 };
213
214 let language =
216 if let Some((_, lang)) = facts.iter().find(|(k, _)| k == "preferred_language") {
217 lang.clone()
218 } else {
219 let detected = detect_language(&incoming.text).to_string();
220 let _ = self
221 .store_fact(&incoming.sender_id, "preferred_language", &detected)
222 .await;
223 detected
224 };
225
226 let real_fact_count = facts
228 .iter()
229 .filter(|(k, _)| !SYSTEM_FACT_KEYS.contains(&k.as_str()))
230 .count();
231 let has_tasks = !pending_tasks.is_empty();
232
233 let current_stage: u8 = facts
234 .iter()
235 .find(|(k, _)| k == "onboarding_stage")
236 .and_then(|(_, v)| v.parse().ok())
237 .unwrap_or(0);
238
239 let new_stage = compute_onboarding_stage(current_stage, real_fact_count, has_tasks);
240
241 let onboarding_hint = if new_stage != current_stage {
242 let _ = self
243 .store_fact(
244 &incoming.sender_id,
245 "onboarding_stage",
246 &new_stage.to_string(),
247 )
248 .await;
249 Some(new_stage)
250 } else if current_stage == 0 && real_fact_count == 0 {
251 Some(0u8)
252 } else {
253 if facts.iter().all(|(k, _)| k != "onboarding_stage") && current_stage == 0 {
254 let bootstrapped = compute_onboarding_stage(0, real_fact_count, has_tasks);
255 let final_stage = (0..=4).fold(0u8, |s, _| {
256 compute_onboarding_stage(s, real_fact_count, has_tasks)
257 });
258 if final_stage > 0 {
259 let _ = self
260 .store_fact(
261 &incoming.sender_id,
262 "onboarding_stage",
263 &final_stage.to_string(),
264 )
265 .await;
266 }
267 let _ = bootstrapped;
268 None
269 } else {
270 None
271 }
272 };
273
274 let facts_for_prompt: &[(String, String)] = if needs.profile { &facts } else { &[] };
275 let built_prompt = build_system_prompt(&SystemPromptContext {
276 base_rules: base_system_prompt,
277 facts: facts_for_prompt,
278 summaries: &summaries,
279 recall: &recall,
280 pending_tasks: &pending_tasks,
281 outcomes: &outcomes,
282 lessons: &lessons,
283 language: &language,
284 onboarding_hint,
285 });
286
287 let system_prompt = if let Some(summary) = compact_summary {
288 format!("[Earlier conversation summary]\n{summary}\n\n{built_prompt}")
289 } else {
290 built_prompt
291 };
292
293 Ok(Context {
294 system_prompt,
295 history,
296 current_message: incoming.text.clone(),
297 mcp_servers: Vec::new(),
298 toolboxes: Vec::new(),
299 max_turns: None,
300 allowed_tools: None,
301 model: None,
302 session_id: None,
303 agent_name: None,
304 hook_runner: None,
305 permission_rules: None,
306 extended_thinking: false,
307 })
308 }
309}
310
311pub fn format_user_profile(facts: &[(String, String)]) -> String {
314 let user_facts: Vec<&(String, String)> = facts
315 .iter()
316 .filter(|(k, _)| !SYSTEM_FACT_KEYS.contains(&k.as_str()))
317 .collect();
318
319 if user_facts.is_empty() {
320 return String::new();
321 }
322
323 let mut lines = vec!["User profile:".to_string()];
324
325 for key in IDENTITY_KEYS {
326 if let Some((_, v)) = user_facts.iter().find(|(k, _)| k == key) {
327 lines.push(format!("- {key}: {v}"));
328 }
329 }
330
331 for key in CONTEXT_KEYS {
332 if let Some((_, v)) = user_facts.iter().find(|(k, _)| k == key) {
333 lines.push(format!("- {key}: {v}"));
334 }
335 }
336
337 let known_keys: Vec<&str> = IDENTITY_KEYS
338 .iter()
339 .chain(CONTEXT_KEYS.iter())
340 .copied()
341 .collect();
342 for (k, v) in &user_facts {
343 if !known_keys.contains(&k.as_str()) {
344 lines.push(format!("- {k}: {v}"));
345 }
346 }
347
348 lines.join("\n")
349}