Skip to main content

ralph_telegram/
service.rs

1use std::fmt;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use chrono::Utc;
8use tracing::{debug, info, warn};
9
10use crate::bot::TelegramBot;
11use crate::error::{TelegramError, TelegramResult};
12use crate::handler::MessageHandler;
13use crate::state::StateManager;
14
/// Maximum number of send attempts made by [`retry_with_backoff`] before it
/// gives up. The first attempt counts toward this limit.
pub const MAX_SEND_RETRIES: u32 = 3;

/// Base delay for exponential backoff (1 second); doubled after each failed attempt.
pub const BASE_RETRY_DELAY: Duration = Duration::from_secs(1);
20
21/// Execute a fallible send operation with exponential backoff retry.
22///
23/// Retries up to [`MAX_SEND_RETRIES`] times with delays of 1s, 2s, 4s.
24/// Returns the result on success, or `TelegramError::Send` after all
25/// retries are exhausted.
26///
27/// The `sleep_fn` parameter allows tests to substitute a no-op sleep.
28pub fn retry_with_backoff<F, S>(mut send_fn: F, mut sleep_fn: S) -> TelegramResult<i32>
29where
30    F: FnMut(u32) -> TelegramResult<i32>,
31    S: FnMut(Duration),
32{
33    let mut last_error = String::new();
34
35    for attempt in 1..=MAX_SEND_RETRIES {
36        match send_fn(attempt) {
37            Ok(msg_id) => return Ok(msg_id),
38            Err(e) => {
39                last_error = e.to_string();
40                warn!(
41                    attempt = attempt,
42                    max_retries = MAX_SEND_RETRIES,
43                    error = %last_error,
44                    "Telegram send failed, {}",
45                    if attempt < MAX_SEND_RETRIES {
46                        "retrying with backoff"
47                    } else {
48                        "all retries exhausted"
49                    }
50                );
51                if attempt < MAX_SEND_RETRIES {
52                    let delay = BASE_RETRY_DELAY * 2u32.pow(attempt - 1);
53                    sleep_fn(delay);
54                }
55            }
56        }
57    }
58
59    Err(TelegramError::Send {
60        attempts: MAX_SEND_RETRIES,
61        reason: last_error,
62    })
63}
64
/// Additional context for enhanced check-in messages.
///
/// Provides richer information than the basic iteration + elapsed time,
/// including current hat, task progress, and cost tracking. Fields left at
/// their defaults (`None` / `0` / `0.0`) are omitted from the rendered
/// check-in message by `TelegramService::send_checkin`.
#[derive(Debug, Default)]
pub struct CheckinContext {
    /// The currently active hat name (e.g., "executor", "reviewer").
    pub current_hat: Option<String>,
    /// Number of open (non-terminal) tasks.
    pub open_tasks: usize,
    /// Number of closed tasks.
    pub closed_tasks: usize,
    /// Cumulative cost in USD.
    pub cumulative_cost: f64,
}
80
/// Coordinates the Telegram bot lifecycle with the Ralph event loop.
///
/// Manages startup, shutdown, message sending, and response waiting.
/// Uses the host tokio runtime (from `#[tokio::main]`) for async operations.
pub struct TelegramService {
    /// Workspace directory; the state file lives at `.ralph/telegram-state.json` beneath it.
    workspace_root: PathBuf,
    /// Resolved bot token (explicit argument or `RALPH_TELEGRAM_BOT_TOKEN`).
    bot_token: String,
    /// Optional override for the Telegram API base URL.
    api_url: Option<String>,
    /// How long `wait_for_response` waits before giving up, in seconds.
    timeout_secs: u64,
    /// Identifier of the loop this service belongs to.
    loop_id: String,
    /// Reads and writes the persisted Telegram state.
    state_manager: StateManager,
    /// Message handler exposed through `handler()`.
    handler: MessageHandler,
    /// Bot wrapper used for outgoing messages, documents, and photos.
    bot: TelegramBot,
    /// Shared flag that stops the polling task and interrupts `wait_for_response`.
    shutdown: Arc<AtomicBool>,
}
96
97impl TelegramService {
98    /// Create a new TelegramService.
99    ///
100    /// Resolves the bot token from config or `RALPH_TELEGRAM_BOT_TOKEN` env var.
101    /// When `api_url` is provided, all Telegram API requests target that URL
102    /// instead of the default `https://api.telegram.org`.
103    pub fn new(
104        workspace_root: PathBuf,
105        bot_token: Option<String>,
106        api_url: Option<String>,
107        timeout_secs: u64,
108        loop_id: String,
109    ) -> TelegramResult<Self> {
110        let resolved_token = bot_token
111            .or_else(|| std::env::var("RALPH_TELEGRAM_BOT_TOKEN").ok())
112            .ok_or(TelegramError::MissingBotToken)?;
113
114        let state_path = workspace_root.join(".ralph/telegram-state.json");
115        let state_manager = StateManager::new(&state_path);
116        let handler_state_manager = StateManager::new(&state_path);
117        let handler = MessageHandler::new(handler_state_manager, &workspace_root);
118        let bot = TelegramBot::new(&resolved_token, api_url.as_deref());
119        let shutdown = Arc::new(AtomicBool::new(false));
120
121        Ok(Self {
122            workspace_root,
123            bot_token: resolved_token,
124            api_url,
125            timeout_secs,
126            loop_id,
127            state_manager,
128            handler,
129            bot,
130            shutdown,
131        })
132    }
133
    /// Returns the workspace root directory this service was created with.
    pub fn workspace_root(&self) -> &PathBuf {
        &self.workspace_root
    }
138
    /// Returns the configured response timeout in seconds
    /// (the limit used by `wait_for_response`).
    pub fn timeout_secs(&self) -> u64 {
        self.timeout_secs
    }
143
144    /// Get a reference to the bot token (masked for logging).
145    pub fn bot_token_masked(&self) -> String {
146        if self.bot_token.len() > 8 {
147            format!(
148                "{}...{}",
149                &self.bot_token[..4],
150                &self.bot_token[self.bot_token.len() - 4..]
151            )
152        } else {
153            "****".to_string()
154        }
155    }
156
    /// Returns the state manager backing this service's persisted Telegram state.
    pub fn state_manager(&self) -> &StateManager {
        &self.state_manager
    }
161
    /// Returns a mutable reference to the service's message handler.
    ///
    /// Note that the background polling task builds its own handler in
    /// `poll_updates`; this accessor exposes the one owned by the service.
    pub fn handler(&mut self) -> &mut MessageHandler {
        &mut self.handler
    }
166
    /// Returns the ID of the loop this service is associated with.
    pub fn loop_id(&self) -> &str {
        &self.loop_id
    }
171
172    /// Returns a clone of the shutdown flag.
173    ///
174    /// Signal handlers can set this flag to interrupt `wait_for_response()`
175    /// without waiting for the full timeout.
176    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
177        self.shutdown.clone()
178    }
179
180    /// Start the Telegram service.
181    ///
182    /// Spawns a background polling task on the host tokio runtime to receive
183    /// incoming messages. Must be called from within a tokio runtime context.
184    pub fn start(&self) -> TelegramResult<()> {
185        info!(
186            bot_token = %self.bot_token_masked(),
187            workspace = %self.workspace_root.display(),
188            timeout_secs = self.timeout_secs,
189            "Telegram service starting"
190        );
191
192        // Spawn the polling task on the host tokio runtime
193        let handle = tokio::runtime::Handle::try_current().map_err(|_| {
194            TelegramError::Startup("no tokio runtime available for polling".to_string())
195        })?;
196
197        let raw_bot =
198            crate::apply_api_url(teloxide::Bot::new(&self.bot_token), self.api_url.as_deref());
199        let workspace_root = self.workspace_root.clone();
200        let state_path = self.workspace_root.join(".ralph/telegram-state.json");
201        let shutdown = self.shutdown.clone();
202        let loop_id = self.loop_id.clone();
203
204        handle.spawn(async move {
205            Self::poll_updates(raw_bot, workspace_root, state_path, shutdown, loop_id).await;
206        });
207
208        // Send greeting if we already know the chat ID
209        if let Ok(state) = self.state_manager.load_or_default()
210            && let Some(chat_id) = state.chat_id
211        {
212            let greeting = crate::bot::TelegramBot::format_greeting(&self.loop_id);
213            match self.send_with_retry(chat_id, &greeting) {
214                Ok(_) => info!("Sent greeting to chat {}", chat_id),
215                Err(e) => warn!(error = %e, "Failed to send greeting"),
216            }
217        }
218
219        info!("Telegram service started — polling for incoming messages");
220        Ok(())
221    }
222
223    /// Background polling task that receives incoming Telegram messages.
224    ///
225    /// Uses long polling (`getUpdates`) to receive messages, then routes them
226    /// through `MessageHandler` to write events to the correct loop's JSONL.
227    async fn poll_updates(
228        bot: teloxide::Bot,
229        workspace_root: PathBuf,
230        state_path: PathBuf,
231        shutdown: Arc<AtomicBool>,
232        loop_id: String,
233    ) {
234        use teloxide::payloads::{GetUpdatesSetters, SetMessageReactionSetters};
235        use teloxide::requests::Requester;
236
237        let state_manager = StateManager::new(&state_path);
238        let handler_state_manager = StateManager::new(&state_path);
239        let handler = MessageHandler::new(handler_state_manager, &workspace_root);
240        let mut offset: i32 = 0;
241
242        if let Ok(state) = state_manager.load_or_default()
243            && let Some(last_update_id) = state.last_update_id
244        {
245            offset = last_update_id + 1;
246        }
247
248        // Register bot commands with Telegram API
249        Self::register_commands(&bot).await;
250
251        info!(loop_id = %loop_id, "Telegram polling task started");
252
253        while !shutdown.load(Ordering::Relaxed) {
254            let request = bot.get_updates().offset(offset).timeout(10);
255            match request.await {
256                Ok(updates) => {
257                    for update in updates {
258                        // Next offset = current update ID + 1
259                        #[allow(clippy::cast_possible_wrap)]
260                        {
261                            offset = update.id.0 as i32 + 1;
262                        }
263
264                        // Extract message from update kind
265                        let msg = match update.kind {
266                            teloxide::types::UpdateKind::Message(msg) => msg,
267                            _ => continue,
268                        };
269
270                        let text = match msg.text() {
271                            Some(t) => t,
272                            None => continue,
273                        };
274
275                        let chat_id = msg.chat.id.0;
276                        let reply_to: Option<i32> = msg.reply_to_message().map(|r| r.id.0);
277
278                        info!(
279                            chat_id = chat_id,
280                            text = %text,
281                            "Received Telegram message"
282                        );
283
284                        // Handle bot commands before routing to handler.
285                        // Unknown slash-commands are rejected here (not treated as guidance).
286                        if crate::commands::is_command(text) {
287                            let response = crate::commands::handle_command(text, &workspace_root)
288                                .unwrap_or_else(|| {
289                                    "Unknown command. Use /help for the supported commands."
290                                        .to_string()
291                                });
292
293                            use teloxide::payloads::SendMessageSetters;
294                            let send_result = bot
295                                .send_message(teloxide::types::ChatId(chat_id), &response)
296                                .parse_mode(teloxide::types::ParseMode::Html)
297                                .await;
298                            if let Err(e) = send_result {
299                                warn!(error = %e, "Failed to send command response");
300                            }
301                            continue;
302                        }
303
304                        let mut state = match state_manager.load_or_default() {
305                            Ok(s) => s,
306                            Err(e) => {
307                                warn!(error = %e, "Failed to load Telegram state");
308                                continue;
309                            }
310                        };
311
312                        match handler.handle_message(&mut state, text, chat_id, reply_to) {
313                            Ok(topic) => {
314                                let emoji = if topic == "human.response" {
315                                    "👍"
316                                } else {
317                                    "👀"
318                                };
319                                let react_result = bot
320                                    .set_message_reaction(teloxide::types::ChatId(chat_id), msg.id)
321                                    .reaction(vec![teloxide::types::ReactionType::Emoji {
322                                        emoji: emoji.to_string(),
323                                    }])
324                                    .await;
325                                if let Err(e) = react_result {
326                                    warn!(error = %e, "Failed to react to message");
327                                }
328
329                                // For guidance, also send a short text reply
330                                if topic == "human.guidance" {
331                                    let _ = bot
332                                        .send_message(
333                                            teloxide::types::ChatId(chat_id),
334                                            "📝 <b>Guidance received</b> — will apply next iteration.",
335                                        )
336                                        .await;
337                                }
338                            }
339                            Err(e) => {
340                                warn!(
341                                    error = %e,
342                                    text = %text,
343                                    "Failed to handle incoming Telegram message"
344                                );
345                            }
346                        }
347
348                        state.last_seen = Some(Utc::now());
349                        state.last_update_id = Some(offset.saturating_sub(1));
350                        if let Err(e) = state_manager.save(&state) {
351                            warn!(error = %e, "Failed to persist Telegram state");
352                        }
353                    }
354                }
355                Err(e) => {
356                    if !shutdown.load(Ordering::Relaxed) {
357                        warn!(error = %e, "Telegram polling error — retrying in 5s");
358                        tokio::time::sleep(Duration::from_secs(5)).await;
359                    }
360                }
361            }
362        }
363
364        info!(loop_id = %loop_id, "Telegram polling task stopped");
365    }
366
367    /// Register bot commands with the Telegram API so they appear in the menu.
368    async fn register_commands(bot: &teloxide::Bot) {
369        use teloxide::requests::Requester;
370        use teloxide::types::BotCommand;
371
372        let commands = vec![
373            BotCommand::new("status", "Current loop status"),
374            BotCommand::new("tasks", "Open tasks"),
375            BotCommand::new("memories", "Recent memories"),
376            BotCommand::new("tail", "Last 20 events"),
377            BotCommand::new("model", "Show current backend/model"),
378            BotCommand::new("models", "Show configured model options"),
379            BotCommand::new("restart", "Restart the loop"),
380            BotCommand::new("stop", "Stop the loop"),
381            BotCommand::new("help", "List available commands"),
382        ];
383
384        match bot.set_my_commands(commands).await {
385            Ok(_) => info!("Registered bot commands with Telegram API"),
386            Err(e) => warn!(error = %e, "Failed to register bot commands"),
387        }
388    }
389
390    /// Stop the Telegram service gracefully.
391    ///
392    /// Signals the background polling task to shut down.
393    pub fn stop(self) {
394        // Send farewell if we know the chat ID
395        if let Ok(state) = self.state_manager.load_or_default()
396            && let Some(chat_id) = state.chat_id
397        {
398            let farewell = crate::bot::TelegramBot::format_farewell(&self.loop_id);
399            match self.send_with_retry(chat_id, &farewell) {
400                Ok(_) => info!("Sent farewell to chat {}", chat_id),
401                Err(e) => warn!(error = %e, "Failed to send farewell"),
402            }
403        }
404
405        self.shutdown.store(true, Ordering::Relaxed);
406        info!(
407            workspace = %self.workspace_root.display(),
408            "Telegram service stopped"
409        );
410    }
411
412    /// Send a question to the human via Telegram and store it as a pending question.
413    ///
414    /// The question payload is extracted from the `human.interact` event. A pending
415    /// question is stored in the state manager so that incoming replies can be
416    /// routed back to the correct loop.
417    ///
418    /// On send failure, retries up to 3 times with exponential backoff (1s, 2s, 4s).
419    /// Returns the message ID of the sent Telegram message, or 0 if no chat ID
420    /// is configured (question is logged but not sent).
421    pub fn send_question(&self, payload: &str) -> TelegramResult<i32> {
422        let mut state = self.state_manager.load_or_default()?;
423
424        let message_id = if let Some(chat_id) = state.chat_id {
425            self.send_with_retry(chat_id, payload)?
426        } else {
427            warn!(
428                loop_id = %self.loop_id,
429                "No chat ID configured — human.interact question logged but not sent: {}",
430                payload
431            );
432            0
433        };
434
435        self.state_manager
436            .add_pending_question(&mut state, &self.loop_id, message_id)?;
437
438        debug!(
439            loop_id = %self.loop_id,
440            message_id = message_id,
441            "Stored pending question"
442        );
443
444        Ok(message_id)
445    }
446
447    /// Send a periodic check-in message via Telegram.
448    ///
449    /// Loads the chat ID from state and sends a short status update so the
450    /// human knows the loop is still running. Skips silently if no chat ID
451    /// is configured. Returns `Ok(0)` when skipped, or the message ID on
452    /// success.
453    ///
454    /// When a [`CheckinContext`] is provided, the message includes richer
455    /// details: current hat, task progress, and cumulative cost.
456    pub fn send_checkin(
457        &self,
458        iteration: u32,
459        elapsed: Duration,
460        context: Option<&CheckinContext>,
461    ) -> TelegramResult<i32> {
462        let state = self.state_manager.load_or_default()?;
463        let Some(chat_id) = state.chat_id else {
464            debug!(
465                loop_id = %self.loop_id,
466                "No chat ID configured — skipping check-in"
467            );
468            return Ok(0);
469        };
470
471        let elapsed_secs = elapsed.as_secs();
472        let minutes = elapsed_secs / 60;
473        let seconds = elapsed_secs % 60;
474        let elapsed_str = if minutes > 0 {
475            format!("{}m {}s", minutes, seconds)
476        } else {
477            format!("{}s", seconds)
478        };
479
480        let msg = match context {
481            Some(ctx) => {
482                let mut lines = vec![format!(
483                    "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
484                    iteration, elapsed_str
485                )];
486
487                if let Some(hat) = &ctx.current_hat {
488                    lines.push(format!(
489                        "Hat: <code>{}</code>",
490                        crate::bot::escape_html(hat)
491                    ));
492                }
493
494                if ctx.open_tasks > 0 || ctx.closed_tasks > 0 {
495                    lines.push(format!(
496                        "Tasks: <b>{}</b> open, {} closed",
497                        ctx.open_tasks, ctx.closed_tasks
498                    ));
499                }
500
501                if ctx.cumulative_cost > 0.0 {
502                    lines.push(format!("Cost: <code>${:.4}</code>", ctx.cumulative_cost));
503                }
504
505                lines.join("\n")
506            }
507            None => format!(
508                "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
509                iteration, elapsed_str
510            ),
511        };
512        self.send_with_retry(chat_id, &msg)
513    }
514
515    /// Send a document (file) to the human via Telegram.
516    ///
517    /// Loads the chat ID from state and sends the file at `file_path` with an
518    /// optional caption. Returns `Ok(0)` if no chat ID is configured.
519    pub fn send_document(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
520        let state = self.state_manager.load_or_default()?;
521        let Some(chat_id) = state.chat_id else {
522            warn!(
523                loop_id = %self.loop_id,
524                file = %file_path.display(),
525                "No chat ID configured — document not sent"
526            );
527            return Ok(0);
528        };
529
530        self.send_document_with_retry(chat_id, file_path, caption)
531    }
532
533    /// Send a photo to the human via Telegram.
534    ///
535    /// Loads the chat ID from state and sends the image at `file_path` with an
536    /// optional caption. Returns `Ok(0)` if no chat ID is configured.
537    pub fn send_photo(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
538        let state = self.state_manager.load_or_default()?;
539        let Some(chat_id) = state.chat_id else {
540            warn!(
541                loop_id = %self.loop_id,
542                file = %file_path.display(),
543                "No chat ID configured — photo not sent"
544            );
545            return Ok(0);
546        };
547
548        self.send_photo_with_retry(chat_id, file_path, caption)
549    }
550
551    /// Attempt to send a message with exponential backoff retries.
552    ///
553    /// Uses the host tokio runtime via `block_in_place` + `Handle::block_on`
554    /// to bridge the sync event loop to the async BotApi.
555    fn send_with_retry(&self, chat_id: i64, payload: &str) -> TelegramResult<i32> {
556        use crate::bot::BotApi;
557
558        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
559            attempts: 0,
560            reason: "no tokio runtime available for sending".to_string(),
561        })?;
562
563        retry_with_backoff(
564            |_attempt| {
565                tokio::task::block_in_place(|| {
566                    handle.block_on(self.bot.send_message(chat_id, payload))
567                })
568            },
569            |delay| std::thread::sleep(delay),
570        )
571    }
572
573    /// Attempt to send a document with exponential backoff retries.
574    fn send_document_with_retry(
575        &self,
576        chat_id: i64,
577        file_path: &Path,
578        caption: Option<&str>,
579    ) -> TelegramResult<i32> {
580        use crate::bot::BotApi;
581
582        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
583            attempts: 0,
584            reason: "no tokio runtime available for sending".to_string(),
585        })?;
586
587        retry_with_backoff(
588            |_attempt| {
589                tokio::task::block_in_place(|| {
590                    handle.block_on(self.bot.send_document(chat_id, file_path, caption))
591                })
592            },
593            |delay| std::thread::sleep(delay),
594        )
595    }
596
597    /// Attempt to send a photo with exponential backoff retries.
598    fn send_photo_with_retry(
599        &self,
600        chat_id: i64,
601        file_path: &Path,
602        caption: Option<&str>,
603    ) -> TelegramResult<i32> {
604        use crate::bot::BotApi;
605
606        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
607            attempts: 0,
608            reason: "no tokio runtime available for sending".to_string(),
609        })?;
610
611        retry_with_backoff(
612            |_attempt| {
613                tokio::task::block_in_place(|| {
614                    handle.block_on(self.bot.send_photo(chat_id, file_path, caption))
615                })
616            },
617            |delay| std::thread::sleep(delay),
618        )
619    }
620
621    /// Poll the events file for a `human.response` event, blocking until one
622    /// arrives or the configured timeout expires.
623    ///
624    /// Polls the given `events_path` every second for new lines containing
625    /// `"human.response"`. On response, removes the pending question and
626    /// returns the response message. On timeout, removes the pending question
627    /// and returns `None`.
628    pub fn wait_for_response(&self, events_path: &Path) -> TelegramResult<Option<String>> {
629        let timeout = Duration::from_secs(self.timeout_secs);
630        let poll_interval = Duration::from_millis(250);
631        let deadline = Instant::now() + timeout;
632
633        // Track file position to only read new lines
634        let initial_pos = if events_path.exists() {
635            std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0)
636        } else {
637            0
638        };
639        let mut file_pos = initial_pos;
640
641        info!(
642            loop_id = %self.loop_id,
643            timeout_secs = self.timeout_secs,
644            events_path = %events_path.display(),
645            "Waiting for human.response"
646        );
647
648        loop {
649            if Instant::now() >= deadline {
650                warn!(
651                    loop_id = %self.loop_id,
652                    timeout_secs = self.timeout_secs,
653                    "Timed out waiting for human.response"
654                );
655
656                // Remove pending question on timeout
657                if let Ok(mut state) = self.state_manager.load_or_default() {
658                    let _ = self
659                        .state_manager
660                        .remove_pending_question(&mut state, &self.loop_id);
661                }
662
663                return Ok(None);
664            }
665
666            // Check if we've been interrupted (Ctrl+C / SIGTERM / SIGHUP)
667            if self.shutdown.load(Ordering::Relaxed) {
668                info!(loop_id = %self.loop_id, "Interrupted while waiting for human.response");
669                if let Ok(mut state) = self.state_manager.load_or_default() {
670                    let _ = self
671                        .state_manager
672                        .remove_pending_question(&mut state, &self.loop_id);
673                }
674                return Ok(None);
675            }
676
677            // Read new lines from the events file
678            if let Some(response) = Self::check_for_response(events_path, &mut file_pos)? {
679                info!(
680                    loop_id = %self.loop_id,
681                    "Received human.response: {}",
682                    response
683                );
684
685                // Remove pending question on response
686                if let Ok(mut state) = self.state_manager.load_or_default() {
687                    let _ = self
688                        .state_manager
689                        .remove_pending_question(&mut state, &self.loop_id);
690                }
691
692                return Ok(Some(response));
693            }
694
695            std::thread::sleep(poll_interval);
696        }
697    }
698
699    /// Check the events file for a `human.response` event starting from
700    /// `file_pos`. Updates `file_pos` to the new end of file.
701    fn check_for_response(
702        events_path: &Path,
703        file_pos: &mut u64,
704    ) -> TelegramResult<Option<String>> {
705        use std::io::{BufRead, BufReader, Seek, SeekFrom};
706
707        if !events_path.exists() {
708            return Ok(None);
709        }
710
711        let mut file = std::fs::File::open(events_path)?;
712        file.seek(SeekFrom::Start(*file_pos))?;
713
714        let reader = BufReader::new(file);
715        for line in reader.lines() {
716            let line = line?;
717            let line_bytes = line.len() as u64 + 1; // +1 for newline
718            *file_pos += line_bytes;
719
720            if line.trim().is_empty() {
721                continue;
722            }
723
724            // Try to parse as JSON event
725            if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line)
726                && event.get("topic").and_then(|t| t.as_str()) == Some("human.response")
727            {
728                let message = event
729                    .get("payload")
730                    .and_then(|p| p.as_str())
731                    .unwrap_or("")
732                    .to_string();
733                return Ok(Some(message));
734            }
735
736            // Also check pipe-separated format (written by MessageHandler)
737            if line.contains("EVENT: human.response") {
738                // Extract message from pipe-separated format:
739                // EVENT: human.response | message: "..." | timestamp: "..."
740                let message = line
741                    .split('|')
742                    .find(|part| part.trim().starts_with("message:"))
743                    .and_then(|part| {
744                        let value = part.trim().strip_prefix("message:")?;
745                        let trimmed = value.trim().trim_matches('"');
746                        Some(trimmed.to_string())
747                    })
748                    .unwrap_or_default();
749                return Ok(Some(message));
750            }
751        }
752
753        Ok(None)
754    }
755}
756
757impl ralph_proto::RobotService for TelegramService {
758    fn send_question(&self, payload: &str) -> anyhow::Result<i32> {
759        Ok(TelegramService::send_question(self, payload)?)
760    }
761
762    fn wait_for_response(&self, events_path: &Path) -> anyhow::Result<Option<String>> {
763        Ok(TelegramService::wait_for_response(self, events_path)?)
764    }
765
766    fn send_checkin(
767        &self,
768        iteration: u32,
769        elapsed: Duration,
770        context: Option<&ralph_proto::CheckinContext>,
771    ) -> anyhow::Result<i32> {
772        // Convert ralph_proto::CheckinContext to the local CheckinContext
773        let local_context = context.map(|ctx| CheckinContext {
774            current_hat: ctx.current_hat.clone(),
775            open_tasks: ctx.open_tasks,
776            closed_tasks: ctx.closed_tasks,
777            cumulative_cost: ctx.cumulative_cost,
778        });
779        Ok(TelegramService::send_checkin(
780            self,
781            iteration,
782            elapsed,
783            local_context.as_ref(),
784        )?)
785    }
786
787    fn timeout_secs(&self) -> u64 {
788        self.timeout_secs
789    }
790
791    fn shutdown_flag(&self) -> Arc<AtomicBool> {
792        self.shutdown.clone()
793    }
794
795    fn stop(self: Box<Self>) {
796        TelegramService::stop(*self);
797    }
798}
799
/// Manual `Debug` implementation so the raw bot token never appears in debug
/// output: the token is printed in its masked form, and the remaining fields
/// are omitted (`finish_non_exhaustive`).
impl fmt::Debug for TelegramService {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TelegramService")
            .field("workspace_root", &self.workspace_root)
            // Masked on purpose — never print the real token.
            .field("bot_token", &self.bot_token_masked())
            .field("timeout_secs", &self.timeout_secs)
            .finish_non_exhaustive()
    }
}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813    use std::io::Write;
814    use tempfile::TempDir;
815
816    fn test_service(dir: &TempDir) -> TelegramService {
817        TelegramService::new(
818            dir.path().to_path_buf(),
819            Some("test-token-12345".to_string()),
820            None,
821            300,
822            "main".to_string(),
823        )
824        .unwrap()
825    }
826
827    #[test]
828    fn new_with_explicit_token() {
829        let dir = TempDir::new().unwrap();
830        let service = TelegramService::new(
831            dir.path().to_path_buf(),
832            Some("test-token-12345".to_string()),
833            None,
834            300,
835            "main".to_string(),
836        );
837        assert!(service.is_ok());
838    }
839
840    #[test]
841    fn new_without_token_fails() {
842        // Only run this test when the env var is not set,
843        // to avoid needing unsafe remove_var
844        if std::env::var("RALPH_TELEGRAM_BOT_TOKEN").is_ok() {
845            return;
846        }
847
848        let dir = TempDir::new().unwrap();
849        let service = TelegramService::new(
850            dir.path().to_path_buf(),
851            None,
852            None,
853            300,
854            "main".to_string(),
855        );
856        assert!(service.is_err());
857        assert!(matches!(
858            service.unwrap_err(),
859            TelegramError::MissingBotToken
860        ));
861    }
862
863    #[test]
864    fn bot_token_masked_works() {
865        let dir = TempDir::new().unwrap();
866        let service = TelegramService::new(
867            dir.path().to_path_buf(),
868            Some("abcd1234efgh5678".to_string()),
869            None,
870            300,
871            "main".to_string(),
872        )
873        .unwrap();
874        let masked = service.bot_token_masked();
875        assert_eq!(masked, "abcd...5678");
876    }
877
878    #[test]
879    fn loop_id_accessor() {
880        let dir = TempDir::new().unwrap();
881        let service = TelegramService::new(
882            dir.path().to_path_buf(),
883            Some("token".to_string()),
884            None,
885            60,
886            "feature-auth".to_string(),
887        )
888        .unwrap();
889        assert_eq!(service.loop_id(), "feature-auth");
890    }
891
892    #[test]
893    fn send_question_stores_pending_question() {
894        let dir = TempDir::new().unwrap();
895        let service = test_service(&dir);
896
897        service.send_question("Which DB to use?").unwrap();
898
899        // Verify pending question is stored
900        let state = service.state_manager().load_or_default().unwrap();
901        assert!(
902            state.pending_questions.contains_key("main"),
903            "pending question should be stored for loop_id 'main'"
904        );
905    }
906
907    #[test]
908    fn send_question_returns_message_id() {
909        let dir = TempDir::new().unwrap();
910        let service = test_service(&dir);
911
912        let msg_id = service.send_question("async or sync?").unwrap();
913        // Without a chat_id in state, message_id is 0
914        assert_eq!(msg_id, 0);
915    }
916
917    #[test]
918    fn check_for_response_json_format() {
919        let dir = TempDir::new().unwrap();
920        let events_path = dir.path().join("events.jsonl");
921
922        // Write a non-response event first
923        let mut file = std::fs::File::create(&events_path).unwrap();
924        writeln!(
925            file,
926            r#"{{"topic":"build.done","payload":"tests: pass, lint: pass, typecheck: pass, audit: pass, coverage: pass","ts":"2026-01-30T00:00:00Z"}}"#
927        )
928        .unwrap();
929        // Write a human.response event
930        writeln!(
931            file,
932            r#"{{"topic":"human.response","payload":"Use async","ts":"2026-01-30T00:01:00Z"}}"#
933        )
934        .unwrap();
935        file.flush().unwrap();
936
937        let mut pos = 0;
938        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
939        assert_eq!(result, Some("Use async".to_string()));
940    }
941
942    #[test]
943    fn check_for_response_pipe_format() {
944        let dir = TempDir::new().unwrap();
945        let events_path = dir.path().join("events.jsonl");
946
947        let mut file = std::fs::File::create(&events_path).unwrap();
948        writeln!(
949            file,
950            r#"EVENT: human.response | message: "Use sync" | timestamp: "2026-01-30T00:01:00Z""#
951        )
952        .unwrap();
953        file.flush().unwrap();
954
955        let mut pos = 0;
956        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
957        assert_eq!(result, Some("Use sync".to_string()));
958    }
959
960    #[test]
961    fn check_for_response_skips_non_response_events() {
962        let dir = TempDir::new().unwrap();
963        let events_path = dir.path().join("events.jsonl");
964
965        let mut file = std::fs::File::create(&events_path).unwrap();
966        writeln!(
967            file,
968            r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
969        )
970        .unwrap();
971        writeln!(
972            file,
973            r#"{{"topic":"human.guidance","payload":"check errors","ts":"2026-01-30T00:01:00Z"}}"#
974        )
975        .unwrap();
976        file.flush().unwrap();
977
978        let mut pos = 0;
979        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
980        assert_eq!(result, None);
981    }
982
983    #[test]
984    fn check_for_response_missing_file() {
985        let dir = TempDir::new().unwrap();
986        let events_path = dir.path().join("does-not-exist.jsonl");
987
988        let mut pos = 0;
989        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
990        assert_eq!(result, None);
991    }
992
993    #[test]
994    fn check_for_response_tracks_position() {
995        let dir = TempDir::new().unwrap();
996        let events_path = dir.path().join("events.jsonl");
997
998        // Write one event
999        let mut file = std::fs::OpenOptions::new()
1000            .create(true)
1001            .truncate(true)
1002            .write(true)
1003            .open(&events_path)
1004            .unwrap();
1005        writeln!(
1006            file,
1007            r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
1008        )
1009        .unwrap();
1010        file.flush().unwrap();
1011
1012        let mut pos = 0;
1013        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
1014        assert_eq!(result, None);
1015        assert!(pos > 0, "position should advance after reading");
1016
1017        let pos_after_first = pos;
1018
1019        // Append a human.response
1020        let mut file = std::fs::OpenOptions::new()
1021            .append(true)
1022            .open(&events_path)
1023            .unwrap();
1024        writeln!(
1025            file,
1026            r#"{{"topic":"human.response","payload":"yes","ts":"2026-01-30T00:02:00Z"}}"#
1027        )
1028        .unwrap();
1029        file.flush().unwrap();
1030
1031        // Should find the response starting from where we left off
1032        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
1033        assert_eq!(result, Some("yes".to_string()));
1034        assert!(pos > pos_after_first, "position should advance further");
1035    }
1036
1037    #[test]
1038    fn wait_for_response_returns_on_response() {
1039        let dir = TempDir::new().unwrap();
1040        let service = TelegramService::new(
1041            dir.path().to_path_buf(),
1042            Some("token".to_string()),
1043            None,
1044            5, // enough time for the writer thread
1045            "main".to_string(),
1046        )
1047        .unwrap();
1048
1049        let events_path = dir.path().join("events.jsonl");
1050        // Create an empty events file so wait_for_response records position 0
1051        std::fs::File::create(&events_path).unwrap();
1052
1053        // Store a pending question first
1054        service.send_question("Which plan?").unwrap();
1055
1056        // Spawn a thread to write the response after a brief delay
1057        let writer_path = events_path.clone();
1058        let writer = std::thread::spawn(move || {
1059            std::thread::sleep(Duration::from_millis(200));
1060            let mut file = std::fs::OpenOptions::new()
1061                .append(true)
1062                .open(&writer_path)
1063                .unwrap();
1064            writeln!(
1065                file,
1066                r#"{{"topic":"human.response","payload":"Go with plan A","ts":"2026-01-30T00:00:00Z"}}"#
1067            )
1068            .unwrap();
1069            file.flush().unwrap();
1070        });
1071
1072        let result = service.wait_for_response(&events_path).unwrap();
1073        writer.join().unwrap();
1074
1075        assert_eq!(result, Some("Go with plan A".to_string()));
1076
1077        // Pending question should be removed
1078        let state = service.state_manager().load_or_default().unwrap();
1079        assert!(
1080            !state.pending_questions.contains_key("main"),
1081            "pending question should be removed after response"
1082        );
1083    }
1084
1085    #[test]
1086    fn wait_for_response_returns_none_on_timeout() {
1087        let dir = TempDir::new().unwrap();
1088        let service = TelegramService::new(
1089            dir.path().to_path_buf(),
1090            Some("token".to_string()),
1091            None,
1092            1, // 1 second timeout
1093            "main".to_string(),
1094        )
1095        .unwrap();
1096
1097        let events_path = dir.path().join("events.jsonl");
1098        // Create an empty events file with no human.response
1099        std::fs::File::create(&events_path).unwrap();
1100
1101        // Store a pending question
1102        service.send_question("Will this timeout?").unwrap();
1103
1104        let result = service.wait_for_response(&events_path).unwrap();
1105        assert_eq!(result, None, "should return None on timeout");
1106
1107        // Pending question should be removed even on timeout
1108        let state = service.state_manager().load_or_default().unwrap();
1109        assert!(
1110            !state.pending_questions.contains_key("main"),
1111            "pending question should be removed on timeout"
1112        );
1113    }
1114
1115    #[test]
1116    fn retry_with_backoff_succeeds_on_first_attempt() {
1117        let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1118        let attempts_clone = attempts.clone();
1119
1120        let result = retry_with_backoff(
1121            |attempt| {
1122                attempts_clone.lock().unwrap().push(attempt);
1123                Ok(42)
1124            },
1125            |_delay| {},
1126        );
1127
1128        assert!(result.is_ok());
1129        assert_eq!(result.unwrap(), 42);
1130        assert_eq!(*attempts.lock().unwrap(), vec![1]);
1131    }
1132
1133    #[test]
1134    fn retry_with_backoff_succeeds_on_second_attempt() {
1135        let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1136        let attempts_clone = attempts.clone();
1137        let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1138        let delays_clone = delays.clone();
1139
1140        let result = retry_with_backoff(
1141            |attempt| {
1142                attempts_clone.lock().unwrap().push(attempt);
1143                if attempt < 2 {
1144                    Err(TelegramError::Send {
1145                        attempts: attempt,
1146                        reason: "transient failure".to_string(),
1147                    })
1148                } else {
1149                    Ok(99)
1150                }
1151            },
1152            |delay| {
1153                delays_clone.lock().unwrap().push(delay);
1154            },
1155        );
1156
1157        assert!(result.is_ok());
1158        assert_eq!(result.unwrap(), 99);
1159        assert_eq!(*attempts.lock().unwrap(), vec![1, 2]);
1160        // First retry delay: 1s * 2^0 = 1s
1161        assert_eq!(*delays.lock().unwrap(), vec![Duration::from_secs(1)]);
1162    }
1163
1164    #[test]
1165    fn retry_with_backoff_succeeds_on_third_attempt() {
1166        let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1167        let attempts_clone = attempts.clone();
1168        let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1169        let delays_clone = delays.clone();
1170
1171        let result = retry_with_backoff(
1172            |attempt| {
1173                attempts_clone.lock().unwrap().push(attempt);
1174                if attempt < 3 {
1175                    Err(TelegramError::Send {
1176                        attempts: attempt,
1177                        reason: "transient failure".to_string(),
1178                    })
1179                } else {
1180                    Ok(7)
1181                }
1182            },
1183            |delay| {
1184                delays_clone.lock().unwrap().push(delay);
1185            },
1186        );
1187
1188        assert!(result.is_ok());
1189        assert_eq!(result.unwrap(), 7);
1190        assert_eq!(*attempts.lock().unwrap(), vec![1, 2, 3]);
1191        // Delays: 1s * 2^0 = 1s, 1s * 2^1 = 2s
1192        assert_eq!(
1193            *delays.lock().unwrap(),
1194            vec![Duration::from_secs(1), Duration::from_secs(2)]
1195        );
1196    }
1197
1198    #[test]
1199    fn retry_with_backoff_fails_after_all_retries() {
1200        let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1201        let attempts_clone = attempts.clone();
1202        let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1203        let delays_clone = delays.clone();
1204
1205        let result = retry_with_backoff(
1206            |attempt| {
1207                attempts_clone.lock().unwrap().push(attempt);
1208                Err(TelegramError::Send {
1209                    attempts: attempt,
1210                    reason: format!("failure on attempt {}", attempt),
1211                })
1212            },
1213            |delay| {
1214                delays_clone.lock().unwrap().push(delay);
1215            },
1216        );
1217
1218        assert!(result.is_err());
1219        let err = result.unwrap_err();
1220        assert!(matches!(
1221            err,
1222            TelegramError::Send {
1223                attempts: 3,
1224                reason: _
1225            }
1226        ));
1227        // Should report the last error message
1228        if let TelegramError::Send { reason, .. } = &err {
1229            assert!(reason.contains("failure on attempt 3"));
1230        }
1231        assert_eq!(*attempts.lock().unwrap(), vec![1, 2, 3]);
1232        // Delays: 1s, 2s (no delay after final attempt)
1233        assert_eq!(
1234            *delays.lock().unwrap(),
1235            vec![Duration::from_secs(1), Duration::from_secs(2)]
1236        );
1237    }
1238
1239    #[test]
1240    fn retry_with_backoff_exponential_delays_are_correct() {
1241        let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1242        let delays_clone = delays.clone();
1243
1244        let _ = retry_with_backoff(
1245            |_attempt| {
1246                Err(TelegramError::Send {
1247                    attempts: 1,
1248                    reason: "always fail".to_string(),
1249                })
1250            },
1251            |delay| {
1252                delays_clone.lock().unwrap().push(delay);
1253            },
1254        );
1255
1256        let recorded = delays.lock().unwrap().clone();
1257        // Backoff: 1s * 2^0 = 1s, 1s * 2^1 = 2s (no sleep after 3rd attempt)
1258        assert_eq!(recorded.len(), 2);
1259        assert_eq!(recorded[0], Duration::from_secs(1));
1260        assert_eq!(recorded[1], Duration::from_secs(2));
1261    }
1262
1263    #[test]
1264    fn checkin_context_default() {
1265        let ctx = CheckinContext::default();
1266        assert!(ctx.current_hat.is_none());
1267        assert_eq!(ctx.open_tasks, 0);
1268        assert_eq!(ctx.closed_tasks, 0);
1269        assert!(ctx.cumulative_cost.abs() < f64::EPSILON);
1270    }
1271
1272    #[test]
1273    fn checkin_context_with_hat_and_tasks() {
1274        let ctx = CheckinContext {
1275            current_hat: Some("executor".to_string()),
1276            open_tasks: 3,
1277            closed_tasks: 5,
1278            cumulative_cost: 1.2345,
1279        };
1280        assert_eq!(ctx.current_hat.as_deref(), Some("executor"));
1281        assert_eq!(ctx.open_tasks, 3);
1282        assert_eq!(ctx.closed_tasks, 5);
1283        assert!((ctx.cumulative_cost - 1.2345).abs() < f64::EPSILON);
1284    }
1285
1286    #[test]
1287    fn wait_for_response_returns_none_on_shutdown() {
1288        let dir = TempDir::new().unwrap();
1289        let service = TelegramService::new(
1290            dir.path().to_path_buf(),
1291            Some("token".to_string()),
1292            None,
1293            60, // long timeout — shutdown flag should preempt it
1294            "main".to_string(),
1295        )
1296        .unwrap();
1297
1298        let events_path = dir.path().join("events.jsonl");
1299        std::fs::File::create(&events_path).unwrap();
1300
1301        // Set shutdown flag before calling wait_for_response
1302        service.shutdown_flag().store(true, Ordering::Relaxed);
1303
1304        let start = Instant::now();
1305        let result = service.wait_for_response(&events_path).unwrap();
1306        let elapsed = start.elapsed();
1307
1308        assert_eq!(result, None, "should return None when shutdown flag is set");
1309        assert!(
1310            elapsed < Duration::from_secs(2),
1311            "should return quickly, not wait for timeout (elapsed: {:?})",
1312            elapsed
1313        );
1314    }
1315}