1use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Instant;
12use teloxide::prelude::*;
13use teloxide::types::{ChatId, MessageId, ParseMode, ThreadId};
14use tokio::sync::Mutex;
15use tracing::{debug, error, info, warn};
16
17use localgpt_core::agent::{Agent, AgentConfig, StreamEvent, extract_tool_detail, tools::Tool};
18use localgpt_core::concurrency::TurnGate;
19use localgpt_core::config::Config;
20use localgpt_core::memory::MemoryManager;
21
/// Agent identifier passed to the memory manager and used when saving sessions
/// created through the Telegram interface.
const TELEGRAM_AGENT_ID: &str = "telegram";

/// Maximum size (measured with `str::len`, i.e. in bytes) of a single outgoing
/// message before it is truncated or split into chunks.
/// NOTE(review): presumably mirrors Telegram's 4096-character message limit — confirm.
const MAX_MESSAGE_LENGTH: usize = 4096;

/// Minimum number of seconds between edits of the in-progress reply while
/// response content is streaming in.
const EDIT_DEBOUNCE_SECS: u64 = 2;

/// Callback that builds additional tools from the configuration. It is invoked
/// when the agent for a new chat session is created.
pub type ToolFactory = Box<dyn Fn(&Config) -> Result<Vec<Box<dyn Tool>>> + Send + Sync>;
34
/// The single Telegram user this bot is paired with.
///
/// Serialized as JSON to the pairing file and loaded back from it.
#[derive(Debug, Serialize, Deserialize)]
struct PairedUser {
    /// Telegram user ID of the paired user.
    user_id: u64,
    /// Telegram username, if the user had one when pairing.
    username: Option<String>,
    /// RFC 3339 timestamp (UTC) recorded when pairing succeeded.
    paired_at: String,
}
41
/// A live chat session: the agent plus the time it was last used.
struct SessionEntry {
    /// Agent holding the conversation state for this chat.
    agent: Agent,
    /// Refreshed on chat turns, `/compact` and `/clear`; reported as idle time by `/status`.
    last_accessed: Instant,
}
46
/// State shared by all message handlers (wrapped in an `Arc` by the dispatcher setup).
struct BotState {
    /// Snapshot of the application configuration taken at startup.
    config: Config,
    /// Active sessions keyed by the numeric chat ID.
    sessions: Mutex<HashMap<i64, SessionEntry>>,
    /// Memory manager used by `/memory` and handed to newly created agents.
    memory: MemoryManager,
    /// Gate acquired before each chat turn is processed.
    turn_gate: TurnGate,
    /// Currently paired user, or `None` while the bot is unpaired.
    paired_user: Mutex<Option<PairedUser>>,
    /// Pairing code that has been generated and is waiting to be entered, if any.
    pending_pairing_code: Mutex<Option<String>>,
    /// Optional factory for extra tools added to each new agent.
    tool_factory: Option<ToolFactory>,
    /// Optional outbox used to record final replies for retry.
    outbox: Option<localgpt_core::outbox::Outbox>,
}
57
58fn pairing_file_path() -> Result<PathBuf> {
59 let paths = localgpt_core::paths::Paths::resolve()?;
60 Ok(paths.pairing_file())
61}
62
63fn load_paired_user() -> Option<PairedUser> {
64 let path = pairing_file_path().ok()?;
65 let content = std::fs::read_to_string(path).ok()?;
66 serde_json::from_str(&content).ok()
67}
68
69pub fn load_paired_chat_id() -> Option<i64> {
72 load_paired_user().map(|u| u.user_id as i64)
73}
74
75pub fn create_heartbeat_alert_callback(
78 api_token: &str,
79 topic_id: i32,
80) -> Option<localgpt_core::heartbeat::AlertCallback> {
81 let chat_id = load_paired_chat_id()?;
82 let bot = Bot::new(api_token);
83
84 Some(Box::new(move |text: &str| {
85 let bot = bot.clone();
86 let msg = if text.len() > 4000 {
87 format!("{}...", &text[..text.floor_char_boundary(4000)])
88 } else {
89 text.to_string()
90 };
91 tokio::spawn(async move {
92 let mut req = bot.send_message(ChatId(chat_id), &msg);
93 req = req.message_thread_id(ThreadId(MessageId(topic_id)));
94 if let Err(e) = req.await {
95 tracing::warn!("Failed to send heartbeat alert to Telegram topic: {}", e);
96 }
97 });
98 }))
99}
100
101fn save_paired_user(user: &PairedUser) -> Result<()> {
102 let path = pairing_file_path()?;
103 let content = serde_json::to_string_pretty(user)?;
104 std::fs::write(path, content)?;
105 Ok(())
106}
107
/// Generates a six-digit pairing code in the range `100000..=999999`.
///
/// The code gates who may pair with the bot, so it should not be derivable
/// from the wall clock alone. The current timestamp is therefore hashed with a
/// freshly created `RandomState`, whose keys are randomly seeded by the
/// standard library. This is not a cryptographic RNG, but it avoids adding a
/// dependency while removing the purely time-based derivation.
fn generate_pairing_code() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);
    let mixed = hasher.finish();

    // Map into the six-digit range; the lower bound keeps the first digit non-zero.
    let code = mixed % 900_000 + 100_000;
    format!("{:06}", code)
}
118
/// Starts the Telegram bot and runs its dispatcher until it stops.
///
/// Returns `Ok(())` immediately when the Telegram section is present but
/// disabled. Otherwise it builds the shared [`BotState`], registers the
/// command list with Telegram, starts the outbox retry tasks (when the outbox
/// is enabled) and dispatches incoming messages to `handle_message`.
///
/// # Errors
/// Fails when the Telegram config is missing, the API token is empty or still
/// an unexpanded `${...}` placeholder, the memory manager cannot be created,
/// or (with the outbox enabled) the application paths cannot be resolved.
pub async fn run_telegram_bot(
    config: &Config,
    turn_gate: TurnGate,
    tool_factory: Option<ToolFactory>,
) -> Result<()> {
    let telegram_config = config
        .telegram
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Telegram config not found"))?;

    if !telegram_config.enabled {
        return Ok(());
    }

    let token = &telegram_config.api_token;
    // A token that still starts with "${" is treated as an unexpanded placeholder.
    if token.is_empty() || token.starts_with("${") {
        anyhow::bail!("Telegram API token not configured or not expanded");
    }

    let bot = Bot::new(token);

    let memory =
        MemoryManager::new_with_full_config(&config.memory, Some(config), TELEGRAM_AGENT_ID)?;

    // Restore a previous pairing, if one was saved.
    let paired_user = load_paired_user();
    if let Some(ref user) = paired_user {
        info!(
            "Telegram bot: paired with user {} (ID: {})",
            user.username.as_deref().unwrap_or("unknown"),
            user.user_id
        );
    } else {
        info!("Telegram bot: no paired user. Send any message to start pairing.");
    }

    // The outbox is optional: a failure to open it is logged and the bot continues without it.
    let outbox = if config.server.outbox_enabled {
        let paths = localgpt_core::paths::Paths::resolve()?;
        let outbox_db = paths.state_dir.join("outbox.sqlite");
        match localgpt_core::outbox::Outbox::new_with_max_attempts(
            &outbox_db,
            config.server.outbox_max_attempts,
        ) {
            Ok(ob) => {
                info!("Outbox enabled (db: {})", outbox_db.display());
                Some(ob)
            }
            Err(e) => {
                warn!("Failed to initialize outbox: {}. Proceeding without.", e);
                None
            }
        }
    } else {
        None
    };

    let state = Arc::new(BotState {
        config: config.clone(),
        sessions: Mutex::new(HashMap::new()),
        memory,
        turn_gate,
        paired_user: Mutex::new(paired_user),
        pending_pairing_code: Mutex::new(None),
        tool_factory,
        outbox,
    });

    // Register only the commands that declare support for the Telegram interface.
    let commands: Vec<teloxide::types::BotCommand> = localgpt_core::commands::COMMANDS
        .iter()
        .filter(|c| c.supports(localgpt_core::commands::Interface::Telegram))
        .map(|c| teloxide::types::BotCommand::new(c.name, c.description))
        .collect();
    if let Err(e) = bot.set_my_commands(commands).await {
        warn!("Failed to set bot commands: {}", e);
    }

    if let Some(ref outbox) = state.outbox {
        // One-off sweep at startup: retry whatever the sweep reports as pending.
        match outbox.recovery_sweep() {
            Ok(pending) if !pending.is_empty() => {
                info!(
                    "Outbox recovery: {} pending messages to retry",
                    pending.len()
                );
                let retry_bot = bot.clone();
                let retry_outbox = outbox.clone();
                tokio::spawn(async move {
                    for entry in pending {
                        outbox_retry_send(&retry_bot, &retry_outbox, &entry).await;
                    }
                });
            }
            Ok(_) => debug!("Outbox recovery: no pending messages"),
            Err(e) => warn!("Outbox recovery sweep failed: {}", e),
        }

        // Background loop: every 5 seconds try to claim and resend one entry.
        let retry_bot = bot.clone();
        let retry_outbox = outbox.clone();
        let retain_days = config.server.outbox_retain_days;
        tokio::spawn(async move {
            let mut cleanup_counter = 0u32;
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                if let Ok(Some(entry)) = retry_outbox.claim_next() {
                    outbox_retry_send(&retry_bot, &retry_outbox, &entry).await;
                }
                cleanup_counter += 1;
                // Every 60 iterations (at least 5 minutes) prune delivered entries;
                // cleanup errors are deliberately ignored.
                if cleanup_counter >= 60 {
                    cleanup_counter = 0;
                    let _ = retry_outbox.cleanup_delivered(retain_days);
                }
            }
        });
    }

    info!("Starting Telegram bot...");

    // Only message updates are routed to a handler.
    let handler = Update::filter_message().endpoint(handle_message);

    Dispatcher::builder(bot, handler)
        // Updates that are not handled are dropped silently.
        .default_handler(|_upd| async {})
        .dependencies(dptree::deps![state])
        .enable_ctrlc_handler()
        .build()
        .dispatch()
        .await;

    Ok(())
}
251
252async fn handle_message(bot: Bot, msg: Message, state: Arc<BotState>) -> ResponseResult<()> {
253 let text = match msg.text() {
254 Some(t) => t.to_string(),
255 None => return Ok(()),
256 };
257
258 let user = match msg.from {
259 Some(ref u) => u,
260 None => return Ok(()),
261 };
262
263 let user_id = user.id.0;
264 let chat_id = msg.chat.id;
265
266 {
268 let paired = state.paired_user.lock().await;
269 if let Some(ref pu) = *paired {
270 if pu.user_id != user_id {
271 bot.send_message(
272 chat_id,
273 "Not authorized. This bot is paired with another user.",
274 )
275 .await?;
276 return Ok(());
277 }
278 } else {
279 drop(paired);
281 return handle_pairing(bot, msg, &state, user_id, &text).await;
282 }
283 }
284
285 if text.starts_with('/') {
287 return handle_command(&bot, chat_id, &state, &text).await;
288 }
289
290 handle_chat(&bot, chat_id, &state, &text).await
292}
293
294async fn handle_pairing(
295 bot: Bot,
296 msg: Message,
297 state: &Arc<BotState>,
298 user_id: u64,
299 text: &str,
300) -> ResponseResult<()> {
301 let chat_id = msg.chat.id;
302 let mut pending = state.pending_pairing_code.lock().await;
303
304 if let Some(ref code) = *pending {
305 if text.trim() == code.as_str() {
307 let username = msg.from.as_ref().and_then(|u| u.username.clone());
309 let paired = PairedUser {
310 user_id,
311 username: username.clone(),
312 paired_at: chrono::Utc::now().to_rfc3339(),
313 };
314
315 if let Err(e) = save_paired_user(&paired) {
316 error!("Failed to save pairing: {}", e);
317 bot.send_message(chat_id, "Pairing failed (could not save). Check logs.")
318 .await?;
319 return Ok(());
320 }
321
322 *state.paired_user.lock().await = Some(paired);
323 *pending = None;
324
325 info!(
326 "Telegram bot: paired with user {} (ID: {})",
327 username.as_deref().unwrap_or("unknown"),
328 user_id
329 );
330
331 bot.send_message(chat_id,
332 "Paired successfully! You can now chat with LocalGPT.\n\nUse /new to start a fresh session, /status to see session info.",
333 )
334 .await?;
335 } else {
336 bot.send_message(chat_id, "Invalid pairing code. Please try again.")
337 .await?;
338 }
339 } else {
340 let code = generate_pairing_code();
342 println!("\n========================================");
343 println!(" TELEGRAM PAIRING CODE: {}", code);
344 println!("========================================\n");
345 info!(
346 "Telegram pairing code generated for user {} (ID: {})",
347 msg.from
348 .as_ref()
349 .and_then(|u| u.username.as_deref())
350 .unwrap_or("unknown"),
351 user_id
352 );
353
354 *pending = Some(code);
355
356 bot.send_message(chat_id,
357 "Welcome! A pairing code has been printed to the daemon logs/stdout.\nPlease enter the code to pair this bot with your account.",
358 )
359 .await?;
360 }
361
362 Ok(())
363}
364
/// Executes a slash command sent by the paired user.
///
/// `text` is split at the first space into the command word and its
/// (trimmed) argument string. Unknown commands get a hint to use `/help`.
///
/// NOTE(review): in several arms the sessions lock is still held while the
/// reply is being sent.
///
/// # Errors
/// Returns the Telegram request error if sending a reply fails.
async fn handle_command(
    bot: &Bot,
    chat_id: ChatId,
    state: &Arc<BotState>,
    text: &str,
) -> ResponseResult<()> {
    // `splitn` always yields at least one item, so `parts[0]` cannot panic.
    let parts: Vec<&str> = text.splitn(2, ' ').collect();
    let cmd = parts[0];
    let args = parts.get(1).map(|s| s.trim()).unwrap_or("");

    match cmd {
        "/start" | "/help" => {
            let help = format!(
                "LocalGPT Telegram Bot\n\n{}",
                localgpt_core::commands::format_help_text(
                    localgpt_core::commands::Interface::Telegram
                )
            );
            bot.send_message(chat_id, &help).await?;
        }
        // Drop the session entirely; the next chat message creates a new one.
        "/new" => {
            let mut sessions = state.sessions.lock().await;
            sessions.remove(&chat_id.0);
            bot.send_message(
                chat_id,
                "Session cleared. Send a message to start a new conversation.",
            )
            .await?;
        }
        // Report model, message count, token usage, compactions and idle time.
        "/status" => {
            let sessions = state.sessions.lock().await;
            let status_text = if let Some(entry) = sessions.get(&chat_id.0) {
                let status = entry.agent.session_status();
                let (used, usable, total) = entry.agent.context_usage();
                let mut text = format!(
                    "Session active\n\
                     Model: {}\n\
                     Messages: {}\n\
                     Tokens: {} / {} (window: {})\n\
                     Compactions: {}\n\
                     Idle: {}s",
                    entry.agent.model(),
                    status.message_count,
                    used,
                    usable,
                    total,
                    status.compaction_count,
                    entry.last_accessed.elapsed().as_secs()
                );
                // Search statistics are appended only when searches were made,
                // which also avoids dividing by zero below.
                if status.search_queries > 0 {
                    let cache_pct =
                        (status.search_cached_hits as f64 / status.search_queries as f64) * 100.0;
                    text.push_str(&format!(
                        "\nSearch: {} queries ({} cached, {:.0}%) · ${:.3}",
                        status.search_queries,
                        status.search_cached_hits,
                        cache_pct,
                        status.search_cost_usd
                    ));
                }
                text
            } else {
                "No active session. Send a message to start one.".to_string()
            };
            bot.send_message(chat_id, &status_text).await?;
        }
        // Compact the current session and report token counts before and after.
        "/compact" => {
            let mut sessions = state.sessions.lock().await;
            match sessions.get_mut(&chat_id.0) {
                Some(entry) => {
                    entry.last_accessed = Instant::now();
                    match entry.agent.compact_session().await {
                        Ok((before, after)) => {
                            bot.send_message(
                                chat_id,
                                format!("Compacted: {} -> {} tokens", before, after),
                            )
                            .await?;
                        }
                        Err(e) => {
                            bot.send_message(chat_id, format!("Compact failed: {}", e))
                                .await?;
                        }
                    }
                }
                None => {
                    bot.send_message(chat_id, "No active session.").await?;
                }
            }
        }
        // Clear the history but keep the session entry itself.
        "/clear" => {
            let mut sessions = state.sessions.lock().await;
            if let Some(entry) = sessions.get_mut(&chat_id.0) {
                entry.agent.clear_session();
                entry.last_accessed = Instant::now();
                bot.send_message(chat_id, "Session history cleared.")
                    .await?;
            } else {
                bot.send_message(chat_id, "No active session.").await?;
            }
        }
        // Search memory and list up to 5 results, each excerpt cut to 300 bytes.
        "/memory" => {
            if args.is_empty() {
                bot.send_message(chat_id, "Usage: /memory <search query>")
                    .await?;
            } else {
                match state.memory.search(args, 5) {
                    Ok(results) => {
                        if results.is_empty() {
                            bot.send_message(chat_id, "No results found.").await?;
                        } else {
                            let mut text = format!("Memory search: \"{}\"\n\n", args);
                            for (i, r) in results.iter().enumerate() {
                                text.push_str(&format!(
                                    "{}. {} (L{}-{})\n{}\n\n",
                                    i + 1,
                                    r.file,
                                    r.line_start,
                                    r.line_end,
                                    truncate_str(&r.content, 300),
                                ));
                            }
                            send_long_message(bot, chat_id, None, &text).await;
                        }
                    }
                    Err(e) => {
                        bot.send_message(chat_id, format!("Search error: {}", e))
                            .await?;
                    }
                }
            }
        }
        // Without arguments: show the current model. With an argument: switch
        // the model of the active session (a session must already exist).
        "/model" => {
            if args.is_empty() {
                let sessions = state.sessions.lock().await;
                let current = sessions
                    .get(&chat_id.0)
                    .map(|e| e.agent.model().to_string())
                    .unwrap_or_else(|| state.config.agent.default_model.clone());
                bot.send_message(
                    chat_id,
                    format!("Current model: {}\n\nUsage: /model <name>", current),
                )
                .await?;
            } else {
                let mut sessions = state.sessions.lock().await;
                if let Some(entry) = sessions.get_mut(&chat_id.0) {
                    match entry.agent.set_model(args) {
                        Ok(()) => {
                            bot.send_message(chat_id, format!("Switched to model: {}", args))
                                .await?;
                        }
                        Err(e) => {
                            bot.send_message(chat_id, format!("Failed to switch model: {}", e))
                                .await?;
                        }
                    }
                } else {
                    bot.send_message(
                        chat_id,
                        "No active session. Send a message first, then switch models.",
                    )
                    .await?;
                }
            }
        }
        // List the skills found in the workspace.
        "/skills" => {
            let workspace_path = state.config.workspace_path();
            match localgpt_core::agent::load_skills(&workspace_path) {
                Ok(skills) => {
                    if skills.is_empty() {
                        bot.send_message(chat_id, "No skills installed.").await?;
                    } else {
                        let summary = localgpt_core::agent::get_skills_summary(&skills);
                        bot.send_message(chat_id, &summary).await?;
                    }
                }
                Err(e) => {
                    bot.send_message(chat_id, format!("Failed to load skills: {}", e))
                        .await?;
                }
            }
        }
        // Forget the pairing in memory and on disk, and drop this chat's session.
        "/unpair" => {
            *state.paired_user.lock().await = None;
            // Best effort: a missing or undeletable pairing file is ignored.
            if let Ok(path) = pairing_file_path() {
                let _ = std::fs::remove_file(path);
            }
            let mut sessions = state.sessions.lock().await;
            sessions.remove(&chat_id.0);
            info!("Telegram bot: user unpaired");
            bot.send_message(
                chat_id,
                "Unpaired. Send any message to start a new pairing.",
            )
            .await?;
        }
        _ => {
            bot.send_message(
                chat_id,
                "Unknown command. Use /help for available commands.",
            )
            .await?;
        }
    }

    Ok(())
}
573
/// Returns the longest prefix of `s` that is at most `max` bytes long and ends
/// on a UTF-8 character boundary. Strings that already fit are returned whole.
fn truncate_str(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    // Search downward from `max` for a boundary; index 0 always qualifies.
    let cut = (0..=max)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
586
/// Escapes text for inclusion in Telegram HTML.
///
/// `&`, `<` and `>` are replaced by their entities so that user content is
/// never interpreted as markup. `"` is also escaped because callers place the
/// result inside attribute values (`href="..."`, `class="..."`).
fn escape_html(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            other => escaped.push(other),
        }
    }
    escaped
}
593
594fn markdown_to_html(text: &str) -> String {
598 let mut result = String::with_capacity(text.len());
599 let mut in_code_block = false;
600 let mut code_block_lang = String::new();
601 let mut code_block_content = String::new();
602
603 for line in text.lines() {
604 if in_code_block {
605 if line.starts_with("```") {
606 let lang_attr = if code_block_lang.is_empty() {
608 String::new()
609 } else {
610 format!(" class=\"language-{}\"", escape_html(&code_block_lang))
611 };
612 result.push_str(&format!(
613 "<pre><code{}>{}</code></pre>\n",
614 lang_attr,
615 escape_html(&code_block_content)
616 ));
617 code_block_content.clear();
618 code_block_lang.clear();
619 in_code_block = false;
620 } else {
621 if !code_block_content.is_empty() {
622 code_block_content.push('\n');
623 }
624 code_block_content.push_str(line);
625 }
626 continue;
627 }
628
629 if let Some(rest) = line.strip_prefix("```") {
630 in_code_block = true;
631 code_block_lang = rest.trim().to_string();
632 continue;
633 }
634
635 let line = if let Some(rest) = line.strip_prefix("### ") {
637 format!("<b>{}</b>", escape_html(rest))
638 } else if let Some(rest) = line.strip_prefix("## ") {
639 format!("<b>{}</b>", escape_html(rest))
640 } else if let Some(rest) = line.strip_prefix("# ") {
641 format!("<b>{}</b>", escape_html(rest))
642 } else {
643 convert_inline_markdown(line)
644 };
645
646 result.push_str(&line);
647 result.push('\n');
648 }
649
650 if in_code_block {
652 result.push_str(&format!(
653 "<pre><code>{}</code></pre>\n",
654 escape_html(&code_block_content)
655 ));
656 }
657
658 result
659}
660
661fn convert_inline_markdown(line: &str) -> String {
663 let mut result = String::new();
664 let chars: Vec<char> = line.chars().collect();
665 let len = chars.len();
666 let mut i = 0;
667
668 while i < len {
669 if chars[i] == '`'
671 && let Some(end) = chars[i + 1..].iter().position(|&c| c == '`')
672 {
673 let code: String = chars[i + 1..i + 1 + end].iter().collect();
674 result.push_str(&format!("<code>{}</code>", escape_html(&code)));
675 i += end + 2;
676 continue;
677 }
678
679 if i + 1 < len
681 && chars[i] == '*'
682 && chars[i + 1] == '*'
683 && let Some(end) = find_closing(&chars, i + 2, &['*', '*'])
684 {
685 let inner: String = chars[i + 2..end].iter().collect();
686 result.push_str(&format!("<b>{}</b>", escape_html(&inner)));
687 i = end + 2;
688 continue;
689 }
690
691 if chars[i] == '*'
693 && let Some(end) = chars[i + 1..].iter().position(|&c| c == '*')
694 {
695 let inner: String = chars[i + 1..i + 1 + end].iter().collect();
696 result.push_str(&format!("<i>{}</i>", escape_html(&inner)));
697 i += end + 2;
698 continue;
699 }
700
701 if chars[i] == '['
703 && let Some(close_bracket) = chars[i + 1..].iter().position(|&c| c == ']')
704 {
705 let text_end = i + 1 + close_bracket;
706 if text_end + 1 < len
707 && chars[text_end + 1] == '('
708 && let Some(close_paren) = chars[text_end + 2..].iter().position(|&c| c == ')')
709 {
710 let text: String = chars[i + 1..text_end].iter().collect();
711 let url: String = chars[text_end + 2..text_end + 2 + close_paren]
712 .iter()
713 .collect();
714 result.push_str(&format!(
715 "<a href=\"{}\">{}</a>",
716 escape_html(&url),
717 escape_html(&text)
718 ));
719 i = text_end + 2 + close_paren + 1;
720 continue;
721 }
722 }
723
724 match chars[i] {
726 '&' => result.push_str("&"),
727 '<' => result.push_str("<"),
728 '>' => result.push_str(">"),
729 c => result.push(c),
730 }
731 i += 1;
732 }
733
734 result
735}
736
/// Finds the first index at or after `start` where `delim` occurs in `chars`.
///
/// Returns `None` when the delimiter does not occur, or when fewer than
/// `delim.len()` characters remain from `start`.
fn find_closing(chars: &[char], start: usize, delim: &[char]) -> Option<usize> {
    let width = delim.len();
    // Last index at which a full delimiter could still begin.
    let last_start = chars.len().checked_sub(width)?;
    (start..=last_start).find(|&at| chars[at..at + width] == *delim)
}
750
/// Runs one chat turn: forwards `text` to the chat's agent and streams the
/// reply back by editing a placeholder message.
///
/// Steps: send a "Thinking..." placeholder, acquire the turn gate, create the
/// session's agent if this chat has none yet, stream the response while
/// periodically editing the placeholder, save the session, and finally
/// deliver the complete reply through `durable_send`.
///
/// NOTE(review): the sessions lock is held from session lookup until just
/// before the final send, i.e. for the whole duration of the stream.
///
/// # Errors
/// Only a failure to send the initial placeholder is returned; later errors
/// are reported to the user in the message text or logged.
async fn handle_chat(
    bot: &Bot,
    chat_id: ChatId,
    state: &Arc<BotState>,
    text: &str,
) -> ResponseResult<()> {
    // Placeholder that is edited in place as the response arrives.
    let thinking_msg = bot.send_message(chat_id, "Thinking...").await?;
    let msg_id = thinking_msg.id;

    // Held until the function returns.
    let _gate_permit = state.turn_gate.acquire().await;

    let mut sessions = state.sessions.lock().await;

    // Lazily create the agent and session the first time this chat talks.
    if let std::collections::hash_map::Entry::Vacant(e) = sessions.entry(chat_id.0) {
        let agent_config = AgentConfig {
            model: state.config.agent.default_model.clone(),
            context_window: state.config.agent.context_window,
            reserve_tokens: state.config.agent.reserve_tokens,
        };

        let memory = std::sync::Arc::new(state.memory.clone());
        match Agent::new(agent_config, &state.config, memory).await {
            Ok(mut agent) => {
                if let Some(ref factory) = state.tool_factory {
                    // A failing tool factory is logged; the agent is still used without the extras.
                    match factory(&state.config) {
                        Ok(extra_tools) => {
                            agent.extend_tools(extra_tools);
                        }
                        Err(err) => {
                            error!("Failed to create additional tools: {}", err);
                        }
                    }
                }

                if let Err(err) = agent.new_session().await {
                    error!("Failed to create session: {}", err);
                    let _ = bot
                        .edit_message_text(chat_id, msg_id, format!("Error: {}", err))
                        .await;
                    return Ok(());
                }

                // Send the first-run welcome text when the agent reports being brand new.
                let is_brand_new = agent.is_brand_new();
                if is_brand_new {
                    let html = markdown_to_html(localgpt_core::agent::FIRST_RUN_WELCOME);
                    let _ = bot
                        .send_message(chat_id, html)
                        .parse_mode(ParseMode::Html)
                        .await;
                }

                e.insert(SessionEntry {
                    agent,
                    last_accessed: Instant::now(),
                });
            }
            Err(err) => {
                error!("Failed to create agent: {}", err);
                let _ = bot
                    .edit_message_text(chat_id, msg_id, format!("Error: {}", err))
                    .await;
                return Ok(());
            }
        }
    }

    // Cannot fail: the entry either existed already or was inserted above
    // (every failure path above returns early).
    let entry = sessions.get_mut(&chat_id.0).unwrap();
    entry.last_accessed = Instant::now();

    let response = match entry.agent.chat_stream_with_tools(text, Vec::new()).await {
        Ok(event_stream) => {
            use futures::StreamExt;

            let mut full_response = String::new();
            let mut last_edit = Instant::now();
            let mut pinned_stream = std::pin::pin!(event_stream);
            // Accumulated tool-call lines shown above the response text.
            let mut tool_info = String::new();

            while let Some(event) = pinned_stream.next().await {
                match event {
                    Ok(StreamEvent::Content(delta)) => {
                        full_response.push_str(&delta);

                        // Debounce edits of the placeholder; edit failures are ignored.
                        if last_edit.elapsed().as_secs() >= EDIT_DEBOUNCE_SECS {
                            let display = format_display(&full_response, &tool_info);
                            let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
                            last_edit = Instant::now();
                        }
                    }
                    Ok(StreamEvent::ToolCallStart {
                        name, arguments, ..
                    }) => {
                        let detail = extract_tool_detail(&name, &arguments);
                        let info_line = if let Some(d) = detail {
                            format!("🔧 {}({})\n", name, d)
                        } else {
                            format!("🔧 {}\n", name)
                        };
                        tool_info.push_str(&info_line);

                        // Tool starts are shown immediately, bypassing the debounce.
                        let display = format_display(&full_response, &tool_info);
                        let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
                        last_edit = Instant::now();
                    }
                    Ok(StreamEvent::ToolCallEnd { name, warnings, .. }) => {
                        // Only tool results that carry warnings update the display.
                        if !warnings.is_empty() {
                            for w in &warnings {
                                tool_info.push_str(&format!(
                                    "\u{26a0} Suspicious content in {}: {}\n",
                                    name, w
                                ));
                            }
                            let display = format_display(&full_response, &tool_info);
                            let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
                            last_edit = Instant::now();
                        }
                    }
                    Ok(StreamEvent::Done) => break,
                    // Approval requests are ignored here.
                    Ok(StreamEvent::ApprovalRequired { .. }) => {}
                    Err(e) => {
                        // Keep what was received so far and append the error.
                        error!("Stream error: {}", e);
                        full_response.push_str(&format!("\n\nError: {}", e));
                        break;
                    }
                }
            }

            if full_response.is_empty() {
                "(no response)".to_string()
            } else {
                full_response
            }
        }
        Err(e) => format!("Error: {}", e),
    };

    // A failed save is only logged at debug level.
    if let Err(e) = entry.agent.save_session_for_agent(TELEGRAM_AGENT_ID).await {
        debug!("Failed to save telegram session: {}", e);
    }

    let outbox_ref = state.outbox.as_ref();
    // Release the sessions lock before the final network send.
    drop(sessions);

    // The final reply replaces the placeholder; tool-call lines are not part of it.
    durable_send(bot, chat_id, Some(msg_id), &response, outbox_ref).await;

    Ok(())
}
907
908fn format_display(response: &str, tool_info: &str) -> String {
909 let mut display = String::new();
910 if !tool_info.is_empty() {
911 display.push_str(tool_info);
912 display.push('\n');
913 }
914 display.push_str(response);
915
916 if display.len() > MAX_MESSAGE_LENGTH {
918 display.truncate(MAX_MESSAGE_LENGTH - 3);
919 display.push_str("...");
920 }
921
922 display
923}
924
925async fn send_or_edit_html(bot: &Bot, chat_id: ChatId, msg_id: Option<MessageId>, text: &str) {
927 let html = markdown_to_html(text);
928 let result = if let Some(mid) = msg_id {
929 bot.edit_message_text(chat_id, mid, &html)
930 .parse_mode(ParseMode::Html)
931 .await
932 } else {
933 bot.send_message(chat_id, &html)
934 .parse_mode(ParseMode::Html)
935 .await
936 };
937
938 if result.is_err() {
940 if let Some(mid) = msg_id {
941 let _ = bot.edit_message_text(chat_id, mid, text).await;
942 } else {
943 let _ = bot.send_message(chat_id, text).await;
944 }
945 }
946}
947
948async fn send_long_message(bot: &Bot, chat_id: ChatId, edit_msg_id: Option<MessageId>, text: &str) {
949 if text.len() <= MAX_MESSAGE_LENGTH {
950 send_or_edit_html(bot, chat_id, edit_msg_id, text).await;
951 return;
952 }
953
954 let chunks = split_text_chunks(text);
956
957 if let Some(first) = chunks.first() {
959 send_or_edit_html(bot, chat_id, edit_msg_id, first).await;
960 }
961
962 for chunk in chunks.iter().skip(1) {
964 send_or_edit_html(bot, chat_id, None, chunk).await;
965 }
966}
967
968fn split_text_chunks(text: &str) -> Vec<&str> {
969 let mut chunks = Vec::new();
970 let mut start = 0;
971 while start < text.len() {
972 let mut end = (start + MAX_MESSAGE_LENGTH).min(text.len());
973 while end > start && !text.is_char_boundary(end) {
974 end -= 1;
975 }
976 chunks.push(&text[start..end]);
977 start = end;
978 }
979 chunks
980}
981
982async fn durable_send(
985 bot: &Bot,
986 chat_id: ChatId,
987 edit_msg_id: Option<MessageId>,
988 text: &str,
989 outbox: Option<&localgpt_core::outbox::Outbox>,
990) {
991 let outbox = match outbox {
992 Some(ob) => ob,
993 None => {
994 send_long_message(bot, chat_id, edit_msg_id, text).await;
996 return;
997 }
998 };
999
1000 let entry_id = match outbox.enqueue("telegram", &chat_id.0.to_string(), text, None) {
1002 Ok(id) => id,
1003 Err(e) => {
1004 warn!("Outbox enqueue failed: {}. Falling back to direct send.", e);
1005 send_long_message(bot, chat_id, edit_msg_id, text).await;
1006 return;
1007 }
1008 };
1009
1010 let html = markdown_to_html(text);
1012 let result = if text.len() <= MAX_MESSAGE_LENGTH {
1013 if let Some(mid) = edit_msg_id {
1014 bot.edit_message_text(chat_id, mid, &html)
1015 .parse_mode(ParseMode::Html)
1016 .await
1017 } else {
1018 bot.send_message(chat_id, &html)
1019 .parse_mode(ParseMode::Html)
1020 .await
1021 }
1022 } else {
1023 send_long_message(bot, chat_id, edit_msg_id, text).await;
1025 let _ = outbox.mark_delivered(&entry_id);
1026 return;
1027 };
1028
1029 match result {
1030 Ok(_) => {
1031 let _ = outbox.mark_delivered(&entry_id);
1032 }
1033 Err(e) => {
1034 let err_str = e.to_string();
1035 if localgpt_core::outbox::is_permanent_telegram_error(&err_str) {
1036 let _ = outbox.mark_permanent_failure(&entry_id, &err_str);
1037 } else {
1038 let _ = outbox.record_failure(&entry_id, &err_str);
1039 }
1040 if let Some(mid) = edit_msg_id {
1042 let _ = bot.edit_message_text(chat_id, mid, text).await;
1043 } else {
1044 let _ = bot.send_message(chat_id, text).await;
1045 }
1046 }
1047 }
1048}
1049
1050async fn outbox_retry_send(
1052 bot: &Bot,
1053 outbox: &localgpt_core::outbox::Outbox,
1054 entry: &localgpt_core::outbox::OutboxEntry,
1055) {
1056 let chat_id = match entry.target.parse::<i64>() {
1057 Ok(id) => ChatId(id),
1058 Err(_) => {
1059 let _ = outbox.mark_permanent_failure(&entry.id, "invalid chat_id");
1060 return;
1061 }
1062 };
1063
1064 let html = markdown_to_html(&entry.payload);
1065 let result = bot
1066 .send_message(chat_id, &html)
1067 .parse_mode(ParseMode::Html)
1068 .await;
1069
1070 match result {
1071 Ok(_) => {
1072 let _ = outbox.mark_delivered(&entry.id);
1073 }
1074 Err(e) => {
1075 let err_str = e.to_string();
1076 if localgpt_core::outbox::is_permanent_telegram_error(&err_str) {
1077 let _ = outbox.mark_permanent_failure(&entry.id, &err_str);
1078 } else {
1079 let _ = outbox.record_failure(&entry.id, &err_str);
1080 }
1081 }
1082 }
1083}