Skip to main content

ralph_telegram/
service.rs

1use std::fmt;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use chrono::Utc;
8use tracing::{debug, info, warn};
9
10use crate::bot::TelegramBot;
11use crate::error::{TelegramError, TelegramResult};
12use crate::handler::MessageHandler;
13use crate::state::StateManager;
14
/// Total number of send attempts made by [`retry_with_backoff`] before it gives up.
pub const MAX_SEND_RETRIES: u32 = 3;

/// Delay slept before the first retry; every subsequent retry doubles it.
pub const BASE_RETRY_DELAY: Duration = Duration::from_millis(1_000);
20
21/// Execute a fallible send operation with exponential backoff retry.
22///
23/// Retries up to [`MAX_SEND_RETRIES`] times with delays of 1s, 2s, 4s.
24/// Returns the result on success, or `TelegramError::Send` after all
25/// retries are exhausted.
26///
27/// The `sleep_fn` parameter allows tests to substitute a no-op sleep.
28pub fn retry_with_backoff<F, S>(mut send_fn: F, mut sleep_fn: S) -> TelegramResult<i32>
29where
30    F: FnMut(u32) -> TelegramResult<i32>,
31    S: FnMut(Duration),
32{
33    let mut last_error = String::new();
34
35    for attempt in 1..=MAX_SEND_RETRIES {
36        match send_fn(attempt) {
37            Ok(msg_id) => return Ok(msg_id),
38            Err(e) => {
39                last_error = e.to_string();
40                warn!(
41                    attempt = attempt,
42                    max_retries = MAX_SEND_RETRIES,
43                    error = %last_error,
44                    "Telegram send failed, {}",
45                    if attempt < MAX_SEND_RETRIES {
46                        "retrying with backoff"
47                    } else {
48                        "all retries exhausted"
49                    }
50                );
51                if attempt < MAX_SEND_RETRIES {
52                    let delay = BASE_RETRY_DELAY * 2u32.pow(attempt - 1);
53                    sleep_fn(delay);
54                }
55            }
56        }
57    }
58
59    Err(TelegramError::Send {
60        attempts: MAX_SEND_RETRIES,
61        reason: last_error,
62    })
63}
64
/// Additional context for enhanced check-in messages.
///
/// Provides richer information than the basic iteration + elapsed time,
/// including current hat, task progress, and cost tracking. When rendered by
/// `TelegramService::send_checkin`, a `None` hat, zero open *and* closed task
/// counts, and a non-positive cost are left out of the message.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct CheckinContext {
    /// The currently active hat name (e.g., "executor", "reviewer").
    pub current_hat: Option<String>,
    /// Number of open (non-terminal) tasks.
    pub open_tasks: usize,
    /// Number of closed tasks.
    pub closed_tasks: usize,
    /// Cumulative cost in USD.
    pub cumulative_cost: f64,
}
80
81/// Coordinates the Telegram bot lifecycle with the Ralph event loop.
82///
83/// Manages startup, shutdown, message sending, and response waiting.
84/// Uses the host tokio runtime (from `#[tokio::main]`) for async operations.
85pub struct TelegramService {
86    workspace_root: PathBuf,
87    bot_token: String,
88    timeout_secs: u64,
89    loop_id: String,
90    state_manager: StateManager,
91    handler: MessageHandler,
92    bot: TelegramBot,
93    shutdown: Arc<AtomicBool>,
94}
95
96impl TelegramService {
97    /// Create a new TelegramService.
98    ///
99    /// Resolves the bot token from config or `RALPH_TELEGRAM_BOT_TOKEN` env var.
100    pub fn new(
101        workspace_root: PathBuf,
102        bot_token: Option<String>,
103        timeout_secs: u64,
104        loop_id: String,
105    ) -> TelegramResult<Self> {
106        let resolved_token = bot_token
107            .or_else(|| std::env::var("RALPH_TELEGRAM_BOT_TOKEN").ok())
108            .ok_or(TelegramError::MissingBotToken)?;
109
110        let state_path = workspace_root.join(".ralph/telegram-state.json");
111        let state_manager = StateManager::new(&state_path);
112        let handler_state_manager = StateManager::new(&state_path);
113        let handler = MessageHandler::new(handler_state_manager, &workspace_root);
114        let bot = TelegramBot::new(&resolved_token);
115        let shutdown = Arc::new(AtomicBool::new(false));
116
117        Ok(Self {
118            workspace_root,
119            bot_token: resolved_token,
120            timeout_secs,
121            loop_id,
122            state_manager,
123            handler,
124            bot,
125            shutdown,
126        })
127    }
128
129    /// Get a reference to the workspace root.
130    pub fn workspace_root(&self) -> &PathBuf {
131        &self.workspace_root
132    }
133
134    /// Get the configured timeout in seconds.
135    pub fn timeout_secs(&self) -> u64 {
136        self.timeout_secs
137    }
138
139    /// Get a reference to the bot token (masked for logging).
140    pub fn bot_token_masked(&self) -> String {
141        if self.bot_token.len() > 8 {
142            format!(
143                "{}...{}",
144                &self.bot_token[..4],
145                &self.bot_token[self.bot_token.len() - 4..]
146            )
147        } else {
148            "****".to_string()
149        }
150    }
151
152    /// Get a reference to the state manager.
153    pub fn state_manager(&self) -> &StateManager {
154        &self.state_manager
155    }
156
157    /// Get a mutable reference to the message handler.
158    pub fn handler(&mut self) -> &mut MessageHandler {
159        &mut self.handler
160    }
161
162    /// Get the loop ID this service is associated with.
163    pub fn loop_id(&self) -> &str {
164        &self.loop_id
165    }
166
167    /// Returns a clone of the shutdown flag.
168    ///
169    /// Signal handlers can set this flag to interrupt `wait_for_response()`
170    /// without waiting for the full timeout.
171    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
172        self.shutdown.clone()
173    }
174
175    /// Start the Telegram service.
176    ///
177    /// Spawns a background polling task on the host tokio runtime to receive
178    /// incoming messages. Must be called from within a tokio runtime context.
179    pub fn start(&self) -> TelegramResult<()> {
180        info!(
181            bot_token = %self.bot_token_masked(),
182            workspace = %self.workspace_root.display(),
183            timeout_secs = self.timeout_secs,
184            "Telegram service starting"
185        );
186
187        // Spawn the polling task on the host tokio runtime
188        let handle = tokio::runtime::Handle::try_current().map_err(|_| {
189            TelegramError::Startup("no tokio runtime available for polling".to_string())
190        })?;
191
192        let raw_bot = teloxide::Bot::new(&self.bot_token);
193        let workspace_root = self.workspace_root.clone();
194        let state_path = self.workspace_root.join(".ralph/telegram-state.json");
195        let shutdown = self.shutdown.clone();
196        let loop_id = self.loop_id.clone();
197
198        handle.spawn(async move {
199            Self::poll_updates(raw_bot, workspace_root, state_path, shutdown, loop_id).await;
200        });
201
202        // Send greeting if we already know the chat ID
203        if let Ok(state) = self.state_manager.load_or_default()
204            && let Some(chat_id) = state.chat_id
205        {
206            let greeting = crate::bot::TelegramBot::format_greeting(&self.loop_id);
207            match self.send_with_retry(chat_id, &greeting) {
208                Ok(_) => info!("Sent greeting to chat {}", chat_id),
209                Err(e) => warn!(error = %e, "Failed to send greeting"),
210            }
211        }
212
213        info!("Telegram service started — polling for incoming messages");
214        Ok(())
215    }
216
217    /// Background polling task that receives incoming Telegram messages.
218    ///
219    /// Uses long polling (`getUpdates`) to receive messages, then routes them
220    /// through `MessageHandler` to write events to the correct loop's JSONL.
221    async fn poll_updates(
222        bot: teloxide::Bot,
223        workspace_root: PathBuf,
224        state_path: PathBuf,
225        shutdown: Arc<AtomicBool>,
226        loop_id: String,
227    ) {
228        use teloxide::payloads::{GetUpdatesSetters, SetMessageReactionSetters};
229        use teloxide::requests::Requester;
230
231        let state_manager = StateManager::new(&state_path);
232        let handler_state_manager = StateManager::new(&state_path);
233        let handler = MessageHandler::new(handler_state_manager, &workspace_root);
234        let mut offset: i32 = 0;
235
236        if let Ok(state) = state_manager.load_or_default()
237            && let Some(last_update_id) = state.last_update_id
238        {
239            offset = last_update_id + 1;
240        }
241
242        // Register bot commands with Telegram API
243        Self::register_commands(&bot).await;
244
245        info!(loop_id = %loop_id, "Telegram polling task started");
246
247        while !shutdown.load(Ordering::Relaxed) {
248            let request = bot.get_updates().offset(offset).timeout(10);
249            match request.await {
250                Ok(updates) => {
251                    for update in updates {
252                        // Next offset = current update ID + 1
253                        #[allow(clippy::cast_possible_wrap)]
254                        {
255                            offset = update.id.0 as i32 + 1;
256                        }
257
258                        // Extract message from update kind
259                        let msg = match update.kind {
260                            teloxide::types::UpdateKind::Message(msg) => msg,
261                            _ => continue,
262                        };
263
264                        let text = match msg.text() {
265                            Some(t) => t,
266                            None => continue,
267                        };
268
269                        let chat_id = msg.chat.id.0;
270                        let reply_to: Option<i32> = msg.reply_to_message().map(|r| r.id.0);
271
272                        info!(
273                            chat_id = chat_id,
274                            text = %text,
275                            "Received Telegram message"
276                        );
277
278                        // Handle bot commands before routing to handler
279                        if crate::commands::is_command(text)
280                            && let Some(response) =
281                                crate::commands::handle_command(text, &workspace_root)
282                        {
283                            use teloxide::payloads::SendMessageSetters;
284                            let send_result = bot
285                                .send_message(teloxide::types::ChatId(chat_id), &response)
286                                .parse_mode(teloxide::types::ParseMode::Html)
287                                .await;
288                            if let Err(e) = send_result {
289                                warn!(error = %e, "Failed to send command response");
290                            }
291                            continue;
292                        }
293
294                        let mut state = match state_manager.load_or_default() {
295                            Ok(s) => s,
296                            Err(e) => {
297                                warn!(error = %e, "Failed to load Telegram state");
298                                continue;
299                            }
300                        };
301
302                        match handler.handle_message(&mut state, text, chat_id, reply_to) {
303                            Ok(topic) => {
304                                let emoji = if topic == "human.response" {
305                                    "👍"
306                                } else {
307                                    "👀"
308                                };
309                                let react_result = bot
310                                    .set_message_reaction(teloxide::types::ChatId(chat_id), msg.id)
311                                    .reaction(vec![teloxide::types::ReactionType::Emoji {
312                                        emoji: emoji.to_string(),
313                                    }])
314                                    .await;
315                                if let Err(e) = react_result {
316                                    warn!(error = %e, "Failed to react to message");
317                                }
318
319                                // For guidance, also send a short text reply
320                                if topic == "human.guidance" {
321                                    let _ = bot
322                                        .send_message(
323                                            teloxide::types::ChatId(chat_id),
324                                            "📝 <b>Guidance received</b> — will apply next iteration.",
325                                        )
326                                        .await;
327                                }
328                            }
329                            Err(e) => {
330                                warn!(
331                                    error = %e,
332                                    text = %text,
333                                    "Failed to handle incoming Telegram message"
334                                );
335                            }
336                        }
337
338                        state.last_seen = Some(Utc::now());
339                        state.last_update_id = Some(offset.saturating_sub(1));
340                        if let Err(e) = state_manager.save(&state) {
341                            warn!(error = %e, "Failed to persist Telegram state");
342                        }
343                    }
344                }
345                Err(e) => {
346                    if !shutdown.load(Ordering::Relaxed) {
347                        warn!(error = %e, "Telegram polling error — retrying in 5s");
348                        tokio::time::sleep(Duration::from_secs(5)).await;
349                    }
350                }
351            }
352        }
353
354        info!(loop_id = %loop_id, "Telegram polling task stopped");
355    }
356
357    /// Register bot commands with the Telegram API so they appear in the menu.
358    async fn register_commands(bot: &teloxide::Bot) {
359        use teloxide::requests::Requester;
360        use teloxide::types::BotCommand;
361
362        let commands = vec![
363            BotCommand::new("status", "Current loop status"),
364            BotCommand::new("tasks", "Open tasks"),
365            BotCommand::new("memories", "Recent memories"),
366            BotCommand::new("tail", "Last 20 events"),
367            BotCommand::new("stop", "Stop the loop"),
368            BotCommand::new("help", "List available commands"),
369        ];
370
371        match bot.set_my_commands(commands).await {
372            Ok(_) => info!("Registered bot commands with Telegram API"),
373            Err(e) => warn!(error = %e, "Failed to register bot commands"),
374        }
375    }
376
377    /// Stop the Telegram service gracefully.
378    ///
379    /// Signals the background polling task to shut down.
380    pub fn stop(self) {
381        // Send farewell if we know the chat ID
382        if let Ok(state) = self.state_manager.load_or_default()
383            && let Some(chat_id) = state.chat_id
384        {
385            let farewell = crate::bot::TelegramBot::format_farewell(&self.loop_id);
386            match self.send_with_retry(chat_id, &farewell) {
387                Ok(_) => info!("Sent farewell to chat {}", chat_id),
388                Err(e) => warn!(error = %e, "Failed to send farewell"),
389            }
390        }
391
392        self.shutdown.store(true, Ordering::Relaxed);
393        info!(
394            workspace = %self.workspace_root.display(),
395            "Telegram service stopped"
396        );
397    }
398
399    /// Send a question to the human via Telegram and store it as a pending question.
400    ///
401    /// The question payload is extracted from the `human.interact` event. A pending
402    /// question is stored in the state manager so that incoming replies can be
403    /// routed back to the correct loop.
404    ///
405    /// On send failure, retries up to 3 times with exponential backoff (1s, 2s, 4s).
406    /// Returns the message ID of the sent Telegram message, or 0 if no chat ID
407    /// is configured (question is logged but not sent).
408    pub fn send_question(&self, payload: &str) -> TelegramResult<i32> {
409        let mut state = self.state_manager.load_or_default()?;
410
411        let message_id = if let Some(chat_id) = state.chat_id {
412            self.send_with_retry(chat_id, payload)?
413        } else {
414            warn!(
415                loop_id = %self.loop_id,
416                "No chat ID configured — human.interact question logged but not sent: {}",
417                payload
418            );
419            0
420        };
421
422        self.state_manager
423            .add_pending_question(&mut state, &self.loop_id, message_id)?;
424
425        debug!(
426            loop_id = %self.loop_id,
427            message_id = message_id,
428            "Stored pending question"
429        );
430
431        Ok(message_id)
432    }
433
434    /// Send a periodic check-in message via Telegram.
435    ///
436    /// Loads the chat ID from state and sends a short status update so the
437    /// human knows the loop is still running. Skips silently if no chat ID
438    /// is configured. Returns `Ok(0)` when skipped, or the message ID on
439    /// success.
440    ///
441    /// When a [`CheckinContext`] is provided, the message includes richer
442    /// details: current hat, task progress, and cumulative cost.
443    pub fn send_checkin(
444        &self,
445        iteration: u32,
446        elapsed: Duration,
447        context: Option<&CheckinContext>,
448    ) -> TelegramResult<i32> {
449        let state = self.state_manager.load_or_default()?;
450        let Some(chat_id) = state.chat_id else {
451            debug!(
452                loop_id = %self.loop_id,
453                "No chat ID configured — skipping check-in"
454            );
455            return Ok(0);
456        };
457
458        let elapsed_secs = elapsed.as_secs();
459        let minutes = elapsed_secs / 60;
460        let seconds = elapsed_secs % 60;
461        let elapsed_str = if minutes > 0 {
462            format!("{}m {}s", minutes, seconds)
463        } else {
464            format!("{}s", seconds)
465        };
466
467        let msg = match context {
468            Some(ctx) => {
469                let mut lines = vec![format!(
470                    "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
471                    iteration, elapsed_str
472                )];
473
474                if let Some(hat) = &ctx.current_hat {
475                    lines.push(format!(
476                        "Hat: <code>{}</code>",
477                        crate::bot::escape_html(hat)
478                    ));
479                }
480
481                if ctx.open_tasks > 0 || ctx.closed_tasks > 0 {
482                    lines.push(format!(
483                        "Tasks: <b>{}</b> open, {} closed",
484                        ctx.open_tasks, ctx.closed_tasks
485                    ));
486                }
487
488                if ctx.cumulative_cost > 0.0 {
489                    lines.push(format!("Cost: <code>${:.4}</code>", ctx.cumulative_cost));
490                }
491
492                lines.join("\n")
493            }
494            None => format!(
495                "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
496                iteration, elapsed_str
497            ),
498        };
499        self.send_with_retry(chat_id, &msg)
500    }
501
502    /// Send a document (file) to the human via Telegram.
503    ///
504    /// Loads the chat ID from state and sends the file at `file_path` with an
505    /// optional caption. Returns `Ok(0)` if no chat ID is configured.
506    pub fn send_document(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
507        let state = self.state_manager.load_or_default()?;
508        let Some(chat_id) = state.chat_id else {
509            warn!(
510                loop_id = %self.loop_id,
511                file = %file_path.display(),
512                "No chat ID configured — document not sent"
513            );
514            return Ok(0);
515        };
516
517        self.send_document_with_retry(chat_id, file_path, caption)
518    }
519
520    /// Send a photo to the human via Telegram.
521    ///
522    /// Loads the chat ID from state and sends the image at `file_path` with an
523    /// optional caption. Returns `Ok(0)` if no chat ID is configured.
524    pub fn send_photo(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
525        let state = self.state_manager.load_or_default()?;
526        let Some(chat_id) = state.chat_id else {
527            warn!(
528                loop_id = %self.loop_id,
529                file = %file_path.display(),
530                "No chat ID configured — photo not sent"
531            );
532            return Ok(0);
533        };
534
535        self.send_photo_with_retry(chat_id, file_path, caption)
536    }
537
538    /// Attempt to send a message with exponential backoff retries.
539    ///
540    /// Uses the host tokio runtime via `block_in_place` + `Handle::block_on`
541    /// to bridge the sync event loop to the async BotApi.
542    fn send_with_retry(&self, chat_id: i64, payload: &str) -> TelegramResult<i32> {
543        use crate::bot::BotApi;
544
545        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
546            attempts: 0,
547            reason: "no tokio runtime available for sending".to_string(),
548        })?;
549
550        retry_with_backoff(
551            |_attempt| {
552                tokio::task::block_in_place(|| {
553                    handle.block_on(self.bot.send_message(chat_id, payload))
554                })
555            },
556            |delay| std::thread::sleep(delay),
557        )
558    }
559
560    /// Attempt to send a document with exponential backoff retries.
561    fn send_document_with_retry(
562        &self,
563        chat_id: i64,
564        file_path: &Path,
565        caption: Option<&str>,
566    ) -> TelegramResult<i32> {
567        use crate::bot::BotApi;
568
569        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
570            attempts: 0,
571            reason: "no tokio runtime available for sending".to_string(),
572        })?;
573
574        retry_with_backoff(
575            |_attempt| {
576                tokio::task::block_in_place(|| {
577                    handle.block_on(self.bot.send_document(chat_id, file_path, caption))
578                })
579            },
580            |delay| std::thread::sleep(delay),
581        )
582    }
583
584    /// Attempt to send a photo with exponential backoff retries.
585    fn send_photo_with_retry(
586        &self,
587        chat_id: i64,
588        file_path: &Path,
589        caption: Option<&str>,
590    ) -> TelegramResult<i32> {
591        use crate::bot::BotApi;
592
593        let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
594            attempts: 0,
595            reason: "no tokio runtime available for sending".to_string(),
596        })?;
597
598        retry_with_backoff(
599            |_attempt| {
600                tokio::task::block_in_place(|| {
601                    handle.block_on(self.bot.send_photo(chat_id, file_path, caption))
602                })
603            },
604            |delay| std::thread::sleep(delay),
605        )
606    }
607
608    /// Poll the events file for a `human.response` event, blocking until one
609    /// arrives or the configured timeout expires.
610    ///
611    /// Polls the given `events_path` every second for new lines containing
612    /// `"human.response"`. On response, removes the pending question and
613    /// returns the response message. On timeout, removes the pending question
614    /// and returns `None`.
615    pub fn wait_for_response(&self, events_path: &Path) -> TelegramResult<Option<String>> {
616        let timeout = Duration::from_secs(self.timeout_secs);
617        let poll_interval = Duration::from_millis(250);
618        let deadline = Instant::now() + timeout;
619
620        // Track file position to only read new lines
621        let initial_pos = if events_path.exists() {
622            std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0)
623        } else {
624            0
625        };
626        let mut file_pos = initial_pos;
627
628        info!(
629            loop_id = %self.loop_id,
630            timeout_secs = self.timeout_secs,
631            events_path = %events_path.display(),
632            "Waiting for human.response"
633        );
634
635        loop {
636            if Instant::now() >= deadline {
637                warn!(
638                    loop_id = %self.loop_id,
639                    timeout_secs = self.timeout_secs,
640                    "Timed out waiting for human.response"
641                );
642
643                // Remove pending question on timeout
644                if let Ok(mut state) = self.state_manager.load_or_default() {
645                    let _ = self
646                        .state_manager
647                        .remove_pending_question(&mut state, &self.loop_id);
648                }
649
650                return Ok(None);
651            }
652
653            // Check if we've been interrupted (Ctrl+C / SIGTERM / SIGHUP)
654            if self.shutdown.load(Ordering::Relaxed) {
655                info!(loop_id = %self.loop_id, "Interrupted while waiting for human.response");
656                if let Ok(mut state) = self.state_manager.load_or_default() {
657                    let _ = self
658                        .state_manager
659                        .remove_pending_question(&mut state, &self.loop_id);
660                }
661                return Ok(None);
662            }
663
664            // Read new lines from the events file
665            if let Some(response) = Self::check_for_response(events_path, &mut file_pos)? {
666                info!(
667                    loop_id = %self.loop_id,
668                    "Received human.response: {}",
669                    response
670                );
671
672                // Remove pending question on response
673                if let Ok(mut state) = self.state_manager.load_or_default() {
674                    let _ = self
675                        .state_manager
676                        .remove_pending_question(&mut state, &self.loop_id);
677                }
678
679                return Ok(Some(response));
680            }
681
682            std::thread::sleep(poll_interval);
683        }
684    }
685
686    /// Check the events file for a `human.response` event starting from
687    /// `file_pos`. Updates `file_pos` to the new end of file.
688    fn check_for_response(
689        events_path: &Path,
690        file_pos: &mut u64,
691    ) -> TelegramResult<Option<String>> {
692        use std::io::{BufRead, BufReader, Seek, SeekFrom};
693
694        if !events_path.exists() {
695            return Ok(None);
696        }
697
698        let mut file = std::fs::File::open(events_path)?;
699        file.seek(SeekFrom::Start(*file_pos))?;
700
701        let reader = BufReader::new(file);
702        for line in reader.lines() {
703            let line = line?;
704            let line_bytes = line.len() as u64 + 1; // +1 for newline
705            *file_pos += line_bytes;
706
707            if line.trim().is_empty() {
708                continue;
709            }
710
711            // Try to parse as JSON event
712            if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line)
713                && event.get("topic").and_then(|t| t.as_str()) == Some("human.response")
714            {
715                let message = event
716                    .get("payload")
717                    .and_then(|p| p.as_str())
718                    .unwrap_or("")
719                    .to_string();
720                return Ok(Some(message));
721            }
722
723            // Also check pipe-separated format (written by MessageHandler)
724            if line.contains("EVENT: human.response") {
725                // Extract message from pipe-separated format:
726                // EVENT: human.response | message: "..." | timestamp: "..."
727                let message = line
728                    .split('|')
729                    .find(|part| part.trim().starts_with("message:"))
730                    .and_then(|part| {
731                        let value = part.trim().strip_prefix("message:")?;
732                        let trimmed = value.trim().trim_matches('"');
733                        Some(trimmed.to_string())
734                    })
735                    .unwrap_or_default();
736                return Ok(Some(message));
737            }
738        }
739
740        Ok(None)
741    }
742}
743
744impl ralph_proto::RobotService for TelegramService {
745    fn send_question(&self, payload: &str) -> anyhow::Result<i32> {
746        Ok(TelegramService::send_question(self, payload)?)
747    }
748
749    fn wait_for_response(&self, events_path: &Path) -> anyhow::Result<Option<String>> {
750        Ok(TelegramService::wait_for_response(self, events_path)?)
751    }
752
753    fn send_checkin(
754        &self,
755        iteration: u32,
756        elapsed: Duration,
757        context: Option<&ralph_proto::CheckinContext>,
758    ) -> anyhow::Result<i32> {
759        // Convert ralph_proto::CheckinContext to the local CheckinContext
760        let local_context = context.map(|ctx| CheckinContext {
761            current_hat: ctx.current_hat.clone(),
762            open_tasks: ctx.open_tasks,
763            closed_tasks: ctx.closed_tasks,
764            cumulative_cost: ctx.cumulative_cost,
765        });
766        Ok(TelegramService::send_checkin(
767            self,
768            iteration,
769            elapsed,
770            local_context.as_ref(),
771        )?)
772    }
773
774    fn timeout_secs(&self) -> u64 {
775        self.timeout_secs
776    }
777
778    fn shutdown_flag(&self) -> Arc<AtomicBool> {
779        self.shutdown.clone()
780    }
781
782    fn stop(self: Box<Self>) {
783        TelegramService::stop(*self);
784    }
785}
786
787impl fmt::Debug for TelegramService {
788    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
789        f.debug_struct("TelegramService")
790            .field("workspace_root", &self.workspace_root)
791            .field("bot_token", &self.bot_token_masked())
792            .field("timeout_secs", &self.timeout_secs)
793            .finish_non_exhaustive()
794    }
795}
796
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// Builds a service rooted at `dir` with a dummy bot token, a 300-second
    /// timeout, and loop id `"main"`. Panics if construction fails.
    fn test_service(dir: &TempDir) -> TelegramService {
        TelegramService::new(
            dir.path().to_path_buf(),
            Some("test-token-12345".to_string()),
            300,
            "main".to_string(),
        )
        .unwrap()
    }

    #[test]
    fn new_with_explicit_token() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("test-token-12345".to_string()),
            300,
            "main".to_string(),
        );
        assert!(service.is_ok());
    }

    #[test]
    fn new_without_token_fails() {
        // Only run this test when the env var is not set,
        // to avoid needing unsafe remove_var
        if std::env::var("RALPH_TELEGRAM_BOT_TOKEN").is_ok() {
            return;
        }

        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(dir.path().to_path_buf(), None, 300, "main".to_string());
        assert!(
            matches!(service, Err(TelegramError::MissingBotToken)),
            "constructing without any token should fail with MissingBotToken"
        );
    }

    #[test]
    fn bot_token_masked_works() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("abcd1234efgh5678".to_string()),
            300,
            "main".to_string(),
        )
        .unwrap();
        // Only the first and last four characters of the token remain visible.
        assert_eq!(service.bot_token_masked(), "abcd...5678");
    }

    #[test]
    fn loop_id_accessor() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("token".to_string()),
            60,
            "feature-auth".to_string(),
        )
        .unwrap();
        assert_eq!(service.loop_id(), "feature-auth");
    }

    #[test]
    fn send_question_stores_pending_question() {
        let dir = TempDir::new().unwrap();
        let service = test_service(&dir);

        service.send_question("Which DB to use?").unwrap();

        // Verify pending question is stored
        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            state.pending_questions.contains_key("main"),
            "pending question should be stored for loop_id 'main'"
        );
    }

    #[test]
    fn send_question_returns_message_id() {
        let dir = TempDir::new().unwrap();
        let service = test_service(&dir);

        let msg_id = service.send_question("async or sync?").unwrap();
        // Without a chat_id in state, message_id is 0
        assert_eq!(msg_id, 0);
    }

    #[test]
    fn check_for_response_json_format() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");

        // Write a non-response event first
        let mut file = std::fs::File::create(&events_path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","payload":"tests: pass, lint: pass, typecheck: pass, audit: pass, coverage: pass","ts":"2026-01-30T00:00:00Z"}}"#
        )
        .unwrap();
        // Write a human.response event
        writeln!(
            file,
            r#"{{"topic":"human.response","payload":"Use async","ts":"2026-01-30T00:01:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        // The JSON form carries the answer in `payload`; the earlier
        // build.done line must be skipped.
        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("Use async".to_string()));
    }

    #[test]
    fn check_for_response_pipe_format() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");

        let mut file = std::fs::File::create(&events_path).unwrap();
        writeln!(
            file,
            r#"EVENT: human.response | message: "Use sync" | timestamp: "2026-01-30T00:01:00Z""#
        )
        .unwrap();
        file.flush().unwrap();

        // The pipe-delimited form carries the answer in the quoted `message` field.
        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("Use sync".to_string()));
    }

    #[test]
    fn check_for_response_skips_non_response_events() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");

        let mut file = std::fs::File::create(&events_path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"human.guidance","payload":"check errors","ts":"2026-01-30T00:01:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        // human.guidance is a different topic and must not count as a response.
        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn check_for_response_missing_file() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("does-not-exist.jsonl");

        // A missing events file is "no response yet", not an error.
        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn check_for_response_tracks_position() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");

        // Write one event
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&events_path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
        assert!(pos > 0, "position should advance after reading");

        let pos_after_first = pos;

        // Append a human.response
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&events_path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"human.response","payload":"yes","ts":"2026-01-30T00:02:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        // Should find the response starting from where we left off
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("yes".to_string()));
        assert!(pos > pos_after_first, "position should advance further");
    }

    #[test]
    fn wait_for_response_returns_on_response() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("token".to_string()),
            5, // enough time for the writer thread
            "main".to_string(),
        )
        .unwrap();

        let events_path = dir.path().join("events.jsonl");
        // Create an empty events file so wait_for_response records position 0
        std::fs::File::create(&events_path).unwrap();

        // Store a pending question first
        service.send_question("Which plan?").unwrap();

        // Spawn a thread to write the response after a brief delay
        let writer_path = events_path.clone();
        let writer = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(200));
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&writer_path)
                .unwrap();
            writeln!(
                file,
                r#"{{"topic":"human.response","payload":"Go with plan A","ts":"2026-01-30T00:00:00Z"}}"#
            )
            .unwrap();
            file.flush().unwrap();
        });

        let result = service.wait_for_response(&events_path).unwrap();
        writer.join().unwrap();

        assert_eq!(result, Some("Go with plan A".to_string()));

        // Pending question should be removed
        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            !state.pending_questions.contains_key("main"),
            "pending question should be removed after response"
        );
    }

    #[test]
    fn wait_for_response_returns_none_on_timeout() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("token".to_string()),
            1, // 1 second timeout
            "main".to_string(),
        )
        .unwrap();

        let events_path = dir.path().join("events.jsonl");
        // Create an empty events file with no human.response
        std::fs::File::create(&events_path).unwrap();

        // Store a pending question
        service.send_question("Will this timeout?").unwrap();

        let result = service.wait_for_response(&events_path).unwrap();
        assert_eq!(result, None, "should return None on timeout");

        // Pending question should be removed even on timeout
        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            !state.pending_questions.contains_key("main"),
            "pending question should be removed on timeout"
        );
    }

    // The retry tests record attempts and delays in plain `Vec`s.
    // `retry_with_backoff` calls its closures synchronously on this thread,
    // and each closure mutably borrows a different local, so no `Arc`/`Mutex`
    // is needed. The borrows end when the call returns.

    #[test]
    fn retry_with_backoff_succeeds_on_first_attempt() {
        let mut attempts_seen = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts_seen.push(attempt);
                Ok(42)
            },
            |_delay| {},
        );

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempts_seen, vec![1]);
    }

    #[test]
    fn retry_with_backoff_succeeds_on_second_attempt() {
        let mut attempts_seen = Vec::new();
        let mut delays_seen = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts_seen.push(attempt);
                if attempt < 2 {
                    Err(TelegramError::Send {
                        attempts: attempt,
                        reason: "transient failure".to_string(),
                    })
                } else {
                    Ok(99)
                }
            },
            |delay| delays_seen.push(delay),
        );

        assert_eq!(result.unwrap(), 99);
        assert_eq!(attempts_seen, vec![1, 2]);
        // First retry delay: 1s * 2^0 = 1s
        assert_eq!(delays_seen, vec![Duration::from_secs(1)]);
    }

    #[test]
    fn retry_with_backoff_succeeds_on_third_attempt() {
        let mut attempts_seen = Vec::new();
        let mut delays_seen = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts_seen.push(attempt);
                if attempt < 3 {
                    Err(TelegramError::Send {
                        attempts: attempt,
                        reason: "transient failure".to_string(),
                    })
                } else {
                    Ok(7)
                }
            },
            |delay| delays_seen.push(delay),
        );

        assert_eq!(result.unwrap(), 7);
        assert_eq!(attempts_seen, vec![1, 2, 3]);
        // Delays: 1s * 2^0 = 1s, 1s * 2^1 = 2s
        assert_eq!(
            delays_seen,
            vec![Duration::from_secs(1), Duration::from_secs(2)]
        );
    }

    #[test]
    fn retry_with_backoff_fails_after_all_retries() {
        let mut attempts_seen = Vec::new();
        let mut delays_seen = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts_seen.push(attempt);
                Err(TelegramError::Send {
                    attempts: attempt,
                    reason: format!("failure on attempt {}", attempt),
                })
            },
            |delay| delays_seen.push(delay),
        );

        // Exhausting the retries must surface a `Send` error that reports the
        // final attempt count and carries the last underlying failure. Any
        // other outcome fails loudly instead of being silently skipped.
        match result {
            Err(TelegramError::Send { attempts, reason }) => {
                assert_eq!(attempts, 3);
                assert!(
                    reason.contains("failure on attempt 3"),
                    "reason should mention the last failure, got: {}",
                    reason
                );
            }
            other => panic!("expected TelegramError::Send, got {:?}", other),
        }
        assert_eq!(attempts_seen, vec![1, 2, 3]);
        // Delays: 1s, 2s (no delay after final attempt)
        assert_eq!(
            delays_seen,
            vec![Duration::from_secs(1), Duration::from_secs(2)]
        );
    }

    #[test]
    fn retry_with_backoff_exponential_delays_are_correct() {
        let mut delays_seen = Vec::new();

        let _ = retry_with_backoff(
            |_attempt| {
                Err(TelegramError::Send {
                    attempts: 1,
                    reason: "always fail".to_string(),
                })
            },
            |delay| delays_seen.push(delay),
        );

        // Backoff: 1s * 2^0 = 1s, 1s * 2^1 = 2s (no sleep after 3rd attempt)
        assert_eq!(
            delays_seen,
            vec![Duration::from_secs(1), Duration::from_secs(2)]
        );
    }

    #[test]
    fn checkin_context_default() {
        let ctx = CheckinContext::default();
        assert!(ctx.current_hat.is_none());
        assert_eq!(ctx.open_tasks, 0);
        assert_eq!(ctx.closed_tasks, 0);
        assert!(ctx.cumulative_cost.abs() < f64::EPSILON);
    }

    #[test]
    fn checkin_context_with_hat_and_tasks() {
        let ctx = CheckinContext {
            current_hat: Some("executor".to_string()),
            open_tasks: 3,
            closed_tasks: 5,
            cumulative_cost: 1.2345,
        };
        assert_eq!(ctx.current_hat.as_deref(), Some("executor"));
        assert_eq!(ctx.open_tasks, 3);
        assert_eq!(ctx.closed_tasks, 5);
        assert!((ctx.cumulative_cost - 1.2345).abs() < f64::EPSILON);
    }

    #[test]
    fn wait_for_response_returns_none_on_shutdown() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("token".to_string()),
            60, // long timeout — shutdown flag should preempt it
            "main".to_string(),
        )
        .unwrap();

        let events_path = dir.path().join("events.jsonl");
        std::fs::File::create(&events_path).unwrap();

        // Set shutdown flag before calling wait_for_response
        service.shutdown_flag().store(true, Ordering::Relaxed);

        let start = Instant::now();
        let result = service.wait_for_response(&events_path).unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, None, "should return None when shutdown flag is set");
        assert!(
            elapsed < Duration::from_secs(2),
            "should return quickly, not wait for timeout (elapsed: {:?})",
            elapsed
        );
    }
}