1use std::sync::Arc;
2
3use async_stream::try_stream;
4use futures::stream::BoxStream;
5use futures::StreamExt;
6use std::hash::Hasher;
7use std::time::{Instant, SystemTime, UNIX_EPOCH};
8
9use md5::{Digest, Md5};
10
11use crate::error::Result;
12use crate::interfaces::providers::{ImageInput, MemoryProvider};
13use crate::reminders::ReminderStore;
14use crate::services::agent::AgentService;
15use crate::vault;
16use tracing::info;
17
/// Raw input accepted by [`QueryService::process`].
#[derive(Debug, Clone)]
pub enum UserInput {
    /// Plain text that is used as the query as-is.
    Text(String),
    /// Recorded audio; `process` hands it to the agent service for transcription
    /// and uses the transcript as the query.
    Audio {
        /// Encoded audio payload.
        bytes: Vec<u8>,
        /// Format label forwarded to the transcription call alongside `bytes`.
        input_format: String,
    },
}
26
/// Desired shape of the reply produced by [`QueryService::process`].
#[derive(Debug, Clone)]
pub enum OutputFormat {
    /// Return the result unchanged.
    Text,
    /// Convert a textual reply to audio; `voice` and `format` are forwarded to the
    /// agent service's audio synthesis call.
    Audio { voice: String, format: String },
}
32
/// Per-request options for [`QueryService::process`].
#[derive(Clone)]
pub struct ProcessOptions {
    /// Optional prompt forwarded to whichever generation call is selected.
    pub prompt: Option<String>,
    /// Images to send with the query; only consulted when `json_schema` is `None`.
    pub images: Vec<ImageInput>,
    /// Whether a textual reply is returned as-is or synthesized to audio.
    pub output_format: OutputFormat,
    /// Detail setting forwarded to the image-aware generation call.
    pub image_detail: String,
    /// When set, a structured response is requested with this schema; this takes
    /// precedence over `images`.
    pub json_schema: Option<serde_json::Value>,
}
41
/// Outcome of [`QueryService::process`].
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// A textual reply.
    Text(String),
    /// Audio bytes synthesized from a textual reply.
    Audio(Vec<u8>),
    /// A JSON value produced by a schema-constrained (structured) generation call.
    Structured(serde_json::Value),
}
48
/// Front door for user queries: routes built-in commands, assembles memory and
/// reminder context, and delegates response generation to the agent service.
pub struct QueryService {
    /// Generates responses, transcribes/synthesizes audio and owns the tool registry.
    agent_service: Arc<AgentService>,
    /// Optional conversation memory; when absent no history is read or written.
    memory_provider: Option<Arc<dyn MemoryProvider>>,
    /// Optional reminder storage used to surface due reminders in the prompt context.
    reminder_store: Option<Arc<ReminderStore>>,
    /// Hash of the context document last imported into memory by this instance
    /// (`None` until the first import or MD5 match).
    context_cache: tokio::sync::RwLock<Option<u64>>,
}
55
56impl QueryService {
57 pub fn new(
58 agent_service: Arc<AgentService>,
59 memory_provider: Option<Arc<dyn MemoryProvider>>,
60 reminder_store: Option<Arc<ReminderStore>>,
61 ) -> Self {
62 Self {
63 agent_service,
64 memory_provider,
65 reminder_store,
66 context_cache: tokio::sync::RwLock::new(None),
67 }
68 }
69
70 async fn ensure_context_in_memory(&self, user_id: &str) -> Result<()> {
71 let started = Instant::now();
72 let Some(provider) = &self.memory_provider else {
73 return Ok(());
74 };
75 let _ = self.agent_service.refresh_context_for_user(user_id).await?;
76 info!(
77 "ensure_context_in_memory: refresh_context took {:?}",
78 started.elapsed()
79 );
80 let Some(context_markdown) = self.agent_service.get_context_markdown().await else {
81 return Ok(());
82 };
83 if context_markdown.trim().is_empty() {
84 return Ok(());
85 }
86
87 let mut md5_hasher = Md5::new();
88 md5_hasher.update(context_markdown.as_bytes());
89 let md5_hash = format!("{:x}", md5_hasher.finalize());
90 if let Some(stored) = vault::get_secret("context_md5")? {
91 if stored == md5_hash {
92 let mut guard = self.context_cache.write().await;
93 if guard.is_none() {
94 *guard = Some(0);
95 }
96 info!(
97 "ensure_context_in_memory: md5 unchanged, skipping import (elapsed {:?})",
98 started.elapsed()
99 );
100 return Ok(());
101 }
102 }
103
104 let mut hasher = std::collections::hash_map::DefaultHasher::new();
105 std::hash::Hash::hash(&context_markdown, &mut hasher);
106 let hash = hasher.finish();
107
108 let mut guard = self.context_cache.write().await;
109 if guard.is_none_or(|prev| prev != hash) {
110 let content = format!("CONTEXT_DOC:\n{}", context_markdown);
111 provider
112 .append_message(user_id, "context", &content)
113 .await?;
114 *guard = Some(hash);
115 vault::set_secret("context_md5", &md5_hash)?;
116 info!(
117 "ensure_context_in_memory: imported context into memory (elapsed {:?})",
118 started.elapsed()
119 );
120 }
121 Ok(())
122 }
123
124 pub async fn process_text(
125 &self,
126 user_id: &str,
127 query: &str,
128 prompt: Option<&str>,
129 ) -> Result<String> {
130 let processed_query = query.to_string();
131 let autonomy_tick = is_autonomy_tick(&processed_query);
132
133 self.ensure_context_in_memory(user_id).await?;
134
135 if let Some(response) = self
136 .try_handle_search_command(user_id, &processed_query)
137 .await?
138 {
139 if let Some(provider) = &self.memory_provider {
140 provider
141 .append_message(user_id, "user", &processed_query)
142 .await?;
143 provider
144 .append_message(user_id, "assistant", &response)
145 .await?;
146 }
147 return Ok(response);
148 }
149
150 if let Some(response) = self
151 .try_handle_tasks_command(user_id, &processed_query)
152 .await?
153 {
154 if let Some(provider) = &self.memory_provider {
155 provider
156 .append_message(user_id, "user", &processed_query)
157 .await?;
158 provider
159 .append_message(user_id, "assistant", &response)
160 .await?;
161 }
162 return Ok(response);
163 }
164
165 if let Some(response) = self
166 .try_handle_reminders_command(user_id, &processed_query)
167 .await?
168 {
169 if let Some(provider) = &self.memory_provider {
170 provider
171 .append_message(user_id, "user", &processed_query)
172 .await?;
173 provider
174 .append_message(user_id, "assistant", &response)
175 .await?;
176 }
177 return Ok(response);
178 }
179
180 if let Some(response) = self
181 .try_handle_todo_command(user_id, &processed_query)
182 .await?
183 {
184 if let Some(provider) = &self.memory_provider {
185 provider
186 .append_message(user_id, "user", &processed_query)
187 .await?;
188 provider
189 .append_message(user_id, "assistant", &response)
190 .await?;
191 }
192 return Ok(response);
193 }
194
195 if let Some(response) = self
196 .try_handle_plans_command(user_id, &processed_query)
197 .await?
198 {
199 if let Some(provider) = &self.memory_provider {
200 provider
201 .append_message(user_id, "user", &processed_query)
202 .await?;
203 provider
204 .append_message(user_id, "assistant", &response)
205 .await?;
206 }
207 return Ok(response);
208 }
209
210 let reminder_context = if let Some(store) = &self.reminder_store {
211 build_reminder_context(store, user_id).await
212 } else {
213 None
214 };
215 let mut memory_context = if let Some(provider) = &self.memory_provider {
216 let include_semantic = should_include_semantic_memory(&processed_query);
217 let history_future = provider.get_history(user_id, 12);
218 let semantic_future = async {
219 if include_semantic {
220 provider.search(user_id, &processed_query, 5).await
221 } else {
222 Ok(Vec::new())
223 }
224 };
225 let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
226 let history = history.join("\n");
227 build_memory_context(history, semantic, reminder_context)
228 } else {
229 reminder_context.unwrap_or_default()
230 };
231
232 if let Some(context_markdown) = self.context_for_autonomy(user_id, &processed_query).await {
233 if !memory_context.is_empty() {
234 memory_context.push_str("\n\n");
235 }
236 memory_context.push_str(&context_markdown);
237 }
238
239 let response = self
240 .agent_service
241 .generate_response(user_id, &processed_query, &memory_context, prompt)
242 .await?;
243
244 if let Some(provider) = &self.memory_provider {
245 if autonomy_tick {
246 return Ok(response);
247 }
248 provider
249 .append_message(user_id, "user", &processed_query)
250 .await?;
251 provider
252 .append_message(user_id, "assistant", &response)
253 .await?;
254 }
255
256 Ok(response)
257 }
258
259 pub async fn process(
260 &self,
261 user_id: &str,
262 input: UserInput,
263 options: ProcessOptions,
264 ) -> Result<ProcessResult> {
265 let text = match input {
266 UserInput::Text(value) => value,
267 UserInput::Audio {
268 bytes,
269 input_format,
270 } => {
271 self.agent_service
272 .transcribe_audio(bytes, &input_format)
273 .await?
274 }
275 };
276 let autonomy_tick = is_autonomy_tick(&text);
277
278 self.ensure_context_in_memory(user_id).await?;
279
280 if let Some(response) = self.try_handle_search_command(user_id, &text).await? {
281 if let Some(provider) = &self.memory_provider {
282 provider.append_message(user_id, "user", &text).await?;
283 provider
284 .append_message(user_id, "assistant", &response)
285 .await?;
286 }
287 return Ok(ProcessResult::Text(response));
288 }
289
290 if let Some(response) = self.try_handle_tasks_command(user_id, &text).await? {
291 if let Some(provider) = &self.memory_provider {
292 provider.append_message(user_id, "user", &text).await?;
293 provider
294 .append_message(user_id, "assistant", &response)
295 .await?;
296 }
297 return Ok(ProcessResult::Text(response));
298 }
299
300 if let Some(response) = self.try_handle_reminders_command(user_id, &text).await? {
301 if let Some(provider) = &self.memory_provider {
302 provider.append_message(user_id, "user", &text).await?;
303 provider
304 .append_message(user_id, "assistant", &response)
305 .await?;
306 }
307 return Ok(ProcessResult::Text(response));
308 }
309
310 if let Some(response) = self.try_handle_todo_command(user_id, &text).await? {
311 if let Some(provider) = &self.memory_provider {
312 provider.append_message(user_id, "user", &text).await?;
313 provider
314 .append_message(user_id, "assistant", &response)
315 .await?;
316 }
317 return Ok(ProcessResult::Text(response));
318 }
319
320 if let Some(response) = self.try_handle_plans_command(user_id, &text).await? {
321 if let Some(provider) = &self.memory_provider {
322 provider.append_message(user_id, "user", &text).await?;
323 provider
324 .append_message(user_id, "assistant", &response)
325 .await?;
326 }
327 return Ok(ProcessResult::Text(response));
328 }
329
330 let reminder_context = if let Some(store) = &self.reminder_store {
331 build_reminder_context(store, user_id).await
332 } else {
333 None
334 };
335 let mut memory_context = if let Some(provider) = &self.memory_provider {
336 let include_semantic = should_include_semantic_memory(&text);
337 let history_future = provider.get_history(user_id, 12);
338 let semantic_future = async {
339 if include_semantic {
340 provider.search(user_id, &text, 5).await
341 } else {
342 Ok(Vec::new())
343 }
344 };
345 let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
346 let history = history.join("\n");
347 build_memory_context(history, semantic, reminder_context)
348 } else {
349 reminder_context.unwrap_or_default()
350 };
351
352 if let Some(context_markdown) = self.context_for_autonomy(user_id, &text).await {
353 if !memory_context.is_empty() {
354 memory_context.push_str("\n\n");
355 }
356 memory_context.push_str(&context_markdown);
357 }
358
359 let result = if let Some(schema) = options.json_schema {
360 let structured = self
361 .agent_service
362 .generate_structured_response(
363 user_id,
364 &text,
365 &memory_context,
366 options.prompt.as_deref(),
367 schema,
368 )
369 .await?;
370 ProcessResult::Structured(structured)
371 } else if !options.images.is_empty() {
372 let response = self
373 .agent_service
374 .generate_response_with_images(
375 user_id,
376 &text,
377 options.images,
378 &memory_context,
379 options.prompt.as_deref(),
380 &options.image_detail,
381 )
382 .await?;
383 ProcessResult::Text(response)
384 } else {
385 let response = self
386 .agent_service
387 .generate_response(user_id, &text, &memory_context, options.prompt.as_deref())
388 .await?;
389 ProcessResult::Text(response)
390 };
391
392 let output = match (result, options.output_format) {
393 (ProcessResult::Text(text), OutputFormat::Audio { voice, format }) => {
394 let bytes = self
395 .agent_service
396 .synthesize_audio(&text, &voice, &format)
397 .await?;
398 ProcessResult::Audio(bytes)
399 }
400 (other, _) => other,
401 };
402
403 if let Some(provider) = &self.memory_provider {
404 if autonomy_tick {
405 return Ok(output);
406 }
407 provider.append_message(user_id, "user", &text).await?;
408 if let ProcessResult::Text(ref message) = output {
409 provider
410 .append_message(user_id, "assistant", message)
411 .await?;
412 }
413 }
414
415 Ok(output)
416 }
417
418 pub fn process_text_stream<'a>(
419 &'a self,
420 user_id: &'a str,
421 query: &'a str,
422 prompt: Option<&'a str>,
423 ) -> BoxStream<'a, Result<String>> {
424 Box::pin(try_stream! {
425 let processed_query = query.to_string();
426 let autonomy_tick = is_autonomy_tick(&processed_query);
427
428 self.ensure_context_in_memory(user_id).await?;
429
430 if let Some(response) = self.try_handle_search_command(user_id, &processed_query).await? {
431 if let Some(provider) = &self.memory_provider {
432 provider.append_message(user_id, "user", &processed_query).await?;
433 provider.append_message(user_id, "assistant", &response).await?;
434 }
435 yield response;
436 return;
437 }
438
439 if let Some(response) = self.try_handle_tasks_command(user_id, &processed_query).await? {
440 if let Some(provider) = &self.memory_provider {
441 provider.append_message(user_id, "user", &processed_query).await?;
442 provider.append_message(user_id, "assistant", &response).await?;
443 }
444 yield response;
445 return;
446 }
447
448 if let Some(response) = self.try_handle_reminders_command(user_id, &processed_query).await? {
449 if let Some(provider) = &self.memory_provider {
450 provider.append_message(user_id, "user", &processed_query).await?;
451 provider.append_message(user_id, "assistant", &response).await?;
452 }
453 yield response;
454 return;
455 }
456
457 if let Some(response) = self.try_handle_todo_command(user_id, &processed_query).await? {
458 if let Some(provider) = &self.memory_provider {
459 provider.append_message(user_id, "user", &processed_query).await?;
460 provider.append_message(user_id, "assistant", &response).await?;
461 }
462 yield response;
463 return;
464 }
465
466 if let Some(response) = self.try_handle_plans_command(user_id, &processed_query).await? {
467 if let Some(provider) = &self.memory_provider {
468 provider.append_message(user_id, "user", &processed_query).await?;
469 provider.append_message(user_id, "assistant", &response).await?;
470 }
471 yield response;
472 return;
473 }
474
475 let reminder_context = if let Some(store) = &self.reminder_store {
476 build_reminder_context(store, user_id).await
477 } else {
478 None
479 };
480 let mut memory_context = if let Some(provider) = &self.memory_provider {
481 let include_semantic = should_include_semantic_memory(&processed_query);
482 let history_future = provider.get_history(user_id, 12);
483 let semantic_future = async {
484 if include_semantic {
485 provider.search(user_id, &processed_query, 5).await
486 } else {
487 Ok(Vec::new())
488 }
489 };
490 let (history, semantic) = tokio::try_join!(history_future, semantic_future)?;
491 let history = history.join("\n");
492 build_memory_context(history, semantic, reminder_context)
493 } else {
494 reminder_context.unwrap_or_default()
495 };
496
497 if let Some(context_markdown) = self.context_for_autonomy(user_id, &processed_query).await {
498 if !memory_context.is_empty() {
499 memory_context.push_str("\n\n");
500 }
501 memory_context.push_str(&context_markdown);
502 }
503
504 let mut response_text = String::new();
505 let mut stream = self.agent_service.generate_response_stream(
506 user_id,
507 &processed_query,
508 &memory_context,
509 prompt,
510 );
511
512 while let Some(chunk) = stream.next().await {
513 let chunk = chunk?;
514 response_text.push_str(&chunk);
515 yield chunk;
516 }
517
518 if let Some(provider) = &self.memory_provider {
519 if autonomy_tick {
520 return;
521 }
522 provider.append_message(user_id, "user", &processed_query).await?;
523 if !response_text.is_empty() {
524 provider.append_message(user_id, "assistant", &response_text).await?;
525 }
526 }
527 })
528 }
529
530 pub fn agent_service(&self) -> Arc<AgentService> {
531 self.agent_service.clone()
532 }
533
534 async fn context_for_autonomy(&self, user_id: &str, query: &str) -> Option<String> {
535 if user_id != "system" && !is_autonomy_tick(query) {
536 return None;
537 }
538 let context_markdown = self.agent_service.get_context_markdown().await?;
539 if context_markdown.trim().is_empty() {
540 return None;
541 }
542 let max_len = 8000usize;
543 let trimmed = if context_markdown.len() > max_len {
544 format!(
545 "{}\n...\n[CONTEXT_DOC TRUNCATED]",
546 &context_markdown[..max_len]
547 )
548 } else {
549 context_markdown
550 };
551 Some(format!("CONTEXT_DOC (authoritative):\n{}", trimmed))
552 }
553
554 pub async fn preload_context(&self, user_id: &str) -> Result<()> {
555 self.ensure_context_in_memory(user_id).await
556 }
557
558 pub async fn delete_user_history(&self, user_id: &str) -> Result<()> {
559 if let Some(provider) = &self.memory_provider {
560 provider.clear_history(user_id).await?;
561 }
562 Ok(())
563 }
564
565 pub async fn get_user_history(&self, user_id: &str, limit: usize) -> Result<Vec<String>> {
566 if let Some(provider) = &self.memory_provider {
567 return provider.get_history(user_id, limit).await;
568 }
569 Ok(Vec::new())
570 }
571
572 pub async fn search_memory(
573 &self,
574 user_id: &str,
575 query: &str,
576 limit: usize,
577 ) -> Result<Vec<String>> {
578 if let Some(provider) = &self.memory_provider {
579 return provider.search(user_id, query, limit).await;
580 }
581 Ok(Vec::new())
582 }
583}
584
/// Reports whether `query` is an autonomy heartbeat, i.e. it mentions both
/// "autonomous" and "heartbeat" (case-insensitively, in any order).
fn is_autonomy_tick(query: &str) -> bool {
    let normalized = query.to_lowercase();
    ["autonomous", "heartbeat"]
        .iter()
        .all(|marker| normalized.contains(*marker))
}
589
590fn build_memory_context(
591 history: String,
592 semantic: Vec<String>,
593 reminder_context: Option<String>,
594) -> String {
595 let mut context = String::new();
596 if let Some(reminders) = reminder_context {
597 if !reminders.is_empty() {
598 context.push_str(&reminders);
599 context.push_str("\n\n");
600 }
601 }
602 if !history.is_empty() {
603 let filtered_history = history
604 .lines()
605 .filter(|line| !should_skip_memory_line(line))
606 .collect::<Vec<_>>()
607 .join("\n");
608 if !filtered_history.trim().is_empty() {
609 context.push_str(&filtered_history);
610 }
611 }
612 if !semantic.is_empty() {
613 if !context.is_empty() {
614 context.push_str("\n\n");
615 }
616 context.push_str(
617 "RELEVANT MEMORY (unverified; use only if clearly applicable to the user's request):\n",
618 );
619 for item in semantic
620 .into_iter()
621 .filter(|item| !should_skip_memory_line(item))
622 {
623 context.push_str("- ");
624 context.push_str(&item);
625 context.push('\n');
626 }
627 }
628 context
629}
630
/// Reports whether a memory line talks about API keys or authorization headers and
/// should therefore be kept out of the prompt context.
///
/// Matching is ASCII-case-insensitive. Any phrase containing "api key" (for example
/// "missing api key" or "invalid api key") is covered by the first marker.
fn should_skip_memory_line(line: &str) -> bool {
    const SENSITIVE_MARKERS: [&str; 3] = ["api key", "api_key", "authorization header"];

    let lower = line.to_ascii_lowercase();
    SENSITIVE_MARKERS
        .iter()
        .any(|marker| lower.contains(*marker))
}
641
642async fn build_reminder_context(store: &ReminderStore, user_id: &str) -> Option<String> {
643 let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs() as i64;
644 let due = store.due_reminders(user_id, now, 10).await.ok()?;
645 if due.is_empty() {
646 return None;
647 }
648
649 let mut context = String::from("DUE REMINDERS (notify the user naturally in this reply):\n");
650 for reminder in due {
651 context.push_str(&format!(
652 "- {} (id: {}, due_at_unix: {})\n",
653 reminder.title, reminder.id, reminder.due_at
654 ));
655 }
656 Some(context)
657}
658
/// Decides whether a semantic memory search is worthwhile for `query`.
///
/// Blank queries never trigger a search. Queries mentioning one of the always-search
/// keywords do. Anything else must have at least three whitespace-separated tokens and
/// be at least 12 bytes long after trimming, which already excludes short greetings
/// such as "hi" or "hey there".
fn should_include_semantic_memory(query: &str) -> bool {
    const ALWAYS_SEARCH_KEYWORDS: [&str; 3] = ["hackathon", "colosseum", "context"];
    const MIN_TOKENS: usize = 3;
    const MIN_BYTES: usize = 12;

    let trimmed = query.trim();
    if trimmed.is_empty() {
        return false;
    }

    let lower = trimmed.to_lowercase();
    if ALWAYS_SEARCH_KEYWORDS
        .iter()
        .any(|keyword| lower.contains(*keyword))
    {
        return true;
    }

    lower.split_whitespace().count() >= MIN_TOKENS && trimmed.len() >= MIN_BYTES
}
682
683impl QueryService {
684 async fn try_handle_search_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
685 let lower = text.to_lowercase();
686 let looks_like_search = lower.contains("search")
687 || lower.contains("latest")
688 || lower.contains("current")
689 || lower.contains("today")
690 || lower.contains("breaking")
691 || lower.contains("news")
692 || lower.contains("headline")
693 || lower.contains("up to date")
694 || lower.contains("what's new")
695 || lower.contains("whats new");
696 if !looks_like_search {
697 return Ok(None);
698 }
699
700 let tool = self
701 .agent_service
702 .tool_registry
703 .get_tool("search_internet")
704 .await;
705 let Some(_tool) = tool else {
706 return Ok(None);
707 };
708
709 let query = if lower.contains("search tool") && lower.contains("error") {
710 "check search tool status".to_string()
711 } else {
712 text.to_string()
713 };
714
715 let result = self
716 .agent_service
717 .tool_registry
718 .execute_tool(
719 "search_internet",
720 serde_json::json!({"query": query, "user_id": user_id}),
721 )
722 .await?;
723 let effective_result = result
724 .get("capability_result")
725 .and_then(|value| value.get("result"))
726 .cloned()
727 .unwrap_or_else(|| result.clone());
728
729 let status = effective_result
730 .get("status")
731 .and_then(|v| v.as_str())
732 .unwrap_or("");
733 if status == "success" {
734 let content = effective_result
735 .get("result")
736 .and_then(|v| v.as_str())
737 .unwrap_or("")
738 .to_string();
739 if content.is_empty() {
740 return Ok(Some(
741 "Search completed, but no results were returned.".to_string(),
742 ));
743 }
744 return Ok(Some(content));
745 }
746
747 let message = effective_result
748 .get("message")
749 .and_then(|v| v.as_str())
750 .or_else(|| effective_result.get("error").and_then(|v| v.as_str()))
751 .or_else(|| effective_result.get("code").and_then(|v| v.as_str()))
752 .or_else(|| result.get("message").and_then(|v| v.as_str()))
753 .or_else(|| result.get("error").and_then(|v| v.as_str()))
754 .or_else(|| result.get("code").and_then(|v| v.as_str()))
755 .unwrap_or("Search tool error");
756 let details = effective_result
757 .get("details")
758 .and_then(|v| v.as_str())
759 .or_else(|| effective_result.get("reason").and_then(|v| v.as_str()))
760 .or_else(|| result.get("details").and_then(|v| v.as_str()))
761 .or_else(|| result.get("reason").and_then(|v| v.as_str()))
762 .unwrap_or("");
763 let response = if !details.is_empty() {
764 format!("Search tool error: {} ({})", message, details)
765 } else if message != "Search tool error" {
766 format!("Search tool error: {}", message)
767 } else {
768 format!("Search tool error: {}", effective_result)
769 };
770 Ok(Some(response))
771 }
772
773 async fn try_handle_tasks_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
774 let lower = text.to_lowercase();
775 let trimmed = lower.trim();
776
777 let is_bulk_clear_verb = lower.contains("clear")
778 || lower.contains("delete")
779 || lower.contains("remove")
780 || lower.contains("wipe")
781 || lower.contains("clean");
782 let references_tasks_collection = lower.contains("tasks")
783 || lower.contains("task list")
784 || lower.contains("my task")
785 || lower.contains("all task");
786 if is_bulk_clear_verb && references_tasks_collection {
787 let tool = self.agent_service.tool_registry.get_tool("tasks").await;
788 let Some(_tool) = tool else {
789 return Ok(Some(
790 "I can’t clear tasks right now because the tasks tool is not available."
791 .to_string(),
792 ));
793 };
794
795 let clear_status = if lower.contains("disabled") {
796 "disabled"
797 } else if lower.contains("enabled") {
798 "enabled"
799 } else {
800 "all"
801 };
802
803 if let Ok(result) = self
804 .agent_service
805 .tool_registry
806 .execute_tool(
807 "tasks",
808 serde_json::json!({
809 "action": "clear",
810 "user_id": user_id,
811 "status": clear_status
812 }),
813 )
814 .await
815 {
816 let payload = Self::tool_payload(&result);
817 let is_error =
818 payload.get("status").and_then(|value| value.as_str()) == Some("error");
819 let deleted = payload
820 .get("deleted")
821 .and_then(|value| value.as_u64())
822 .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
823 if !is_error {
824 if let Some(deleted) = deleted {
825 return Ok(Some(format!("Cleared {} task(s).", deleted)));
826 }
827 }
828 }
829
830 let list_result = match self
831 .agent_service
832 .tool_registry
833 .execute_tool(
834 "tasks",
835 serde_json::json!({
836 "action": "list",
837 "user_id": user_id,
838 "status": clear_status,
839 "limit": 200
840 }),
841 )
842 .await
843 {
844 Ok(value) => value,
845 Err(err) => {
846 return Ok(Some(format!("I couldn’t clear tasks right now: {}", err)));
847 }
848 };
849
850 let payload = Self::tool_payload(&list_result);
851 let task_ids: Vec<i64> = payload
852 .get("tasks")
853 .and_then(|value| value.as_array())
854 .map(|tasks| {
855 tasks
856 .iter()
857 .filter_map(|task| task.get("id").and_then(|value| value.as_i64()))
858 .collect()
859 })
860 .unwrap_or_default();
861
862 let mut deleted_count: u64 = 0;
863 for id in task_ids {
864 if let Ok(delete_result) = self
865 .agent_service
866 .tool_registry
867 .execute_tool(
868 "tasks",
869 serde_json::json!({
870 "action": "delete",
871 "user_id": user_id,
872 "id": id
873 }),
874 )
875 .await
876 {
877 let delete_payload = Self::tool_payload(&delete_result);
878 if delete_payload
879 .get("deleted")
880 .and_then(|value| value.as_bool())
881 .or_else(|| {
882 delete_result
883 .get("deleted")
884 .and_then(|value| value.as_bool())
885 })
886 == Some(true)
887 {
888 deleted_count += 1;
889 }
890 }
891 }
892
893 return Ok(Some(format!("Cleared {} task(s).", deleted_count)));
894 }
895
896 let looks_like_task_list_request = trimmed == "tasks"
897 || trimmed == "task"
898 || lower.contains("what are the tasks")
899 || lower.contains("what's on my tasks")
900 || lower.contains("whats on my tasks")
901 || lower.contains("my tasks")
902 || lower.contains("list tasks")
903 || lower.contains("show tasks")
904 || lower.contains("open tasks");
905
906 if !looks_like_task_list_request {
907 return Ok(None);
908 }
909
910 let tool = self.agent_service.tool_registry.get_tool("tasks").await;
911 let Some(_tool) = tool else {
912 return Ok(Some(
913 "I can’t list tasks right now because the tasks tool is not available.".to_string(),
914 ));
915 };
916
917 let result = match self
918 .agent_service
919 .tool_registry
920 .execute_tool(
921 "tasks",
922 serde_json::json!({
923 "action": "list",
924 "user_id": user_id,
925 "status": "all",
926 "limit": 50
927 }),
928 )
929 .await
930 {
931 Ok(value) => value,
932 Err(err) => {
933 return Ok(Some(format!("I couldn’t list tasks right now: {}", err)));
934 }
935 };
936
937 let payload = Self::tool_payload(&result);
938
939 let tasks = payload
940 .get("tasks")
941 .and_then(|value| value.as_array())
942 .cloned()
943 .unwrap_or_default();
944
945 if tasks.is_empty() {
946 return Ok(Some("You have no tasks scheduled right now.".to_string()));
947 }
948
949 let mut lines = vec!["Here are your scheduled tasks:".to_string()];
950 for task in tasks {
951 let name = task
952 .get("name")
953 .and_then(|value| value.as_str())
954 .unwrap_or("(unnamed task)");
955 let enabled = task
956 .get("enabled")
957 .and_then(|value| value.as_bool())
958 .unwrap_or(true);
959 let next_run_at = task
960 .get("next_run_at")
961 .and_then(|value| value.as_i64())
962 .map(|value| value.to_string())
963 .unwrap_or_else(|| "unknown".to_string());
964 let interval = task
965 .get("interval_minutes")
966 .and_then(|value| value.as_i64())
967 .map(|value| format!(", every {} min", value))
968 .unwrap_or_default();
969 let state = if enabled { "enabled" } else { "disabled" };
970 lines.push(format!(
971 "- {} ({}, next: {}{})",
972 name, state, next_run_at, interval
973 ));
974 }
975
976 Ok(Some(lines.join("\n")))
977 }
978
979 async fn try_handle_reminders_command(
980 &self,
981 user_id: &str,
982 text: &str,
983 ) -> Result<Option<String>> {
984 let lower = text.to_lowercase();
985 let trimmed = lower.trim();
986
987 let is_bulk_clear_verb = lower.contains("clear")
988 || lower.contains("delete")
989 || lower.contains("remove")
990 || lower.contains("wipe")
991 || lower.contains("clean");
992 let references_reminders_collection = lower.contains("reminders")
993 || lower.contains("reminder list")
994 || lower.contains("my reminder")
995 || lower.contains("all reminder");
996 if is_bulk_clear_verb && references_reminders_collection {
997 let tool = self.agent_service.tool_registry.get_tool("reminders").await;
998 let Some(_tool) = tool else {
999 return Ok(Some(
1000 "I can’t clear reminders right now because the reminders tool is not available."
1001 .to_string(),
1002 ));
1003 };
1004
1005 let clear_status = if lower.contains("open") {
1006 "open"
1007 } else {
1008 "all"
1009 };
1010
1011 if let Ok(result) = self
1012 .agent_service
1013 .tool_registry
1014 .execute_tool(
1015 "reminders",
1016 serde_json::json!({
1017 "action": "clear",
1018 "user_id": user_id,
1019 "status": clear_status
1020 }),
1021 )
1022 .await
1023 {
1024 let payload = Self::tool_payload(&result);
1025 let is_error =
1026 payload.get("status").and_then(|value| value.as_str()) == Some("error");
1027 let deleted = payload
1028 .get("deleted")
1029 .and_then(|value| value.as_u64())
1030 .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
1031 if !is_error {
1032 if let Some(deleted) = deleted {
1033 return Ok(Some(format!("Cleared {} reminder(s).", deleted)));
1034 }
1035 }
1036 }
1037
1038 let list_result = match self
1039 .agent_service
1040 .tool_registry
1041 .execute_tool(
1042 "reminders",
1043 serde_json::json!({
1044 "action": "list",
1045 "user_id": user_id,
1046 "status": clear_status,
1047 "limit": 200
1048 }),
1049 )
1050 .await
1051 {
1052 Ok(value) => value,
1053 Err(err) => {
1054 return Ok(Some(format!(
1055 "I couldn’t clear reminders right now: {}",
1056 err
1057 )));
1058 }
1059 };
1060
1061 let payload = Self::tool_payload(&list_result);
1062 let reminder_ids: Vec<i64> = payload
1063 .get("reminders")
1064 .and_then(|value| value.as_array())
1065 .map(|items| {
1066 items
1067 .iter()
1068 .filter_map(|item| item.get("id").and_then(|value| value.as_i64()))
1069 .collect()
1070 })
1071 .unwrap_or_default();
1072
1073 let mut deleted_count: u64 = 0;
1074 for id in reminder_ids {
1075 if let Ok(delete_result) = self
1076 .agent_service
1077 .tool_registry
1078 .execute_tool(
1079 "reminders",
1080 serde_json::json!({
1081 "action": "delete",
1082 "user_id": user_id,
1083 "id": id
1084 }),
1085 )
1086 .await
1087 {
1088 let delete_payload = Self::tool_payload(&delete_result);
1089 if delete_payload
1090 .get("deleted")
1091 .and_then(|value| value.as_bool())
1092 .or_else(|| {
1093 delete_result
1094 .get("deleted")
1095 .and_then(|value| value.as_bool())
1096 })
1097 == Some(true)
1098 {
1099 deleted_count += 1;
1100 }
1101 }
1102 }
1103
1104 return Ok(Some(format!("Cleared {} reminder(s).", deleted_count)));
1105 }
1106
1107 let looks_like_reminder_list_request = trimmed == "reminders"
1108 || trimmed == "reminder"
1109 || lower.contains("what reminders are due")
1110 || lower.contains("which reminders are due")
1111 || lower.contains("due reminders")
1112 || lower.contains("what are my reminders")
1113 || lower.contains("my reminders")
1114 || lower.contains("list reminders")
1115 || lower.contains("show reminders")
1116 || lower.contains("open reminders");
1117
1118 if !looks_like_reminder_list_request {
1119 return Ok(None);
1120 }
1121
1122 let tool = self.agent_service.tool_registry.get_tool("reminders").await;
1123 let Some(_tool) = tool else {
1124 return Ok(Some(
1125 "I can’t list reminders right now because the reminders tool is not available."
1126 .to_string(),
1127 ));
1128 };
1129
1130 let result = match self
1131 .agent_service
1132 .tool_registry
1133 .execute_tool(
1134 "reminders",
1135 serde_json::json!({
1136 "action": "list",
1137 "user_id": user_id,
1138 "status": "open",
1139 "limit": 50
1140 }),
1141 )
1142 .await
1143 {
1144 Ok(value) => value,
1145 Err(err) => {
1146 return Ok(Some(format!(
1147 "I couldn’t list reminders right now: {}",
1148 err
1149 )));
1150 }
1151 };
1152
1153 let payload = Self::tool_payload(&result);
1154 let reminders = payload
1155 .get("reminders")
1156 .and_then(|value| value.as_array())
1157 .cloned()
1158 .unwrap_or_default();
1159
1160 if reminders.is_empty() {
1161 return Ok(Some("No reminders are due at this time.".to_string()));
1162 }
1163
1164 let mut lines = vec!["Here are your open reminders:".to_string()];
1165 for reminder in reminders {
1166 let title = reminder
1167 .get("title")
1168 .and_then(|value| value.as_str())
1169 .unwrap_or("(untitled reminder)");
1170 let due_at = reminder
1171 .get("due_at")
1172 .and_then(|value| value.as_i64())
1173 .map(|value| value.to_string())
1174 .unwrap_or_else(|| "unknown".to_string());
1175 lines.push(format!("- {} (due: {})", title, due_at));
1176 }
1177
1178 Ok(Some(lines.join("\n")))
1179 }
1180
1181 async fn try_handle_todo_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
1182 let lower = text.to_lowercase();
1183 let trimmed = lower.trim();
1184
1185 let is_bulk_clear_verb = lower.contains("clear")
1186 || lower.contains("delete")
1187 || lower.contains("remove")
1188 || lower.contains("wipe")
1189 || lower.contains("clean");
1190 let references_todo_collection = lower.contains("todos")
1191 || lower.contains("todo list")
1192 || lower.contains("my todo")
1193 || lower.contains("all todo");
1194 let looks_like_bulk_todo_clear_request = is_bulk_clear_verb && references_todo_collection;
1195
1196 if looks_like_bulk_todo_clear_request {
1197 let tool = self.agent_service.tool_registry.get_tool("todo").await;
1198 let Some(_tool) = tool else {
1199 return Ok(Some(
1200 "I can’t clear todos right now because the todo tool is not available."
1201 .to_string(),
1202 ));
1203 };
1204
1205 let clear_status = if lower.contains("completed") {
1206 "completed"
1207 } else {
1208 "all"
1209 };
1210
1211 let result = match self
1212 .agent_service
1213 .tool_registry
1214 .execute_tool(
1215 "todo",
1216 serde_json::json!({
1217 "action": "clear",
1218 "user_id": user_id,
1219 "status": clear_status
1220 }),
1221 )
1222 .await
1223 {
1224 Ok(value) => value,
1225 Err(err) => {
1226 return Ok(Some(format!("I couldn’t clear todos right now: {}", err)));
1227 }
1228 };
1229
1230 let payload = Self::tool_payload(&result);
1231 let error_message = payload
1232 .get("message")
1233 .and_then(|value| value.as_str())
1234 .or_else(|| result.get("message").and_then(|value| value.as_str()));
1235 if payload.get("status").and_then(|value| value.as_str()) == Some("error") {
1236 if let Some(message) = error_message {
1237 return Ok(Some(format!("I couldn’t clear todos right now: {message}")));
1238 }
1239 }
1240
1241 let deleted = payload
1242 .get("deleted")
1243 .and_then(|value| value.as_u64())
1244 .or_else(|| result.get("deleted").and_then(|value| value.as_u64()))
1245 .unwrap_or(0);
1246 return Ok(Some(format!("Cleared {} todo(s).", deleted)));
1247 }
1248
1249 let looks_like_todo_list_request = trimmed == "todos"
1250 || trimmed == "todo"
1251 || lower.contains("what are the todos")
1252 || lower.contains("what are my todos")
1253 || lower.contains("my todos")
1254 || lower.contains("list todos")
1255 || lower.contains("show todos")
1256 || lower.contains("open todos");
1257
1258 if !looks_like_todo_list_request {
1259 return Ok(None);
1260 }
1261
1262 let tool = self.agent_service.tool_registry.get_tool("todo").await;
1263 let Some(_tool) = tool else {
1264 return Ok(Some(
1265 "I can’t list todos right now because the todo tool is not available.".to_string(),
1266 ));
1267 };
1268
1269 let result = match self
1270 .agent_service
1271 .tool_registry
1272 .execute_tool(
1273 "todo",
1274 serde_json::json!({
1275 "action": "list",
1276 "user_id": user_id,
1277 "status": "open",
1278 "limit": 50
1279 }),
1280 )
1281 .await
1282 {
1283 Ok(value) => value,
1284 Err(err) => {
1285 return Ok(Some(format!("I couldn’t list todos right now: {}", err)));
1286 }
1287 };
1288
1289 let payload = Self::tool_payload(&result);
1290 let items = payload
1291 .get("items")
1292 .and_then(|value| value.as_array())
1293 .cloned()
1294 .unwrap_or_default();
1295
1296 if items.is_empty() {
1297 return Ok(Some("You have no open todos right now.".to_string()));
1298 }
1299
1300 let mut lines = vec!["Here are your open todos:".to_string()];
1301 for item in items {
1302 let title = item
1303 .get("title")
1304 .and_then(|value| value.as_str())
1305 .unwrap_or("(untitled todo)");
1306 let notes = item
1307 .get("notes")
1308 .and_then(|value| value.as_str())
1309 .map(|value| format!(": {}", value))
1310 .unwrap_or_default();
1311 lines.push(format!("- {}{}", title, notes));
1312 }
1313
1314 Ok(Some(lines.join("\n")))
1315 }
1316
1317 async fn try_handle_plans_command(&self, user_id: &str, text: &str) -> Result<Option<String>> {
1318 let lower = text.to_lowercase();
1319 let trimmed = lower.trim();
1320
1321 let is_bulk_clear_verb = lower.contains("clear")
1322 || lower.contains("delete")
1323 || lower.contains("remove")
1324 || lower.contains("wipe")
1325 || lower.contains("clean");
1326 let references_plans_collection = lower.contains("plans")
1327 || lower.contains("plan list")
1328 || lower.contains("my plan")
1329 || lower.contains("all plan");
1330 if is_bulk_clear_verb && references_plans_collection {
1331 let tool = self.agent_service.tool_registry.get_tool("planning").await;
1332 let Some(_tool) = tool else {
1333 return Ok(Some(
1334 "I can’t clear plans right now because the planning tool is not available."
1335 .to_string(),
1336 ));
1337 };
1338
1339 if let Ok(result) = self
1340 .agent_service
1341 .tool_registry
1342 .execute_tool(
1343 "planning",
1344 serde_json::json!({
1345 "action": "clear",
1346 "user_id": user_id
1347 }),
1348 )
1349 .await
1350 {
1351 let payload = Self::tool_payload(&result);
1352 let is_error =
1353 payload.get("status").and_then(|value| value.as_str()) == Some("error");
1354 let deleted = payload
1355 .get("deleted")
1356 .and_then(|value| value.as_u64())
1357 .or_else(|| result.get("deleted").and_then(|value| value.as_u64()));
1358 if !is_error {
1359 if let Some(deleted) = deleted {
1360 return Ok(Some(format!("Cleared {} plan(s).", deleted)));
1361 }
1362 }
1363 }
1364
1365 let list_result = match self
1366 .agent_service
1367 .tool_registry
1368 .execute_tool(
1369 "planning",
1370 serde_json::json!({
1371 "action": "list",
1372 "user_id": user_id,
1373 "limit": 200
1374 }),
1375 )
1376 .await
1377 {
1378 Ok(value) => value,
1379 Err(err) => {
1380 return Ok(Some(format!("I couldn’t clear plans right now: {}", err)));
1381 }
1382 };
1383
1384 let payload = Self::tool_payload(&list_result);
1385 let plan_ids: Vec<i64> = payload
1386 .get("plans")
1387 .and_then(|value| value.as_array())
1388 .map(|items| {
1389 items
1390 .iter()
1391 .filter_map(|item| item.get("id").and_then(|value| value.as_i64()))
1392 .collect()
1393 })
1394 .unwrap_or_default();
1395
1396 let mut deleted_count: u64 = 0;
1397 for id in plan_ids {
1398 if let Ok(delete_result) = self
1399 .agent_service
1400 .tool_registry
1401 .execute_tool(
1402 "planning",
1403 serde_json::json!({
1404 "action": "delete",
1405 "user_id": user_id,
1406 "id": id
1407 }),
1408 )
1409 .await
1410 {
1411 let delete_payload = Self::tool_payload(&delete_result);
1412 if delete_payload
1413 .get("deleted")
1414 .and_then(|value| value.as_bool())
1415 .or_else(|| {
1416 delete_result
1417 .get("deleted")
1418 .and_then(|value| value.as_bool())
1419 })
1420 == Some(true)
1421 {
1422 deleted_count += 1;
1423 }
1424 }
1425 }
1426
1427 return Ok(Some(format!("Cleared {} plan(s).", deleted_count)));
1428 }
1429
1430 let looks_like_plans_list_request = trimmed == "plans"
1431 || lower.contains("what are the plans")
1432 || lower.contains("what are my plans")
1433 || lower.contains("show plans")
1434 || lower.contains("list plans")
1435 || lower.contains("my plans")
1436 || lower.contains("saved plans")
1437 || lower.contains("current plans");
1438
1439 if !looks_like_plans_list_request {
1440 return Ok(None);
1441 }
1442
1443 let tool = self.agent_service.tool_registry.get_tool("planning").await;
1444 let Some(_tool) = tool else {
1445 return Ok(Some(
1446 "I can’t list plans right now because the planning tool is not available."
1447 .to_string(),
1448 ));
1449 };
1450
1451 let result = match self
1452 .agent_service
1453 .tool_registry
1454 .execute_tool(
1455 "planning",
1456 serde_json::json!({
1457 "action": "list",
1458 "user_id": user_id,
1459 "limit": 20
1460 }),
1461 )
1462 .await
1463 {
1464 Ok(value) => value,
1465 Err(err) => {
1466 return Ok(Some(format!("I couldn’t list plans right now: {}", err)));
1467 }
1468 };
1469
1470 let payload = Self::tool_payload(&result);
1471 let plans = payload
1472 .get("plans")
1473 .and_then(|value| value.as_array())
1474 .cloned()
1475 .unwrap_or_default();
1476
1477 if plans.is_empty() {
1478 return Ok(Some("You have no saved plans right now.".to_string()));
1479 }
1480
1481 let mut lines = vec!["Here are your saved plans:".to_string()];
1482 for plan in plans {
1483 let title = plan
1484 .get("title")
1485 .and_then(|value| value.as_str())
1486 .unwrap_or("(untitled plan)");
1487 let status = plan
1488 .get("status")
1489 .and_then(|value| value.as_str())
1490 .unwrap_or("unknown");
1491 lines.push(format!("- {} ({})", title, status));
1492 }
1493
1494 Ok(Some(lines.join("\n")))
1495 }
1496
1497 fn tool_payload(result: &serde_json::Value) -> &serde_json::Value {
1498 result
1499 .get("capability_result")
1500 .and_then(|value| value.get("result"))
1501 .unwrap_or(result)
1502 }
1503}