Skip to main content

astrid_telegram/
handler.rs

1//! Message handler: receives text from Telegram, sends to daemon, starts
2//! event loop.
3
4use std::fmt::Write as _;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use teloxide::prelude::*;
9use teloxide::types::ParseMode;
10use tracing::{info, warn};
11
12use crate::approval::ApprovalManager;
13use crate::client::DaemonClient;
14use crate::config::TelegramConfig;
15use crate::elicitation::ElicitationManager;
16use crate::event_loop::run_event_loop;
17use crate::session::{SessionMap, TurnStartResult};
18
19/// Shared bot state passed to all handlers.
20#[derive(Clone)]
21pub struct BotState {
22    /// The connected daemon proxy client.
23    pub daemon: Arc<DaemonClient>,
24    /// Active Telegram sessions mapping.
25    pub sessions: SessionMap,
26    /// Telegram bot configuration details.
27    pub config: Arc<TelegramConfig>,
28    /// Flow manager for pending human approvals.
29    pub approvals: ApprovalManager,
30    /// Flow manager for pending human elicitations.
31    pub elicitations: ElicitationManager,
32}
33
34/// Handle an incoming text message.
35pub async fn handle_message(bot: Bot, msg: Message, state: BotState) -> anyhow::Result<()> {
36    let Some(text) = msg.text() else {
37        return Ok(());
38    };
39
40    let chat_id = msg.chat.id;
41
42    // Access control: check user identity against the allowlist.
43    // If msg.from is absent (channel posts, etc.) and an allowlist is set,
44    // deny access since we can't verify the sender.
45    let user_allowed = match &msg.from {
46        Some(user) => state.config.is_user_allowed(user.id.0),
47        None => state.config.allowed_user_ids.is_empty(),
48    };
49    if !user_allowed {
50        let _ = bot
51            .send_message(chat_id, "You are not authorized to use this bot.")
52            .await;
53        return Ok(());
54    }
55
56    // Check if this is an elicitation text reply.
57    if state
58        .elicitations
59        .handle_text_reply(chat_id, text, &state.daemon, &state.sessions)
60        .await
61    {
62        return Ok(());
63    }
64
65    // Handle bot commands.
66    if text.starts_with('/') {
67        return handle_command(&bot, chat_id, text, &state).await;
68    }
69
70    // Ensure session exists and atomically start the turn.
71    let session_id = match acquire_session_and_start_turn(&state, chat_id).await {
72        Ok(sid) => sid,
73        Err(msg) => {
74            let _ = bot.send_message(chat_id, msg).await;
75            return Ok(());
76        },
77    };
78
79    // Send "Thinking..." placeholder.
80    let placeholder = match bot.send_message(chat_id, "Thinking...").await {
81        Ok(msg) => msg,
82        Err(e) => {
83            warn!("Failed to send placeholder: {e}");
84            state.sessions.set_turn_in_progress(chat_id, false).await;
85            return Ok(());
86        },
87    };
88
89    // Send typing indicator.
90    let _ = bot
91        .send_chat_action(chat_id, teloxide::types::ChatAction::Typing)
92        .await;
93
94    // Send input to daemon.
95    if let Err(e) = state.daemon.send_input(&session_id, text).await {
96        let _ = bot
97            .edit_message_text(chat_id, placeholder.id, format!("Error: {e}"))
98            .await;
99        state.sessions.set_turn_in_progress(chat_id, false).await;
100        return Ok(());
101    }
102
103    // Subscribe to events.
104    let subscription = match state.daemon.subscribe_events(&session_id).await {
105        Ok(sub) => sub,
106        Err(e) => {
107            let _ = bot
108                .edit_message_text(chat_id, placeholder.id, format!("Failed to subscribe: {e}"))
109                .await;
110            state.sessions.set_turn_in_progress(chat_id, false).await;
111            return Ok(());
112        },
113    };
114
115    // Spawn event loop task.
116    tokio::spawn(run_event_loop(
117        bot.clone(),
118        chat_id,
119        placeholder.id,
120        subscription,
121        state.sessions.clone(),
122        state.approvals.clone(),
123        state.elicitations.clone(),
124    ));
125
126    Ok(())
127}
128
129/// Ensure a session exists for `chat_id` and atomically start a turn.
130///
131/// Returns `Ok(session_id)` on success, or `Err(user_message)` with a message
132/// to display to the user.
133async fn acquire_session_and_start_turn(
134    state: &BotState,
135    chat_id: ChatId,
136) -> Result<astrid_core::SessionId, &'static str> {
137    match state.sessions.try_start_existing_turn(chat_id).await {
138        TurnStartResult::Started(sid) => return Ok(sid),
139        TurnStartResult::TurnBusy => {
140            return Err("A turn is already in progress. Please wait.");
141        },
142        TurnStartResult::NoSession => {},
143    }
144
145    // No session — claim creation lock to prevent duplicate create_session
146    // calls when concurrent messages race for the same chat.
147    if !state.sessions.try_claim_creation(chat_id).await {
148        return Err("Session is being created. Please wait.");
149    }
150
151    let workspace = state.config.workspace_path.as_ref().map(PathBuf::from);
152    match state.daemon.create_session(workspace).await {
153        Ok(session_info) => {
154            info!("Created session {} for chat {}", session_info.id, chat_id);
155            // Atomically insert session + start turn under one lock to
156            // prevent another message from stealing the turn in between.
157            let sid = state
158                .sessions
159                .finish_creation_and_start_turn(chat_id, session_info.id)
160                .await;
161            Ok(sid)
162        },
163        Err(e) => {
164            warn!("Failed to create session for chat {chat_id}: {e}");
165            state.sessions.cancel_creation(chat_id).await;
166            Err("Failed to create session. Please try again.")
167        },
168    }
169}
170
171/// Handle bot commands.
172async fn handle_command(
173    bot: &Bot,
174    chat_id: ChatId,
175    text: &str,
176    state: &BotState,
177) -> anyhow::Result<()> {
178    let cmd = text.split_whitespace().next().unwrap_or("");
179
180    match cmd {
181        "/start" => {
182            let msg = "Welcome to Astrid! Send me a message and I'll process it \
183                       through the agent runtime.\n\n\
184                       Commands:\n\
185                       /help - Show this help\n\
186                       /reset - Reset session\n\
187                       /status - Daemon status\n\
188                       /cancel - Cancel current turn";
189            let _ = bot.send_message(chat_id, msg).await;
190        },
191        "/help" => {
192            let msg = "<b>Astrid Telegram Bot</b>\n\n\
193                       Send any text message to interact with the agent.\n\n\
194                       <b>Commands:</b>\n\
195                       /start - Welcome message\n\
196                       /help - This help text\n\
197                       /reset - End current session and start fresh\n\
198                       /status - Show daemon status and budget\n\
199                       /cancel - Cancel the current turn";
200            let _ = bot
201                .send_message(chat_id, msg)
202                .parse_mode(ParseMode::Html)
203                .await;
204        },
205        "/reset" => {
206            if let Some(session_id) = state.sessions.remove(chat_id).await {
207                let _ = state.daemon.end_session(&session_id).await;
208            }
209            let _ = bot.send_message(chat_id, "Session reset.").await;
210        },
211        "/status" => match state.daemon.status().await {
212            Ok(status) => {
213                let mut msg = format!(
214                    "<b>Daemon Status</b>\n\
215                         Uptime: {}s\n\
216                         Active sessions: {}\n\
217                         Version: {}",
218                    status.uptime_secs,
219                    status.active_sessions,
220                    crate::format::html_escape(&status.version),
221                );
222
223                if let Some(session_id) = state.sessions.get_session_id(chat_id).await
224                    && let Ok(budget) = state.daemon.session_budget(&session_id).await
225                {
226                    let _ = write!(
227                        msg,
228                        "\n\n<b>Budget</b>\n\
229                             Spent: ${:.4}\n\
230                             Remaining: ${:.4}\n\
231                             Limit: ${:.4}",
232                        budget.session_spent_usd,
233                        budget.session_remaining_usd,
234                        budget.session_max_usd,
235                    );
236                }
237
238                let _ = bot
239                    .send_message(chat_id, msg)
240                    .parse_mode(ParseMode::Html)
241                    .await;
242            },
243            Err(e) => {
244                let _ = bot
245                    .send_message(chat_id, format!("Failed to get status: {e}"))
246                    .await;
247            },
248        },
249        "/cancel" => {
250            if let Some(session_id) = state.sessions.get_session_id(chat_id).await {
251                match state.daemon.cancel_turn(&session_id).await {
252                    Ok(()) => {
253                        state.sessions.set_turn_in_progress(chat_id, false).await;
254                        let _ = bot.send_message(chat_id, "Turn cancelled.").await;
255                    },
256                    Err(e) => {
257                        let _ = bot
258                            .send_message(chat_id, format!("Failed to cancel: {e}"))
259                            .await;
260                    },
261                }
262            } else {
263                let _ = bot.send_message(chat_id, "No active session.").await;
264            }
265        },
266        _ => {
267            let _ = bot
268                .send_message(chat_id, "Unknown command. Try /help.")
269                .await;
270        },
271    }
272
273    Ok(())
274}