Skip to main content

butterfly_bot/services/
query.rs

1use std::sync::Arc;
2
3use async_stream::try_stream;
4use futures::stream::BoxStream;
5use futures::StreamExt;
6use std::hash::Hasher;
7use std::time::{Instant, SystemTime, UNIX_EPOCH};
8
9use md5::{Digest, Md5};
10
11use crate::error::Result;
12use crate::interfaces::providers::{ImageInput, MemoryProvider};
13use crate::reminders::ReminderStore;
14use crate::services::agent::AgentService;
15use crate::vault;
16use tracing::info;
17
18#[derive(Debug, Clone)]
19pub enum UserInput {
20    Text(String),
21    Audio {
22        bytes: Vec<u8>,
23        input_format: String,
24    },
25}
26
27#[derive(Debug, Clone)]
28pub enum OutputFormat {
29    Text,
30    Audio { voice: String, format: String },
31}
32
33#[derive(Clone)]
34pub struct ProcessOptions {
35    pub prompt: Option<String>,
36    pub images: Vec<ImageInput>,
37    pub output_format: OutputFormat,
38    pub image_detail: String,
39    pub json_schema: Option<serde_json::Value>,
40}
41
42#[derive(Debug, Clone)]
43pub enum ProcessResult {
44    Text(String),
45    Audio(Vec<u8>),
46    Structured(serde_json::Value),
47}
48
49pub struct QueryService {
50    agent_service: Arc<AgentService>,
51    memory_provider: Option<Arc<dyn MemoryProvider>>,
52    reminder_store: Option<Arc<ReminderStore>>,
53    context_cache: tokio::sync::RwLock<Option<u64>>,
54}
55
56impl QueryService {
57    pub fn new(
58        agent_service: Arc<AgentService>,
59        memory_provider: Option<Arc<dyn MemoryProvider>>,
60        reminder_store: Option<Arc<ReminderStore>>,
61    ) -> Self {
62        Self {
63            agent_service,
64            memory_provider,
65            reminder_store,
66            context_cache: tokio::sync::RwLock::new(None),
67        }
68    }
69
70    async fn ensure_context_in_memory(&self, user_id: &str) -> Result<()> {
71        let started = Instant::now();
72        let Some(provider) = &self.memory_provider else {
73            return Ok(());
74        };
75        let _ = self.agent_service.refresh_context_for_user(user_id).await?;
76        info!(
77            "ensure_context_in_memory: refresh_context took {:?}",
78            started.elapsed()
79        );
80        let Some(context_markdown) = self.agent_service.get_context_markdown().await else {
81            return Ok(());
82        };
83        if context_markdown.trim().is_empty() {
84            return Ok(());
85        }
86
87        let mut md5_hasher = Md5::new();
88        md5_hasher.update(context_markdown.as_bytes());
89        let md5_hash = format!("{:x}", md5_hasher.finalize());
90        if let Some(stored) = vault::get_secret("context_md5")? {
91            if stored == md5_hash {
92                let mut guard = self.context_cache.write().await;
93                if guard.is_none() {
94                    *guard = Some(0);
95                }
96                info!(
97                    "ensure_context_in_memory: md5 unchanged, skipping import (elapsed {:?})",
98                    started.elapsed()
99                );
100                return Ok(());
101            }
102        }
103
104        let mut hasher = std::collections::hash_map::DefaultHasher::new();
105        std::hash::Hash::hash(&context_markdown, &mut hasher);
106        let hash = hasher.finish();
107
108        let mut guard = self.context_cache.write().await;
109        if guard.is_none_or(|prev| prev != hash) {
110            let content = format!("CONTEXT_DOC:\n{}", context_markdown);
111            provider
112                .append_message(user_id, "context", &content)
113                .await?;
114            *guard = Some(hash);
115            vault::set_secret("context_md5", &md5_hash)?;
116            info!(
117                "ensure_context_in_memory: imported context into memory (elapsed {:?})",
118                started.elapsed()
119            );
120        }
121        Ok(())
122    }
123
124    pub async fn process_text(
125        &self,
126        user_id: &str,
127        query: &str,
128        prompt: Option<&str>,
129    ) -> Result<String> {
130        let processed_query = query.to_string();
131        let autonomy_tick = is_autonomy_tick(&processed_query);
132
133        self.ensure_context_in_memory(user_id).await?;
134
135        if let Some(response) = self
136            .try_handle_search_command(user_id, &processed_query)
137            .await?
138        {
139            if let Some(provider) = &self.memory_provider {
140                provider
141                    .append_message(user_id, "user", &processed_query)
142                    .await?;
143                provider
144                    .append_message(user_id, "assistant", &response)
145                    .await?;
146            }
147            return Ok(response);
148        }
149
150        if let Some(response) = self
151            .try_handle_tasks_command(user_id, &processed_query)
152            .await?
153        {
154            if let Some(provider) = &self.memory_provider {
155                provider
156                    .append_message(user_id, "user", &processed_query)
157                    .await?;
158                provider
159                    .append_message(user_id, "assistant", &response)
160                    .await?;
161            }
162            return Ok(response);
163        }
164
165        if let Some(response) = self
166            .try_handle_reminders_command(user_id, &processed_query)
167            .await?
168        {
169            if let Some(provider) = &self.memory_provider {
170                provider
171                    .append_message(user_id, "user", &processed_query)
172                    .await?;
173                provider
174                    .append_message(user_id, "assistant", &response)
175                    .await?;
176            }
177            return Ok(response);
178        }
179
180        if let Some(response) = self
181            .try_handle_todo_command(user_id, &processed_query)
182            .await?
183        {
184            if let Some(provider) = &self.memory_provider {
185                provider
186                    .append_message(user_id, "user", &processed_query)
187                    .await?;
188                provider
189                    .append_message(user_id, "assistant", &response)
190                    .await?;
191            }
192            return Ok(response);
193        }
194
195        if let Some(response) = self
196            .try_handle_plans_command(user_id, &processed_query)
197            .await?
198        {
199            if let Some(provider) = &self.memory_provider {
200                provider
201                    .append_message(user_id, "user", &processed_query)
202                    .await?;
203                provider
204                    .append_message(user_id, "assistant", &response)
205                    .await?;
206            }
207            return Ok(response);
208        }
209
210        let reminder_context = if let Some(store) = &self.reminder_store {
211            build_reminder_context(store, user_id).await
212        } else {
213            None
214        };
215        let mut memory_context = if let Some(provider) = &self.memory_provider {
216            let include_semantic = should_include_semantic_memory(&processed_query);
217            let history_future = provider.get_history(user_id, 12);
218            let semantic_future = async {
219                if include_semantic {
220                    provider.search(user_id, &processed_query, 5).await
221                } else {
222                    Ok(Vec::new())
223                }
224            };
225            let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
226            let history = history.join("\n");
227            build_memory_context(history, semantic, reminder_context)
228        } else {
229            reminder_context.unwrap_or_default()
230        };
231
232        if let Some(context_markdown) = self.context_for_autonomy(user_id, &processed_query).await {
233            if !memory_context.is_empty() {
234                memory_context.push_str("\n\n");
235            }
236            memory_context.push_str(&context_markdown);
237        }
238
239        let response = self
240            .agent_service
241            .generate_response(user_id, &processed_query, &memory_context, prompt)
242            .await?;
243
244        if let Some(provider) = &self.memory_provider {
245            if autonomy_tick {
246                return Ok(response);
247            }
248            provider
249                .append_message(user_id, "user", &processed_query)
250                .await?;
251            provider
252                .append_message(user_id, "assistant", &response)
253                .await?;
254        }
255
256        Ok(response)
257    }
258
259    pub async fn process(
260        &self,
261        user_id: &str,
262        input: UserInput,
263        options: ProcessOptions,
264    ) -> Result<ProcessResult> {
265        let text = match input {
266            UserInput::Text(value) => value,
267            UserInput::Audio {
268                bytes,
269                input_format,
270            } => {
271                self.agent_service
272                    .transcribe_audio(bytes, &input_format)
273                    .await?
274            }
275        };
276        let autonomy_tick = is_autonomy_tick(&text);
277
278        self.ensure_context_in_memory(user_id).await?;
279
280        if let Some(response) = self.try_handle_search_command(user_id, &text).await? {
281            if let Some(provider) = &self.memory_provider {
282                provider.append_message(user_id, "user", &text).await?;
283                provider
284                    .append_message(user_id, "assistant", &response)
285                    .await?;
286            }
287            return Ok(ProcessResult::Text(response));
288        }
289
290        if let Some(response) = self.try_handle_tasks_command(user_id, &text).await? {
291            if let Some(provider) = &self.memory_provider {
292                provider.append_message(user_id, "user", &text).await?;
293                provider
294                    .append_message(user_id, "assistant", &response)
295                    .await?;
296            }
297            return Ok(ProcessResult::Text(response));
298        }
299
300        if let Some(response) = self.try_handle_reminders_command(user_id, &text).await? {
301            if let Some(provider) = &self.memory_provider {
302                provider.append_message(user_id, "user", &text).await?;
303                provider
304                    .append_message(user_id, "assistant", &response)
305                    .await?;
306            }
307            return Ok(ProcessResult::Text(response));
308        }
309
310        if let Some(response) = self.try_handle_todo_command(user_id, &text).await? {
311            if let Some(provider) = &self.memory_provider {
312                provider.append_message(user_id, "user", &text).await?;
313                provider
314                    .append_message(user_id, "assistant", &response)
315                    .await?;
316            }
317            return Ok(ProcessResult::Text(response));
318        }
319
320        if let Some(response) = self.try_handle_plans_command(user_id, &text).await? {
321            if let Some(provider) = &self.memory_provider {
322                provider.append_message(user_id, "user", &text).await?;
323                provider
324                    .append_message(user_id, "assistant", &response)
325                    .await?;
326            }
327            return Ok(ProcessResult::Text(response));
328        }
329
330        let reminder_context = if let Some(store) = &self.reminder_store {
331            build_reminder_context(store, user_id).await
332        } else {
333            None
334        };
335        let mut memory_context = if let Some(provider) = &self.memory_provider {
336            let include_semantic = should_include_semantic_memory(&text);
337            let history_future = provider.get_history(user_id, 12);
338            let semantic_future = async {
339                if include_semantic {
340                    provider.search(user_id, &text, 5).await
341                } else {
342                    Ok(Vec::new())
343                }
344            };
345            let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
346            let history = history.join("\n");
347            build_memory_context(history, semantic, reminder_context)
348        } else {
349            reminder_context.unwrap_or_default()
350        };
351
352        if let Some(context_markdown) = self.context_for_autonomy(user_id, &text).await {
353            if !memory_context.is_empty() {
354                memory_context.push_str("\n\n");
355            }
356            memory_context.push_str(&context_markdown);
357        }
358
359        let result = if let Some(schema) = options.json_schema {
360            let structured = self
361                .agent_service
362                .generate_structured_response(
363                    user_id,
364                    &text,
365                    &memory_context,
366                    options.prompt.as_deref(),
367                    schema,
368                )
369                .await?;
370            ProcessResult::Structured(structured)
371        } else if !options.images.is_empty() {
372            let response = self
373                .agent_service
374                .generate_response_with_images(
375                    user_id,
376                    &text,
377                    options.images,
378                    &memory_context,
379                    options.prompt.as_deref(),
380                    &options.image_detail,
381                )
382                .await?;
383            ProcessResult::Text(response)
384        } else {
385            let response = self
386                .agent_service
387                .generate_response(user_id, &text, &memory_context, options.prompt.as_deref())
388                .await?;
389            ProcessResult::Text(response)
390        };
391
392        let output = match (result, options.output_format) {
393            (ProcessResult::Text(text), OutputFormat::Audio { voice, format }) => {
394                let bytes = self
395                    .agent_service
396                    .synthesize_audio(&text, &voice, &format)
397                    .await?;
398                ProcessResult::Audio(bytes)
399            }
400            (other, _) => other,
401        };
402
403        if let Some(provider) = &self.memory_provider {
404            if autonomy_tick {
405                return Ok(output);
406            }
407            provider.append_message(user_id, "user", &text).await?;
408            if let ProcessResult::Text(ref message) = output {
409                provider
410                    .append_message(user_id, "assistant", message)
411                    .await?;
412            }
413        }
414
415        Ok(output)
416    }
417
418    pub fn process_text_stream<'a>(
419        &'a self,
420        user_id: &'a str,
421        query: &'a str,
422        prompt: Option<&'a str>,
423    ) -> BoxStream<'a, Result<String>> {
424        Box::pin(try_stream! {
425            let processed_query = query.to_string();
426            let autonomy_tick = is_autonomy_tick(&processed_query);
427
428            self.ensure_context_in_memory(user_id).await?;
429
430            if let Some(response) = self.try_handle_search_command(user_id, &processed_query).await? {
431                if let Some(provider) = &self.memory_provider {
432                    provider.append_message(user_id, "user", &processed_query).await?;
433                    provider.append_message(user_id, "assistant", &response).await?;
434                }
435                yield response;
436                return;
437            }
438
439            if let Some(response) = self.try_handle_tasks_command(user_id, &processed_query).await? {
440                if let Some(provider) = &self.memory_provider {
441                    provider.append_message(user_id, "user", &processed_query).await?;
442                    provider.append_message(user_id, "assistant", &response).await?;
443                }
444                yield response;
445                return;
446            }
447
448            if let Some(response) = self.try_handle_reminders_command(user_id, &processed_query).await? {
449                if let Some(provider) = &self.memory_provider {
450                    provider.append_message(user_id, "user", &processed_query).await?;
451                    provider.append_message(user_id, "assistant", &response).await?;
452                }
453                yield response;
454                return;
455            }
456
457            if let Some(response) = self.try_handle_todo_command(user_id, &processed_query).await? {
458                if let Some(provider) = &self.memory_provider {
459                    provider.append_message(user_id, "user", &processed_query).await?;
460                    provider.append_message(user_id, "assistant", &response).await?;
461                }
462                yield response;
463                return;
464            }
465
466            if let Some(response) = self.try_handle_plans_command(user_id, &processed_query).await? {
467                if let Some(provider) = &self.memory_provider {
468                    provider.append_message(user_id, "user", &processed_query).await?;
469                    provider.append_message(user_id, "assistant", &response).await?;
470                }
471                yield response;
472                return;
473            }
474
475            let reminder_context = if let Some(store) = &self.reminder_store {
476                build_reminder_context(store, user_id).await
477            } else {
478                None
479            };
480            let mut memory_context = if let Some(provider) = &self.memory_provider {
481                let include_semantic = should_include_semantic_memory(&processed_query);
482                let history_future = provider.get_history(user_id, 12);
483                let semantic_future = async {
484                    if include_semantic {
485                        provider.search(user_id, &processed_query, 5).await
486                    } else {
487                        Ok(Vec::new())
488                    }
489                };
490                let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
491                let history = history.join("\n");
492                build_memory_context(history, semantic, reminder_context)
493            } else {
494                reminder_context.unwrap_or_default()
495            };
496
497            if let Some(context_markdown) = self.context_for_autonomy(user_id, &processed_query).await {
498                if !memory_context.is_empty() {
499                    memory_context.push_str("\n\n");
500                }
501                memory_context.push_str(&context_markdown);
502            }
503
504            let mut response_text = String::new();
505            let mut stream = self.agent_service.generate_response_stream(
506                user_id,
507                &processed_query,
508                &memory_context,
509                prompt,
510            );
511
512            while let Some(chunk) = stream.next().await {
513                let chunk = chunk?;
514                response_text.push_str(&chunk);
515                yield chunk;
516            }
517
518            if let Some(provider) = &self.memory_provider {
519                if autonomy_tick {
520                    return;
521                }
522                provider.append_message(user_id, "user", &processed_query).await?;
523                if !response_text.is_empty() {
524                    provider.append_message(user_id, "assistant", &response_text).await?;
525                }
526            }
527        })
528    }
529
530    pub fn agent_service(&self) -> Arc<AgentService> {
531        self.agent_service.clone()
532    }
533
534    async fn context_for_autonomy(&self, user_id: &str, query: &str) -> Option<String> {
535        if user_id != "system" && !is_autonomy_tick(query) {
536            return None;
537        }
538        let context_markdown = self.agent_service.get_context_markdown().await?;
539        if context_markdown.trim().is_empty() {
540            return None;
541        }
542        let max_len = 8000usize;
543        let trimmed = if context_markdown.len() > max_len {
544            format!(
545                "{}\n...\n[CONTEXT_DOC TRUNCATED]",
546                &context_markdown[..max_len]
547            )
548        } else {
549            context_markdown
550        };
551        Some(format!("CONTEXT_DOC (authoritative):\n{}", trimmed))
552    }
553
554    pub async fn preload_context(&self, user_id: &str) -> Result<()> {
555        self.ensure_context_in_memory(user_id).await
556    }
557
558    pub async fn delete_user_history(&self, user_id: &str) -> Result<()> {
559        if let Some(provider) = &self.memory_provider {
560            provider.clear_history(user_id).await?;
561        }
562        Ok(())
563    }
564
565    pub async fn get_user_history(&self, user_id: &str, limit: usize) -> Result<Vec<String>> {
566        if let Some(provider) = &self.memory_provider {
567            return provider.get_history(user_id, limit).await;
568        }
569        Ok(Vec::new())
570    }
571
572    pub async fn search_memory(
573        &self,
574        user_id: &str,
575        query: &str,
576        limit: usize,
577    ) -> Result<Vec<String>> {
578        if let Some(provider) = &self.memory_provider {
579            return provider.search(user_id, query, limit).await;
580        }
581        Ok(Vec::new())
582    }
583}
584
585fn is_autonomy_tick(query: &str) -> bool {
586    let lower = query.to_lowercase();
587    lower.contains("autonomous") && lower.contains("heartbeat")
588}
589
590fn build_memory_context(
591    history: String,
592    semantic: Vec<String>,
593    reminder_context: Option<String>,
594) -> String {
595    let mut context = String::new();
596    if let Some(reminders) = reminder_context {
597        if !reminders.is_empty() {
598            context.push_str(&reminders);
599            context.push_str("\n\n");
600        }
601    }
602    if !history.is_empty() {
603        let filtered_history = history
604            .lines()
605            .filter(|line| !should_skip_memory_line(line))
606            .collect::<Vec<_>>()
607            .join("\n");
608        if !filtered_history.trim().is_empty() {
609            context.push_str(&filtered_history);
610        }
611    }
612    if !semantic.is_empty() {
613        if !context.is_empty() {
614            context.push_str("\n\n");
615        }
616        context.push_str(
617            "RELEVANT MEMORY (unverified; use only if clearly applicable to the user's request):\n",
618        );
619        for item in semantic
620            .into_iter()
621            .filter(|item| !should_skip_memory_line(item))
622        {
623            context.push_str("- ");
624            context.push_str(&item);
625            context.push('\n');
626        }
627    }
628    context
629}
630
631fn should_skip_memory_line(line: &str) -> bool {
632    let lower = line.to_ascii_lowercase();
633    lower.contains("api key")
634        || lower.contains("api_key")
635        || lower.contains("authorization header")
636        || lower.contains("missing api key")
637        || lower.contains("no api key")
638        || lower.contains("invalid api key")
639        || lower.contains("need your api key")
640}
641
642async fn build_reminder_context(store: &ReminderStore, user_id: &str) -> Option<String> {
643    let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs() as i64;
644    let due = store.due_reminders(user_id, now, 10).await.ok()?;
645    if due.is_empty() {
646        return None;
647    }
648
649    let mut context = String::from("DUE REMINDERS (notify the user naturally in this reply):\n");
650    for reminder in due {
651        context.push_str(&format!(
652            "- {} (id: {}, due_at_unix: {})\n",
653            reminder.title, reminder.id, reminder.due_at
654        ));
655    }
656    Some(context)
657}
658
659fn should_include_semantic_memory(query: &str) -> bool {
660    let trimmed = query.trim();
661    if trimmed.is_empty() {
662        return false;
663    }
664    let lower = trimmed.to_lowercase();
665    if lower.contains("hackathon")
666        || lower.contains("colosseum")
667        || lower.contains("context")
668        || lower.contains("agent hackathon")
669    {
670        return true;
671    }
672    let tokens: Vec<&str> = lower.split_whitespace().collect();
673    if tokens.len() < 3 || trimmed.len() < 12 {
674        return false;
675    }
676    let greeting = matches!(
677        tokens.as_slice(),
678        ["hi"] | ["hello"] | ["hey"] | ["yo"] | ["sup"] | ["hey", "there"] | ["hi", "there"]
679    );
680    !greeting
681}
682
683impl QueryService {
684    async fn try_handle_search_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
685        let lower = text.to_lowercase();
686        let looks_like_search = lower.contains("search")
687            || lower.contains("latest")
688            || lower.contains("current")
689            || lower.contains("today")
690            || lower.contains("breaking")
691            || lower.contains("news")
692            || lower.contains("headline")
693            || lower.contains("up to date")
694            || lower.contains("what's new")
695            || lower.contains("whats new");
696        if !looks_like_search {
697            return Ok(None);
698        }
699
700        let tool = self
701            .agent_service
702            .tool_registry
703            .get_tool("search_internet")
704            .await;
705        let Some(_tool) = tool else {
706            return Ok(None);
707        };
708
709        let query = if lower.contains("search tool") && lower.contains("error") {
710            "check search tool status".to_string()
711        } else {
712            text.to_string()
713        };
714
715        let result = self
716            .agent_service
717            .tool_registry
718            .execute_tool(
719                "search_internet",
720                serde_json::json!({"query": query, "user_id": user_id}),
721            )
722            .await?;
723        let effective_result = result
724            .get("capability_result")
725            .and_then(|value| value.get("result"))
726            .cloned()
727            .unwrap_or_else(|| result.clone());
728
729        let status = effective_result
730            .get("status")
731            .and_then(|v| v.as_str())
732            .unwrap_or("");
733        if status == "success" {
734            let content = effective_result
735                .get("result")
736                .and_then(|v| v.as_str())
737                .unwrap_or("")
738                .to_string();
739            if content.is_empty() {
740                return Ok(Some(
741                    "Search completed, but no results were returned.".to_string(),
742                ));
743            }
744            return Ok(Some(content));
745        }
746
747        let message = effective_result
748            .get("message")
749            .and_then(|v| v.as_str())
750            .or_else(|| effective_result.get("error").and_then(|v| v.as_str()))
751            .or_else(|| effective_result.get("code").and_then(|v| v.as_str()))
752            .or_else(|| result.get("message").and_then(|v| v.as_str()))
753            .or_else(|| result.get("error").and_then(|v| v.as_str()))
754            .or_else(|| result.get("code").and_then(|v| v.as_str()))
755            .unwrap_or("Search tool error");
756        let details = effective_result
757            .get("details")
758            .and_then(|v| v.as_str())
759            .or_else(|| effective_result.get("reason").and_then(|v| v.as_str()))
760            .or_else(|| result.get("details").and_then(|v| v.as_str()))
761            .or_else(|| result.get("reason").and_then(|v| v.as_str()))
762            .unwrap_or("");
763        let response = if !details.is_empty() {
764            format!("Search tool error: {} ({})", message, details)
765        } else if message != "Search tool error" {
766            format!("Search tool error: {}", message)
767        } else {
768            format!("Search tool error: {}", effective_result)
769        };
770        Ok(Some(response))
771    }
772
773    async fn try_handle_tasks_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
774        let lower = text.to_lowercase();
775        let trimmed = lower.trim();
776
777        let is_bulk_clear_verb = lower.contains("clear")
778            || lower.contains("delete")
779            || lower.contains("remove")
780            || lower.contains("wipe")
781            || lower.contains("clean");
782        let references_tasks_collection = lower.contains("tasks")
783            || lower.contains("task list")
784            || lower.contains("my task")
785            || lower.contains("all task");
786        if is_bulk_clear_verb && references_tasks_collection {
787            let tool = self.agent_service.tool_registry.get_tool("tasks").await;
788            let Some(_tool) = tool else {
789                return Ok(Some(
790                    "I can’t clear tasks right now because the tasks tool is not available."
791                        .to_string(),
792                ));
793            };
794
795            let clear_status = if lower.contains("disabled") {
796                "disabled"
797            } else if lower.contains("enabled") {
798                "enabled"
799            } else {
800                "all"
801            };
802
803            if let Ok(result) = self
804                .agent_service
805                .tool_registry
806                .execute_tool(
807                    "tasks",
808                    serde_json::json!({
809                        "action": "clear",
810                        "user_id": user_id,
811                        "status": clear_status
812                    }),
813                )
814                .await
815            {
816                let payload = Self::tool_payload(&result);
817                let is_error =
818                    payload.get("status").and_then(|value| value.as_str()) == Some("error");
819                let deleted = payload
820                    .get("deleted")
821                    .and_then(|value| value.as_u64())
822                    .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
823                if !is_error {
824                    if let Some(deleted) = deleted {
825                        return Ok(Some(format!("Cleared {} task(s).", deleted)));
826                    }
827                }
828            }
829
830            let list_result = match self
831                .agent_service
832                .tool_registry
833                .execute_tool(
834                    "tasks",
835                    serde_json::json!({
836                        "action": "list",
837                        "user_id": user_id,
838                        "status": clear_status,
839                        "limit": 200
840                    }),
841                )
842                .await
843            {
844                Ok(value) => value,
845                Err(err) => {
846                    return Ok(Some(format!("I couldn’t clear tasks right now: {}", err)));
847                }
848            };
849
850            let payload = Self::tool_payload(&list_result);
851            let task_ids: Vec<i64> = payload
852                .get("tasks")
853                .and_then(|value| value.as_array())
854                .map(|tasks| {
855                    tasks
856                        .iter()
857                        .filter_map(|task| task.get("id").and_then(|value| value.as_i64()))
858                        .collect()
859                })
860                .unwrap_or_default();
861
862            let mut deleted_count: u64 = 0;
863            for id in task_ids {
864                if let Ok(delete_result) = self
865                    .agent_service
866                    .tool_registry
867                    .execute_tool(
868                        "tasks",
869                        serde_json::json!({
870                            "action": "delete",
871                            "user_id": user_id,
872                            "id": id
873                        }),
874                    )
875                    .await
876                {
877                    let delete_payload = Self::tool_payload(&delete_result);
878                    if delete_payload
879                        .get("deleted")
880                        .and_then(|value| value.as_bool())
881                        .or_else(|| {
882                            delete_result
883                                .get("deleted")
884                                .and_then(|value| value.as_bool())
885                        })
886                        == Some(true)
887                    {
888                        deleted_count += 1;
889                    }
890                }
891            }
892
893            return Ok(Some(format!("Cleared {} task(s).", deleted_count)));
894        }
895
896        let looks_like_task_list_request = trimmed == "tasks"
897            || trimmed == "task"
898            || lower.contains("what are the tasks")
899            || lower.contains("what's on my tasks")
900            || lower.contains("whats on my tasks")
901            || lower.contains("my tasks")
902            || lower.contains("list tasks")
903            || lower.contains("show tasks")
904            || lower.contains("open tasks");
905
906        if !looks_like_task_list_request {
907            return Ok(None);
908        }
909
910        let tool = self.agent_service.tool_registry.get_tool("tasks").await;
911        let Some(_tool) = tool else {
912            return Ok(Some(
913                "I can’t list tasks right now because the tasks tool is not available.".to_string(),
914            ));
915        };
916
917        let result = match self
918            .agent_service
919            .tool_registry
920            .execute_tool(
921                "tasks",
922                serde_json::json!({
923                    "action": "list",
924                    "user_id": user_id,
925                    "status": "all",
926                    "limit": 50
927                }),
928            )
929            .await
930        {
931            Ok(value) => value,
932            Err(err) => {
933                return Ok(Some(format!("I couldn’t list tasks right now: {}", err)));
934            }
935        };
936
937        let payload = Self::tool_payload(&result);
938
939        let tasks = payload
940            .get("tasks")
941            .and_then(|value| value.as_array())
942            .cloned()
943            .unwrap_or_default();
944
945        if tasks.is_empty() {
946            return Ok(Some("You have no tasks scheduled right now.".to_string()));
947        }
948
949        let mut lines = vec!["Here are your scheduled tasks:".to_string()];
950        for task in tasks {
951            let name = task
952                .get("name")
953                .and_then(|value| value.as_str())
954                .unwrap_or("(unnamed task)");
955            let enabled = task
956                .get("enabled")
957                .and_then(|value| value.as_bool())
958                .unwrap_or(true);
959            let next_run_at = task
960                .get("next_run_at")
961                .and_then(|value| value.as_i64())
962                .map(|value| value.to_string())
963                .unwrap_or_else(|| "unknown".to_string());
964            let interval = task
965                .get("interval_minutes")
966                .and_then(|value| value.as_i64())
967                .map(|value| format!(", every {} min", value))
968                .unwrap_or_default();
969            let state = if enabled { "enabled" } else { "disabled" };
970            lines.push(format!(
971                "- {} ({}, next: {}{})",
972                name, state, next_run_at, interval
973            ));
974        }
975
976        Ok(Some(lines.join("\n")))
977    }
978
979    async fn try_handle_reminders_command(
980        &self,
981        user_id: &str,
982        text: &str,
983    ) -> Result<Option<String>> {
984        let lower = text.to_lowercase();
985        let trimmed = lower.trim();
986
987        let is_bulk_clear_verb = lower.contains("clear")
988            || lower.contains("delete")
989            || lower.contains("remove")
990            || lower.contains("wipe")
991            || lower.contains("clean");
992        let references_reminders_collection = lower.contains("reminders")
993            || lower.contains("reminder list")
994            || lower.contains("my reminder")
995            || lower.contains("all reminder");
996        if is_bulk_clear_verb && references_reminders_collection {
997            let tool = self.agent_service.tool_registry.get_tool("reminders").await;
998            let Some(_tool) = tool else {
999                return Ok(Some(
1000                    "I can’t clear reminders right now because the reminders tool is not available."
1001                        .to_string(),
1002                ));
1003            };
1004
1005            let clear_status = if lower.contains("open") {
1006                "open"
1007            } else {
1008                "all"
1009            };
1010
1011            if let Ok(result) = self
1012                .agent_service
1013                .tool_registry
1014                .execute_tool(
1015                    "reminders",
1016                    serde_json::json!({
1017                        "action": "clear",
1018                        "user_id": user_id,
1019                        "status": clear_status
1020                    }),
1021                )
1022                .await
1023            {
1024                let payload = Self::tool_payload(&result);
1025                let is_error =
1026                    payload.get("status").and_then(|value| value.as_str()) == Some("error");
1027                let deleted = payload
1028                    .get("deleted")
1029                    .and_then(|value| value.as_u64())
1030                    .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
1031                if !is_error {
1032                    if let Some(deleted) = deleted {
1033                        return Ok(Some(format!("Cleared {} reminder(s).", deleted)));
1034                    }
1035                }
1036            }
1037
1038            let list_result = match self
1039                .agent_service
1040                .tool_registry
1041                .execute_tool(
1042                    "reminders",
1043                    serde_json::json!({
1044                        "action": "list",
1045                        "user_id": user_id,
1046                        "status": clear_status,
1047                        "limit": 200
1048                    }),
1049                )
1050                .await
1051            {
1052                Ok(value) => value,
1053                Err(err) => {
1054                    return Ok(Some(format!(
1055                        "I couldn’t clear reminders right now: {}",
1056                        err
1057                    )));
1058                }
1059            };
1060
1061            let payload = Self::tool_payload(&list_result);
1062            let reminder_ids: Vec<i64> = payload
1063                .get("reminders")
1064                .and_then(|value| value.as_array())
1065                .map(|items| {
1066                    items
1067                        .iter()
1068                        .filter_map(|item| item.get("id").and_then(|value| value.as_i64()))
1069                        .collect()
1070                })
1071                .unwrap_or_default();
1072
1073            let mut deleted_count: u64 = 0;
1074            for id in reminder_ids {
1075                if let Ok(delete_result) = self
1076                    .agent_service
1077                    .tool_registry
1078                    .execute_tool(
1079                        "reminders",
1080                        serde_json::json!({
1081                            "action": "delete",
1082                            "user_id": user_id,
1083                            "id": id
1084                        }),
1085                    )
1086                    .await
1087                {
1088                    let delete_payload = Self::tool_payload(&delete_result);
1089                    if delete_payload
1090                        .get("deleted")
1091                        .and_then(|value| value.as_bool())
1092                        .or_else(|| {
1093                            delete_result
1094                                .get("deleted")
1095                                .and_then(|value| value.as_bool())
1096                        })
1097                        == Some(true)
1098                    {
1099                        deleted_count += 1;
1100                    }
1101                }
1102            }
1103
1104            return Ok(Some(format!("Cleared {} reminder(s).", deleted_count)));
1105        }
1106
1107        let looks_like_reminder_list_request = trimmed == "reminders"
1108            || trimmed == "reminder"
1109            || lower.contains("what reminders are due")
1110            || lower.contains("which reminders are due")
1111            || lower.contains("due reminders")
1112            || lower.contains("what are my reminders")
1113            || lower.contains("my reminders")
1114            || lower.contains("list reminders")
1115            || lower.contains("show reminders")
1116            || lower.contains("open reminders");
1117
1118        if !looks_like_reminder_list_request {
1119            return Ok(None);
1120        }
1121
1122        let tool = self.agent_service.tool_registry.get_tool("reminders").await;
1123        let Some(_tool) = tool else {
1124            return Ok(Some(
1125                "I can’t list reminders right now because the reminders tool is not available."
1126                    .to_string(),
1127            ));
1128        };
1129
1130        let result = match self
1131            .agent_service
1132            .tool_registry
1133            .execute_tool(
1134                "reminders",
1135                serde_json::json!({
1136                    "action": "list",
1137                    "user_id": user_id,
1138                    "status": "open",
1139                    "limit": 50
1140                }),
1141            )
1142            .await
1143        {
1144            Ok(value) => value,
1145            Err(err) => {
1146                return Ok(Some(format!(
1147                    "I couldn’t list reminders right now: {}",
1148                    err
1149                )));
1150            }
1151        };
1152
1153        let payload = Self::tool_payload(&result);
1154        let reminders = payload
1155            .get("reminders")
1156            .and_then(|value| value.as_array())
1157            .cloned()
1158            .unwrap_or_default();
1159
1160        if reminders.is_empty() {
1161            return Ok(Some("No reminders are due at this time.".to_string()));
1162        }
1163
1164        let mut lines = vec!["Here are your open reminders:".to_string()];
1165        for reminder in reminders {
1166            let title = reminder
1167                .get("title")
1168                .and_then(|value| value.as_str())
1169                .unwrap_or("(untitled reminder)");
1170            let due_at = reminder
1171                .get("due_at")
1172                .and_then(|value| value.as_i64())
1173                .map(|value| value.to_string())
1174                .unwrap_or_else(|| "unknown".to_string());
1175            lines.push(format!("- {} (due: {})", title, due_at));
1176        }
1177
1178        Ok(Some(lines.join("\n")))
1179    }
1180
1181    async fn try_handle_todo_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
1182        let lower = text.to_lowercase();
1183        let trimmed = lower.trim();
1184
1185        let is_bulk_clear_verb = lower.contains("clear")
1186            || lower.contains("delete")
1187            || lower.contains("remove")
1188            || lower.contains("wipe")
1189            || lower.contains("clean");
1190        let references_todo_collection = lower.contains("todos")
1191            || lower.contains("todo list")
1192            || lower.contains("my todo")
1193            || lower.contains("all todo");
1194        let looks_like_bulk_todo_clear_request = is_bulk_clear_verb && references_todo_collection;
1195
1196        if looks_like_bulk_todo_clear_request {
1197            let tool = self.agent_service.tool_registry.get_tool("todo").await;
1198            let Some(_tool) = tool else {
1199                return Ok(Some(
1200                    "I can’t clear todos right now because the todo tool is not available."
1201                        .to_string(),
1202                ));
1203            };
1204
1205            let clear_status = if lower.contains("completed") {
1206                "completed"
1207            } else {
1208                "all"
1209            };
1210
1211            let result = match self
1212                .agent_service
1213                .tool_registry
1214                .execute_tool(
1215                    "todo",
1216                    serde_json::json!({
1217                        "action": "clear",
1218                        "user_id": user_id,
1219                        "status": clear_status
1220                    }),
1221                )
1222                .await
1223            {
1224                Ok(value) => value,
1225                Err(err) => {
1226                    return Ok(Some(format!("I couldn’t clear todos right now: {}", err)));
1227                }
1228            };
1229
1230            let payload = Self::tool_payload(&result);
1231            let error_message = payload
1232                .get("message")
1233                .and_then(|value| value.as_str())
1234                .or_else(|| result.get("message").and_then(|value| value.as_str()));
1235            if payload.get("status").and_then(|value| value.as_str()) == Some("error") {
1236                if let Some(message) = error_message {
1237                    return Ok(Some(format!("I couldn’t clear todos right now: {message}")));
1238                }
1239            }
1240
1241            let deleted = payload
1242                .get("deleted")
1243                .and_then(|value| value.as_u64())
1244                .or_else(|| result.get("deleted").and_then(|value| value.as_u64()))
1245                .unwrap_or(0);
1246            return Ok(Some(format!("Cleared {} todo(s).", deleted)));
1247        }
1248
1249        let looks_like_todo_list_request = trimmed == "todos"
1250            || trimmed == "todo"
1251            || lower.contains("what are the todos")
1252            || lower.contains("what are my todos")
1253            || lower.contains("my todos")
1254            || lower.contains("list todos")
1255            || lower.contains("show todos")
1256            || lower.contains("open todos");
1257
1258        if !looks_like_todo_list_request {
1259            return Ok(None);
1260        }
1261
1262        let tool = self.agent_service.tool_registry.get_tool("todo").await;
1263        let Some(_tool) = tool else {
1264            return Ok(Some(
1265                "I can’t list todos right now because the todo tool is not available.".to_string(),
1266            ));
1267        };
1268
1269        let result = match self
1270            .agent_service
1271            .tool_registry
1272            .execute_tool(
1273                "todo",
1274                serde_json::json!({
1275                    "action": "list",
1276                    "user_id": user_id,
1277                    "status": "open",
1278                    "limit": 50
1279                }),
1280            )
1281            .await
1282        {
1283            Ok(value) => value,
1284            Err(err) => {
1285                return Ok(Some(format!("I couldn’t list todos right now: {}", err)));
1286            }
1287        };
1288
1289        let payload = Self::tool_payload(&result);
1290        let items = payload
1291            .get("items")
1292            .and_then(|value| value.as_array())
1293            .cloned()
1294            .unwrap_or_default();
1295
1296        if items.is_empty() {
1297            return Ok(Some("You have no open todos right now.".to_string()));
1298        }
1299
1300        let mut lines = vec!["Here are your open todos:".to_string()];
1301        for item in items {
1302            let title = item
1303                .get("title")
1304                .and_then(|value| value.as_str())
1305                .unwrap_or("(untitled todo)");
1306            let notes = item
1307                .get("notes")
1308                .and_then(|value| value.as_str())
1309                .map(|value| format!(": {}", value))
1310                .unwrap_or_default();
1311            lines.push(format!("- {}{}", title, notes));
1312        }
1313
1314        Ok(Some(lines.join("\n")))
1315    }
1316
1317    async fn try_handle_plans_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
1318        let lower = text.to_lowercase();
1319        let trimmed = lower.trim();
1320
1321        let is_bulk_clear_verb = lower.contains("clear")
1322            || lower.contains("delete")
1323            || lower.contains("remove")
1324            || lower.contains("wipe")
1325            || lower.contains("clean");
1326        let references_plans_collection = lower.contains("plans")
1327            || lower.contains("plan list")
1328            || lower.contains("my plan")
1329            || lower.contains("all plan");
1330        if is_bulk_clear_verb && references_plans_collection {
1331            let tool = self.agent_service.tool_registry.get_tool("planning").await;
1332            let Some(_tool) = tool else {
1333                return Ok(Some(
1334                    "I can’t clear plans right now because the planning tool is not available."
1335                        .to_string(),
1336                ));
1337            };
1338
1339            if let Ok(result) = self
1340                .agent_service
1341                .tool_registry
1342                .execute_tool(
1343                    "planning",
1344                    serde_json::json!({
1345                        "action": "clear",
1346                        "user_id": user_id
1347                    }),
1348                )
1349                .await
1350            {
1351                let payload = Self::tool_payload(&result);
1352                let is_error =
1353                    payload.get("status").and_then(|value| value.as_str()) == Some("error");
1354                let deleted = payload
1355                    .get("deleted")
1356                    .and_then(|value| value.as_u64())
1357                    .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
1358                if !is_error {
1359                    if let Some(deleted) = deleted {
1360                        return Ok(Some(format!("Cleared {} plan(s).", deleted)));
1361                    }
1362                }
1363            }
1364
1365            let list_result = match self
1366                .agent_service
1367                .tool_registry
1368                .execute_tool(
1369                    "planning",
1370                    serde_json::json!({
1371                        "action": "list",
1372                        "user_id": user_id,
1373                        "limit": 200
1374                    }),
1375                )
1376                .await
1377            {
1378                Ok(value) => value,
1379                Err(err) => {
1380                    return Ok(Some(format!("I couldn’t clear plans right now: {}", err)));
1381                }
1382            };
1383
1384            let payload = Self::tool_payload(&list_result);
1385            let plan_ids: Vec<i64> = payload
1386                .get("plans")
1387                .and_then(|value| value.as_array())
1388                .map(|items| {
1389                    items
1390                        .iter()
1391                        .filter_map(|item| item.get("id").and_then(|value| value.as_i64()))
1392                        .collect()
1393                })
1394                .unwrap_or_default();
1395
1396            let mut deleted_count: u64 = 0;
1397            for id in plan_ids {
1398                if let Ok(delete_result) = self
1399                    .agent_service
1400                    .tool_registry
1401                    .execute_tool(
1402                        "planning",
1403                        serde_json::json!({
1404                            "action": "delete",
1405                            "user_id": user_id,
1406                            "id": id
1407                        }),
1408                    )
1409                    .await
1410                {
1411                    let delete_payload = Self::tool_payload(&delete_result);
1412                    if delete_payload
1413                        .get("deleted")
1414                        .and_then(|value| value.as_bool())
1415                        .or_else(|| {
1416                            delete_result
1417                                .get("deleted")
1418                                .and_then(|value| value.as_bool())
1419                        })
1420                        == Some(true)
1421                    {
1422                        deleted_count += 1;
1423                    }
1424                }
1425            }
1426
1427            return Ok(Some(format!("Cleared {} plan(s).", deleted_count)));
1428        }
1429
1430        let looks_like_plans_list_request = trimmed == "plans"
1431            || lower.contains("what are the plans")
1432            || lower.contains("what are my plans")
1433            || lower.contains("show plans")
1434            || lower.contains("list plans")
1435            || lower.contains("my plans")
1436            || lower.contains("saved plans")
1437            || lower.contains("current plans");
1438
1439        if !looks_like_plans_list_request {
1440            return Ok(None);
1441        }
1442
1443        let tool = self.agent_service.tool_registry.get_tool("planning").await;
1444        let Some(_tool) = tool else {
1445            return Ok(Some(
1446                "I can’t list plans right now because the planning tool is not available."
1447                    .to_string(),
1448            ));
1449        };
1450
1451        let result = match self
1452            .agent_service
1453            .tool_registry
1454            .execute_tool(
1455                "planning",
1456                serde_json::json!({
1457                    "action": "list",
1458                    "user_id": user_id,
1459                    "limit": 20
1460                }),
1461            )
1462            .await
1463        {
1464            Ok(value) => value,
1465            Err(err) => {
1466                return Ok(Some(format!("I couldn’t list plans right now: {}", err)));
1467            }
1468        };
1469
1470        let payload = Self::tool_payload(&result);
1471        let plans = payload
1472            .get("plans")
1473            .and_then(|value| value.as_array())
1474            .cloned()
1475            .unwrap_or_default();
1476
1477        if plans.is_empty() {
1478            return Ok(Some("You have no saved plans right now.".to_string()));
1479        }
1480
1481        let mut lines = vec!["Here are your saved plans:".to_string()];
1482        for plan in plans {
1483            let title = plan
1484                .get("title")
1485                .and_then(|value| value.as_str())
1486                .unwrap_or("(untitled plan)");
1487            let status = plan
1488                .get("status")
1489                .and_then(|value| value.as_str())
1490                .unwrap_or("unknown");
1491            lines.push(format!("- {} ({})", title, status));
1492        }
1493
1494        Ok(Some(lines.join("\n")))
1495    }
1496
1497    fn tool_payload(result: &serde_json::Value) -> &serde_json::Value {
1498        result
1499            .get("capability_result")
1500            .and_then(|value| value.get("result"))
1501            .unwrap_or(result)
1502    }
1503}