Skip to main content

localgpt_server/
telegram.rs

1//! Telegram bot interface for LocalGPT
2//!
3//! Provides a Telegram bot that allows interacting with LocalGPT remotely.
4//! Uses a one-time pairing code mechanism to restrict access to the owner.
5
6use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Instant;
12use teloxide::prelude::*;
13use teloxide::types::{ChatId, MessageId, ParseMode, ThreadId};
14use tokio::sync::Mutex;
15use tracing::{debug, error, info, warn};
16
17use localgpt_core::agent::{Agent, AgentConfig, StreamEvent, extract_tool_detail, tools::Tool};
18use localgpt_core::concurrency::TurnGate;
19use localgpt_core::config::Config;
20use localgpt_core::memory::MemoryManager;
21
22/// Agent ID for Telegram sessions
23const TELEGRAM_AGENT_ID: &str = "telegram";
24
25/// Maximum Telegram message length
26const MAX_MESSAGE_LENGTH: usize = 4096;
27
28/// Debounce interval for message edits (seconds)
29const EDIT_DEBOUNCE_SECS: u64 = 2;
30
31/// Factory function type for creating additional tools for the Telegram agent.
32/// This allows the caller (e.g., CLI daemon) to inject dangerous tools like bash, file I/O.
33pub type ToolFactory = Box<dyn Fn(&Config) -> Result<Vec<Box<dyn Tool>>> + Send + Sync>;
34
35#[derive(Debug, Serialize, Deserialize)]
36struct PairedUser {
37    user_id: u64,
38    username: Option<String>,
39    paired_at: String,
40}
41
42struct SessionEntry {
43    agent: Agent,
44    last_accessed: Instant,
45}
46
47struct BotState {
48    config: Config,
49    sessions: Mutex<HashMap<i64, SessionEntry>>,
50    memory: MemoryManager,
51    turn_gate: TurnGate,
52    paired_user: Mutex<Option<PairedUser>>,
53    pending_pairing_code: Mutex<Option<String>>,
54    tool_factory: Option<ToolFactory>,
55    outbox: Option<localgpt_core::outbox::Outbox>,
56}
57
58fn pairing_file_path() -> Result<PathBuf> {
59    let paths = localgpt_core::paths::Paths::resolve()?;
60    Ok(paths.pairing_file())
61}
62
63fn load_paired_user() -> Option<PairedUser> {
64    let path = pairing_file_path().ok()?;
65    let content = std::fs::read_to_string(path).ok()?;
66    serde_json::from_str(&content).ok()
67}
68
69/// Load the paired user's chat ID for sending notifications.
70/// In Telegram, private chat ID equals user ID for DMs.
71pub fn load_paired_chat_id() -> Option<i64> {
72    load_paired_user().map(|u| u.user_id as i64)
73}
74
75/// Create a heartbeat alert callback that sends messages to a Telegram topic.
76/// Returns None if no paired user or missing config.
77pub fn create_heartbeat_alert_callback(
78    api_token: &str,
79    topic_id: i32,
80) -> Option<localgpt_core::heartbeat::AlertCallback> {
81    let chat_id = load_paired_chat_id()?;
82    let bot = Bot::new(api_token);
83
84    Some(Box::new(move |text: &str| {
85        let bot = bot.clone();
86        let msg = if text.len() > 4000 {
87            format!("{}...", &text[..text.floor_char_boundary(4000)])
88        } else {
89            text.to_string()
90        };
91        tokio::spawn(async move {
92            let mut req = bot.send_message(ChatId(chat_id), &msg);
93            req = req.message_thread_id(ThreadId(MessageId(topic_id)));
94            if let Err(e) = req.await {
95                tracing::warn!("Failed to send heartbeat alert to Telegram topic: {}", e);
96            }
97        });
98    }))
99}
100
101fn save_paired_user(user: &PairedUser) -> Result<()> {
102    let path = pairing_file_path()?;
103    let content = serde_json::to_string_pretty(user)?;
104    std::fs::write(path, content)?;
105    Ok(())
106}
107
108fn generate_pairing_code() -> String {
109    use std::time::{SystemTime, UNIX_EPOCH};
110    let seed = SystemTime::now()
111        .duration_since(UNIX_EPOCH)
112        .unwrap_or_default()
113        .as_nanos();
114    // Simple LCG-based 6-digit code (not cryptographic, but fine for pairing)
115    let code = ((seed.wrapping_mul(6364136223846793005).wrapping_add(1)) % 900000 + 100000) as u32;
116    format!("{:06}", code)
117}
118
119pub async fn run_telegram_bot(
120    config: &Config,
121    turn_gate: TurnGate,
122    tool_factory: Option<ToolFactory>,
123) -> Result<()> {
124    let telegram_config = config
125        .telegram
126        .as_ref()
127        .ok_or_else(|| anyhow::anyhow!("Telegram config not found"))?;
128
129    if !telegram_config.enabled {
130        return Ok(());
131    }
132
133    let token = &telegram_config.api_token;
134    if token.is_empty() || token.starts_with("${") {
135        anyhow::bail!("Telegram API token not configured or not expanded");
136    }
137
138    let bot = Bot::new(token);
139
140    let memory =
141        MemoryManager::new_with_full_config(&config.memory, Some(config), TELEGRAM_AGENT_ID)?;
142
143    let paired_user = load_paired_user();
144    if let Some(ref user) = paired_user {
145        info!(
146            "Telegram bot: paired with user {} (ID: {})",
147            user.username.as_deref().unwrap_or("unknown"),
148            user.user_id
149        );
150    } else {
151        info!("Telegram bot: no paired user. Send any message to start pairing.");
152    }
153
154    // Initialize outbox if enabled
155    let outbox = if config.server.outbox_enabled {
156        let paths = localgpt_core::paths::Paths::resolve()?;
157        let outbox_db = paths.state_dir.join("outbox.sqlite");
158        match localgpt_core::outbox::Outbox::new_with_max_attempts(
159            &outbox_db,
160            config.server.outbox_max_attempts,
161        ) {
162            Ok(ob) => {
163                info!("Outbox enabled (db: {})", outbox_db.display());
164                Some(ob)
165            }
166            Err(e) => {
167                warn!("Failed to initialize outbox: {}. Proceeding without.", e);
168                None
169            }
170        }
171    } else {
172        None
173    };
174
175    let state = Arc::new(BotState {
176        config: config.clone(),
177        sessions: Mutex::new(HashMap::new()),
178        memory,
179        turn_gate,
180        paired_user: Mutex::new(paired_user),
181        pending_pairing_code: Mutex::new(None),
182        tool_factory,
183        outbox,
184    });
185
186    // Register bot commands so Telegram clients show the "/" menu
187    let commands: Vec<teloxide::types::BotCommand> = localgpt_core::commands::COMMANDS
188        .iter()
189        .filter(|c| c.supports(localgpt_core::commands::Interface::Telegram))
190        .map(|c| teloxide::types::BotCommand::new(c.name, c.description))
191        .collect();
192    if let Err(e) = bot.set_my_commands(commands).await {
193        warn!("Failed to set bot commands: {}", e);
194    }
195
196    // Run outbox recovery sweep and start background retry task
197    if let Some(ref outbox) = state.outbox {
198        match outbox.recovery_sweep() {
199            Ok(pending) if !pending.is_empty() => {
200                info!(
201                    "Outbox recovery: {} pending messages to retry",
202                    pending.len()
203                );
204                let retry_bot = bot.clone();
205                let retry_outbox = outbox.clone();
206                tokio::spawn(async move {
207                    for entry in pending {
208                        outbox_retry_send(&retry_bot, &retry_outbox, &entry).await;
209                    }
210                });
211            }
212            Ok(_) => debug!("Outbox recovery: no pending messages"),
213            Err(e) => warn!("Outbox recovery sweep failed: {}", e),
214        }
215
216        // Background retry task — polls every 5 seconds for messages ready to retry
217        let retry_bot = bot.clone();
218        let retry_outbox = outbox.clone();
219        let retain_days = config.server.outbox_retain_days;
220        tokio::spawn(async move {
221            let mut cleanup_counter = 0u32;
222            loop {
223                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
224                if let Ok(Some(entry)) = retry_outbox.claim_next() {
225                    outbox_retry_send(&retry_bot, &retry_outbox, &entry).await;
226                }
227                // Periodic cleanup (every ~5 minutes = 60 iterations)
228                cleanup_counter += 1;
229                if cleanup_counter >= 60 {
230                    cleanup_counter = 0;
231                    let _ = retry_outbox.cleanup_delivered(retain_days);
232                }
233            }
234        });
235    }
236
237    info!("Starting Telegram bot...");
238
239    let handler = Update::filter_message().endpoint(handle_message);
240
241    Dispatcher::builder(bot, handler)
242        .default_handler(|_upd| async {})
243        .dependencies(dptree::deps![state])
244        .enable_ctrlc_handler()
245        .build()
246        .dispatch()
247        .await;
248
249    Ok(())
250}
251
252async fn handle_message(bot: Bot, msg: Message, state: Arc<BotState>) -> ResponseResult<()> {
253    let text = match msg.text() {
254        Some(t) => t.to_string(),
255        None => return Ok(()),
256    };
257
258    let user = match msg.from {
259        Some(ref u) => u,
260        None => return Ok(()),
261    };
262
263    let user_id = user.id.0;
264    let chat_id = msg.chat.id;
265
266    // Check pairing
267    {
268        let paired = state.paired_user.lock().await;
269        if let Some(ref pu) = *paired {
270            if pu.user_id != user_id {
271                bot.send_message(
272                    chat_id,
273                    "Not authorized. This bot is paired with another user.",
274                )
275                .await?;
276                return Ok(());
277            }
278        } else {
279            // Not paired yet - handle pairing flow
280            drop(paired);
281            return handle_pairing(bot, msg, &state, user_id, &text).await;
282        }
283    }
284
285    // Handle slash commands
286    if text.starts_with('/') {
287        return handle_command(&bot, chat_id, &state, &text).await;
288    }
289
290    // Regular chat message
291    handle_chat(&bot, chat_id, &state, &text).await
292}
293
294async fn handle_pairing(
295    bot: Bot,
296    msg: Message,
297    state: &Arc<BotState>,
298    user_id: u64,
299    text: &str,
300) -> ResponseResult<()> {
301    let chat_id = msg.chat.id;
302    let mut pending = state.pending_pairing_code.lock().await;
303
304    if let Some(ref code) = *pending {
305        // User is entering the pairing code
306        if text.trim() == code.as_str() {
307            // Pairing successful
308            let username = msg.from.as_ref().and_then(|u| u.username.clone());
309            let paired = PairedUser {
310                user_id,
311                username: username.clone(),
312                paired_at: chrono::Utc::now().to_rfc3339(),
313            };
314
315            if let Err(e) = save_paired_user(&paired) {
316                error!("Failed to save pairing: {}", e);
317                bot.send_message(chat_id, "Pairing failed (could not save). Check logs.")
318                    .await?;
319                return Ok(());
320            }
321
322            *state.paired_user.lock().await = Some(paired);
323            *pending = None;
324
325            info!(
326                "Telegram bot: paired with user {} (ID: {})",
327                username.as_deref().unwrap_or("unknown"),
328                user_id
329            );
330
331            bot.send_message(chat_id,
332                "Paired successfully! You can now chat with LocalGPT.\n\nUse /new to start a fresh session, /status to see session info.",
333            )
334            .await?;
335        } else {
336            bot.send_message(chat_id, "Invalid pairing code. Please try again.")
337                .await?;
338        }
339    } else {
340        // Generate new pairing code
341        let code = generate_pairing_code();
342        println!("\n========================================");
343        println!("  TELEGRAM PAIRING CODE: {}", code);
344        println!("========================================\n");
345        info!(
346            "Telegram pairing code generated for user {} (ID: {})",
347            msg.from
348                .as_ref()
349                .and_then(|u| u.username.as_deref())
350                .unwrap_or("unknown"),
351            user_id
352        );
353
354        *pending = Some(code);
355
356        bot.send_message(chat_id,
357            "Welcome! A pairing code has been printed to the daemon logs/stdout.\nPlease enter the code to pair this bot with your account.",
358        )
359        .await?;
360    }
361
362    Ok(())
363}
364
365async fn handle_command(
366    bot: &Bot,
367    chat_id: ChatId,
368    state: &Arc<BotState>,
369    text: &str,
370) -> ResponseResult<()> {
371    let parts: Vec<&str> = text.splitn(2, ' ').collect();
372    let cmd = parts[0];
373    let args = parts.get(1).map(|s| s.trim()).unwrap_or("");
374
375    match cmd {
376        "/start" | "/help" => {
377            let help = format!(
378                "LocalGPT Telegram Bot\n\n{}",
379                localgpt_core::commands::format_help_text(
380                    localgpt_core::commands::Interface::Telegram
381                )
382            );
383            bot.send_message(chat_id, &help).await?;
384        }
385        "/new" => {
386            let mut sessions = state.sessions.lock().await;
387            sessions.remove(&chat_id.0);
388            bot.send_message(
389                chat_id,
390                "Session cleared. Send a message to start a new conversation.",
391            )
392            .await?;
393        }
394        "/status" => {
395            let sessions = state.sessions.lock().await;
396            let status_text = if let Some(entry) = sessions.get(&chat_id.0) {
397                let status = entry.agent.session_status();
398                let (used, usable, total) = entry.agent.context_usage();
399                let mut text = format!(
400                    "Session active\n\
401                     Model: {}\n\
402                     Messages: {}\n\
403                     Tokens: {} / {} (window: {})\n\
404                     Compactions: {}\n\
405                     Idle: {}s",
406                    entry.agent.model(),
407                    status.message_count,
408                    used,
409                    usable,
410                    total,
411                    status.compaction_count,
412                    entry.last_accessed.elapsed().as_secs()
413                );
414                if status.search_queries > 0 {
415                    let cache_pct =
416                        (status.search_cached_hits as f64 / status.search_queries as f64) * 100.0;
417                    text.push_str(&format!(
418                        "\nSearch: {} queries ({} cached, {:.0}%) · ${:.3}",
419                        status.search_queries,
420                        status.search_cached_hits,
421                        cache_pct,
422                        status.search_cost_usd
423                    ));
424                }
425                text
426            } else {
427                "No active session. Send a message to start one.".to_string()
428            };
429            bot.send_message(chat_id, &status_text).await?;
430        }
431        "/compact" => {
432            let mut sessions = state.sessions.lock().await;
433            match sessions.get_mut(&chat_id.0) {
434                Some(entry) => {
435                    entry.last_accessed = Instant::now();
436                    match entry.agent.compact_session().await {
437                        Ok((before, after)) => {
438                            bot.send_message(
439                                chat_id,
440                                format!("Compacted: {} -> {} tokens", before, after),
441                            )
442                            .await?;
443                        }
444                        Err(e) => {
445                            bot.send_message(chat_id, format!("Compact failed: {}", e))
446                                .await?;
447                        }
448                    }
449                }
450                None => {
451                    bot.send_message(chat_id, "No active session.").await?;
452                }
453            }
454        }
455        "/clear" => {
456            let mut sessions = state.sessions.lock().await;
457            if let Some(entry) = sessions.get_mut(&chat_id.0) {
458                entry.agent.clear_session();
459                entry.last_accessed = Instant::now();
460                bot.send_message(chat_id, "Session history cleared.")
461                    .await?;
462            } else {
463                bot.send_message(chat_id, "No active session.").await?;
464            }
465        }
466        "/memory" => {
467            if args.is_empty() {
468                bot.send_message(chat_id, "Usage: /memory <search query>")
469                    .await?;
470            } else {
471                match state.memory.search(args, 5) {
472                    Ok(results) => {
473                        if results.is_empty() {
474                            bot.send_message(chat_id, "No results found.").await?;
475                        } else {
476                            let mut text = format!("Memory search: \"{}\"\n\n", args);
477                            for (i, r) in results.iter().enumerate() {
478                                text.push_str(&format!(
479                                    "{}. {} (L{}-{})\n{}\n\n",
480                                    i + 1,
481                                    r.file,
482                                    r.line_start,
483                                    r.line_end,
484                                    truncate_str(&r.content, 300),
485                                ));
486                            }
487                            send_long_message(bot, chat_id, None, &text).await;
488                        }
489                    }
490                    Err(e) => {
491                        bot.send_message(chat_id, format!("Search error: {}", e))
492                            .await?;
493                    }
494                }
495            }
496        }
497        "/model" => {
498            if args.is_empty() {
499                let sessions = state.sessions.lock().await;
500                let current = sessions
501                    .get(&chat_id.0)
502                    .map(|e| e.agent.model().to_string())
503                    .unwrap_or_else(|| state.config.agent.default_model.clone());
504                bot.send_message(
505                    chat_id,
506                    format!("Current model: {}\n\nUsage: /model <name>", current),
507                )
508                .await?;
509            } else {
510                let mut sessions = state.sessions.lock().await;
511                if let Some(entry) = sessions.get_mut(&chat_id.0) {
512                    match entry.agent.set_model(args) {
513                        Ok(()) => {
514                            bot.send_message(chat_id, format!("Switched to model: {}", args))
515                                .await?;
516                        }
517                        Err(e) => {
518                            bot.send_message(chat_id, format!("Failed to switch model: {}", e))
519                                .await?;
520                        }
521                    }
522                } else {
523                    bot.send_message(
524                        chat_id,
525                        "No active session. Send a message first, then switch models.",
526                    )
527                    .await?;
528                }
529            }
530        }
531        "/skills" => {
532            let workspace_path = state.config.workspace_path();
533            match localgpt_core::agent::load_skills(&workspace_path) {
534                Ok(skills) => {
535                    if skills.is_empty() {
536                        bot.send_message(chat_id, "No skills installed.").await?;
537                    } else {
538                        let summary = localgpt_core::agent::get_skills_summary(&skills);
539                        bot.send_message(chat_id, &summary).await?;
540                    }
541                }
542                Err(e) => {
543                    bot.send_message(chat_id, format!("Failed to load skills: {}", e))
544                        .await?;
545                }
546            }
547        }
548        "/unpair" => {
549            *state.paired_user.lock().await = None;
550            if let Ok(path) = pairing_file_path() {
551                let _ = std::fs::remove_file(path);
552            }
553            let mut sessions = state.sessions.lock().await;
554            sessions.remove(&chat_id.0);
555            info!("Telegram bot: user unpaired");
556            bot.send_message(
557                chat_id,
558                "Unpaired. Send any message to start a new pairing.",
559            )
560            .await?;
561        }
562        _ => {
563            bot.send_message(
564                chat_id,
565                "Unknown command. Use /help for available commands.",
566            )
567            .await?;
568        }
569    }
570
571    Ok(())
572}
573
574fn truncate_str(s: &str, max: usize) -> &str {
575    if s.len() <= max {
576        s
577    } else {
578        // Find a char boundary
579        let mut end = max;
580        while end > 0 && !s.is_char_boundary(end) {
581            end -= 1;
582        }
583        &s[..end]
584    }
585}
586
587/// Escape text for Telegram HTML parse mode.
588fn escape_html(text: &str) -> String {
589    text.replace('&', "&amp;")
590        .replace('<', "&lt;")
591        .replace('>', "&gt;")
592}
593
594/// Convert markdown to Telegram-compatible HTML.
595/// Handles: code blocks, inline code, bold, italic, links, headers.
596/// Unrecognized markup passes through as escaped HTML.
597fn markdown_to_html(text: &str) -> String {
598    let mut result = String::with_capacity(text.len());
599    let mut in_code_block = false;
600    let mut code_block_lang = String::new();
601    let mut code_block_content = String::new();
602
603    for line in text.lines() {
604        if in_code_block {
605            if line.starts_with("```") {
606                // Close code block
607                let lang_attr = if code_block_lang.is_empty() {
608                    String::new()
609                } else {
610                    format!(" class=\"language-{}\"", escape_html(&code_block_lang))
611                };
612                result.push_str(&format!(
613                    "<pre><code{}>{}</code></pre>\n",
614                    lang_attr,
615                    escape_html(&code_block_content)
616                ));
617                code_block_content.clear();
618                code_block_lang.clear();
619                in_code_block = false;
620            } else {
621                if !code_block_content.is_empty() {
622                    code_block_content.push('\n');
623                }
624                code_block_content.push_str(line);
625            }
626            continue;
627        }
628
629        if let Some(rest) = line.strip_prefix("```") {
630            in_code_block = true;
631            code_block_lang = rest.trim().to_string();
632            continue;
633        }
634
635        // Headers → bold
636        let line = if let Some(rest) = line.strip_prefix("### ") {
637            format!("<b>{}</b>", escape_html(rest))
638        } else if let Some(rest) = line.strip_prefix("## ") {
639            format!("<b>{}</b>", escape_html(rest))
640        } else if let Some(rest) = line.strip_prefix("# ") {
641            format!("<b>{}</b>", escape_html(rest))
642        } else {
643            convert_inline_markdown(line)
644        };
645
646        result.push_str(&line);
647        result.push('\n');
648    }
649
650    // Handle unclosed code block
651    if in_code_block {
652        result.push_str(&format!(
653            "<pre><code>{}</code></pre>\n",
654            escape_html(&code_block_content)
655        ));
656    }
657
658    result
659}
660
661/// Convert inline markdown elements: `code`, **bold**, *italic*, [links](url)
662fn convert_inline_markdown(line: &str) -> String {
663    let mut result = String::new();
664    let chars: Vec<char> = line.chars().collect();
665    let len = chars.len();
666    let mut i = 0;
667
668    while i < len {
669        // Inline code: `...`
670        if chars[i] == '`'
671            && let Some(end) = chars[i + 1..].iter().position(|&c| c == '`')
672        {
673            let code: String = chars[i + 1..i + 1 + end].iter().collect();
674            result.push_str(&format!("<code>{}</code>", escape_html(&code)));
675            i += end + 2;
676            continue;
677        }
678
679        // Bold: **...**
680        if i + 1 < len
681            && chars[i] == '*'
682            && chars[i + 1] == '*'
683            && let Some(end) = find_closing(&chars, i + 2, &['*', '*'])
684        {
685            let inner: String = chars[i + 2..end].iter().collect();
686            result.push_str(&format!("<b>{}</b>", escape_html(&inner)));
687            i = end + 2;
688            continue;
689        }
690
691        // Italic: *...*
692        if chars[i] == '*'
693            && let Some(end) = chars[i + 1..].iter().position(|&c| c == '*')
694        {
695            let inner: String = chars[i + 1..i + 1 + end].iter().collect();
696            result.push_str(&format!("<i>{}</i>", escape_html(&inner)));
697            i += end + 2;
698            continue;
699        }
700
701        // Link: [text](url)
702        if chars[i] == '['
703            && let Some(close_bracket) = chars[i + 1..].iter().position(|&c| c == ']')
704        {
705            let text_end = i + 1 + close_bracket;
706            if text_end + 1 < len
707                && chars[text_end + 1] == '('
708                && let Some(close_paren) = chars[text_end + 2..].iter().position(|&c| c == ')')
709            {
710                let text: String = chars[i + 1..text_end].iter().collect();
711                let url: String = chars[text_end + 2..text_end + 2 + close_paren]
712                    .iter()
713                    .collect();
714                result.push_str(&format!(
715                    "<a href=\"{}\">{}</a>",
716                    escape_html(&url),
717                    escape_html(&text)
718                ));
719                i = text_end + 2 + close_paren + 1;
720                continue;
721            }
722        }
723
724        // Regular character
725        match chars[i] {
726            '&' => result.push_str("&amp;"),
727            '<' => result.push_str("&lt;"),
728            '>' => result.push_str("&gt;"),
729            c => result.push(c),
730        }
731        i += 1;
732    }
733
734    result
735}
736
737/// Find closing delimiter (e.g., ** for bold) starting from `start`.
738fn find_closing(chars: &[char], start: usize, delim: &[char]) -> Option<usize> {
739    let dlen = delim.len();
740    if start + dlen > chars.len() {
741        return None;
742    }
743    for i in start..chars.len() - dlen + 1 {
744        if chars[i..i + dlen] == *delim {
745            return Some(i);
746        }
747    }
748    None
749}
750
751async fn handle_chat(
752    bot: &Bot,
753    chat_id: ChatId,
754    state: &Arc<BotState>,
755    text: &str,
756) -> ResponseResult<()> {
757    // Send initial "thinking" message
758    let thinking_msg = bot.send_message(chat_id, "Thinking...").await?;
759    let msg_id = thinking_msg.id;
760
761    // Acquire turn gate
762    let _gate_permit = state.turn_gate.acquire().await;
763
764    // Get or create agent session, then stream response
765    let mut sessions = state.sessions.lock().await;
766
767    if let std::collections::hash_map::Entry::Vacant(e) = sessions.entry(chat_id.0) {
768        let agent_config = AgentConfig {
769            model: state.config.agent.default_model.clone(),
770            context_window: state.config.agent.context_window,
771            reserve_tokens: state.config.agent.reserve_tokens,
772        };
773
774        let memory = std::sync::Arc::new(state.memory.clone());
775        match Agent::new(agent_config, &state.config, memory).await {
776            Ok(mut agent) => {
777                // Extend agent with additional tools from factory if provided (e.g., CLI tools from daemon)
778                if let Some(ref factory) = state.tool_factory {
779                    match factory(&state.config) {
780                        Ok(extra_tools) => {
781                            agent.extend_tools(extra_tools);
782                        }
783                        Err(err) => {
784                            error!("Failed to create additional tools: {}", err);
785                        }
786                    }
787                }
788
789                if let Err(err) = agent.new_session().await {
790                    error!("Failed to create session: {}", err);
791                    let _ = bot
792                        .edit_message_text(chat_id, msg_id, format!("Error: {}", err))
793                        .await;
794                    return Ok(());
795                }
796
797                // Send welcome message on first run
798                let is_brand_new = agent.is_brand_new();
799                if is_brand_new {
800                    let html = markdown_to_html(localgpt_core::agent::FIRST_RUN_WELCOME);
801                    let _ = bot
802                        .send_message(chat_id, html)
803                        .parse_mode(ParseMode::Html)
804                        .await;
805                }
806
807                e.insert(SessionEntry {
808                    agent,
809                    last_accessed: Instant::now(),
810                });
811            }
812            Err(err) => {
813                error!("Failed to create agent: {}", err);
814                let _ = bot
815                    .edit_message_text(chat_id, msg_id, format!("Error: {}", err))
816                    .await;
817                return Ok(());
818            }
819        }
820    }
821
822    let entry = sessions.get_mut(&chat_id.0).unwrap();
823    entry.last_accessed = Instant::now();
824
825    // Use streaming with tools
826    let response = match entry.agent.chat_stream_with_tools(text, Vec::new()).await {
827        Ok(event_stream) => {
828            use futures::StreamExt;
829
830            let mut full_response = String::new();
831            let mut last_edit = Instant::now();
832            let mut pinned_stream = std::pin::pin!(event_stream);
833            let mut tool_info = String::new();
834
835            while let Some(event) = pinned_stream.next().await {
836                match event {
837                    Ok(StreamEvent::Content(delta)) => {
838                        full_response.push_str(&delta);
839
840                        // Debounced edit
841                        if last_edit.elapsed().as_secs() >= EDIT_DEBOUNCE_SECS {
842                            let display = format_display(&full_response, &tool_info);
843                            let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
844                            last_edit = Instant::now();
845                        }
846                    }
847                    Ok(StreamEvent::ToolCallStart {
848                        name, arguments, ..
849                    }) => {
850                        let detail = extract_tool_detail(&name, &arguments);
851                        let info_line = if let Some(d) = detail {
852                            format!("🔧 {}({})\n", name, d)
853                        } else {
854                            format!("🔧 {}\n", name)
855                        };
856                        tool_info.push_str(&info_line);
857
858                        let display = format_display(&full_response, &tool_info);
859                        let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
860                        last_edit = Instant::now();
861                    }
862                    Ok(StreamEvent::ToolCallEnd { name, warnings, .. }) => {
863                        if !warnings.is_empty() {
864                            for w in &warnings {
865                                tool_info.push_str(&format!(
866                                    "\u{26a0} Suspicious content in {}: {}\n",
867                                    name, w
868                                ));
869                            }
870                            let display = format_display(&full_response, &tool_info);
871                            let _ = bot.edit_message_text(chat_id, msg_id, &display).await;
872                            last_edit = Instant::now();
873                        }
874                    }
875                    Ok(StreamEvent::Done) => break,
876                    Ok(StreamEvent::ApprovalRequired { .. }) => {}
877                    Err(e) => {
878                        error!("Stream error: {}", e);
879                        full_response.push_str(&format!("\n\nError: {}", e));
880                        break;
881                    }
882                }
883            }
884
885            if full_response.is_empty() {
886                "(no response)".to_string()
887            } else {
888                full_response
889            }
890        }
891        Err(e) => format!("Error: {}", e),
892    };
893
894    // Save session before releasing lock
895    if let Err(e) = entry.agent.save_session_for_agent(TELEGRAM_AGENT_ID).await {
896        debug!("Failed to save telegram session: {}", e);
897    }
898
899    let outbox_ref = state.outbox.as_ref();
900    drop(sessions);
901
902    // Final edit with complete response (durable if outbox enabled)
903    durable_send(bot, chat_id, Some(msg_id), &response, outbox_ref).await;
904
905    Ok(())
906}
907
908fn format_display(response: &str, tool_info: &str) -> String {
909    let mut display = String::new();
910    if !tool_info.is_empty() {
911        display.push_str(tool_info);
912        display.push('\n');
913    }
914    display.push_str(response);
915
916    // Truncate for Telegram limit
917    if display.len() > MAX_MESSAGE_LENGTH {
918        display.truncate(MAX_MESSAGE_LENGTH - 3);
919        display.push_str("...");
920    }
921
922    display
923}
924
925/// Send/edit agent response as HTML-converted markdown.
926async fn send_or_edit_html(bot: &Bot, chat_id: ChatId, msg_id: Option<MessageId>, text: &str) {
927    let html = markdown_to_html(text);
928    let result = if let Some(mid) = msg_id {
929        bot.edit_message_text(chat_id, mid, &html)
930            .parse_mode(ParseMode::Html)
931            .await
932    } else {
933        bot.send_message(chat_id, &html)
934            .parse_mode(ParseMode::Html)
935            .await
936    };
937
938    // Fallback to plain text on conversion issues
939    if result.is_err() {
940        if let Some(mid) = msg_id {
941            let _ = bot.edit_message_text(chat_id, mid, text).await;
942        } else {
943            let _ = bot.send_message(chat_id, text).await;
944        }
945    }
946}
947
948async fn send_long_message(bot: &Bot, chat_id: ChatId, edit_msg_id: Option<MessageId>, text: &str) {
949    if text.len() <= MAX_MESSAGE_LENGTH {
950        send_or_edit_html(bot, chat_id, edit_msg_id, text).await;
951        return;
952    }
953
954    // Split into chunks at char boundaries
955    let chunks = split_text_chunks(text);
956
957    // First chunk: edit existing message or send new
958    if let Some(first) = chunks.first() {
959        send_or_edit_html(bot, chat_id, edit_msg_id, first).await;
960    }
961
962    // Remaining chunks as new messages
963    for chunk in chunks.iter().skip(1) {
964        send_or_edit_html(bot, chat_id, None, chunk).await;
965    }
966}
967
968fn split_text_chunks(text: &str) -> Vec<&str> {
969    let mut chunks = Vec::new();
970    let mut start = 0;
971    while start < text.len() {
972        let mut end = (start + MAX_MESSAGE_LENGTH).min(text.len());
973        while end > start && !text.is_char_boundary(end) {
974            end -= 1;
975        }
976        chunks.push(&text[start..end]);
977        start = end;
978    }
979    chunks
980}
981
982/// Send a message durably via the outbox: enqueue → send → mark delivered/failed.
983/// If outbox is not configured, falls back to direct send.
984async fn durable_send(
985    bot: &Bot,
986    chat_id: ChatId,
987    edit_msg_id: Option<MessageId>,
988    text: &str,
989    outbox: Option<&localgpt_core::outbox::Outbox>,
990) {
991    let outbox = match outbox {
992        Some(ob) => ob,
993        None => {
994            // No outbox — direct send (original behavior)
995            send_long_message(bot, chat_id, edit_msg_id, text).await;
996            return;
997        }
998    };
999
1000    // Enqueue before sending
1001    let entry_id = match outbox.enqueue("telegram", &chat_id.0.to_string(), text, None) {
1002        Ok(id) => id,
1003        Err(e) => {
1004            warn!("Outbox enqueue failed: {}. Falling back to direct send.", e);
1005            send_long_message(bot, chat_id, edit_msg_id, text).await;
1006            return;
1007        }
1008    };
1009
1010    // Attempt send
1011    let html = markdown_to_html(text);
1012    let result = if text.len() <= MAX_MESSAGE_LENGTH {
1013        if let Some(mid) = edit_msg_id {
1014            bot.edit_message_text(chat_id, mid, &html)
1015                .parse_mode(ParseMode::Html)
1016                .await
1017        } else {
1018            bot.send_message(chat_id, &html)
1019                .parse_mode(ParseMode::Html)
1020                .await
1021        }
1022    } else {
1023        // For long messages, send directly (chunking is complex to track per-chunk)
1024        send_long_message(bot, chat_id, edit_msg_id, text).await;
1025        let _ = outbox.mark_delivered(&entry_id);
1026        return;
1027    };
1028
1029    match result {
1030        Ok(_) => {
1031            let _ = outbox.mark_delivered(&entry_id);
1032        }
1033        Err(e) => {
1034            let err_str = e.to_string();
1035            if localgpt_core::outbox::is_permanent_telegram_error(&err_str) {
1036                let _ = outbox.mark_permanent_failure(&entry_id, &err_str);
1037            } else {
1038                let _ = outbox.record_failure(&entry_id, &err_str);
1039            }
1040            // Try plain text fallback
1041            if let Some(mid) = edit_msg_id {
1042                let _ = bot.edit_message_text(chat_id, mid, text).await;
1043            } else {
1044                let _ = bot.send_message(chat_id, text).await;
1045            }
1046        }
1047    }
1048}
1049
1050/// Retry sending a pending outbox entry.
1051async fn outbox_retry_send(
1052    bot: &Bot,
1053    outbox: &localgpt_core::outbox::Outbox,
1054    entry: &localgpt_core::outbox::OutboxEntry,
1055) {
1056    let chat_id = match entry.target.parse::<i64>() {
1057        Ok(id) => ChatId(id),
1058        Err(_) => {
1059            let _ = outbox.mark_permanent_failure(&entry.id, "invalid chat_id");
1060            return;
1061        }
1062    };
1063
1064    let html = markdown_to_html(&entry.payload);
1065    let result = bot
1066        .send_message(chat_id, &html)
1067        .parse_mode(ParseMode::Html)
1068        .await;
1069
1070    match result {
1071        Ok(_) => {
1072            let _ = outbox.mark_delivered(&entry.id);
1073        }
1074        Err(e) => {
1075            let err_str = e.to_string();
1076            if localgpt_core::outbox::is_permanent_telegram_error(&err_str) {
1077                let _ = outbox.mark_permanent_failure(&entry.id, &err_str);
1078            } else {
1079                let _ = outbox.record_failure(&entry.id, &err_str);
1080            }
1081        }
1082    }
1083}