1use std::fmt;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use chrono::Utc;
8use tracing::{debug, info, warn};
9
10use crate::bot::TelegramBot;
11use crate::error::{TelegramError, TelegramResult};
12use crate::handler::MessageHandler;
13use crate::state::StateManager;
14
/// Maximum number of send attempts made by [`retry_with_backoff`] before it gives up.
pub const MAX_SEND_RETRIES: u32 = 3;

/// Delay used after the first failed attempt; it is doubled after each further failure.
pub const BASE_RETRY_DELAY: Duration = Duration::from_secs(1);
20
21pub fn retry_with_backoff<F, S>(mut send_fn: F, mut sleep_fn: S) -> TelegramResult<i32>
29where
30 F: FnMut(u32) -> TelegramResult<i32>,
31 S: FnMut(Duration),
32{
33 let mut last_error = String::new();
34
35 for attempt in 1..=MAX_SEND_RETRIES {
36 match send_fn(attempt) {
37 Ok(msg_id) => return Ok(msg_id),
38 Err(e) => {
39 last_error = e.to_string();
40 warn!(
41 attempt = attempt,
42 max_retries = MAX_SEND_RETRIES,
43 error = %last_error,
44 "Telegram send failed, {}",
45 if attempt < MAX_SEND_RETRIES {
46 "retrying with backoff"
47 } else {
48 "all retries exhausted"
49 }
50 );
51 if attempt < MAX_SEND_RETRIES {
52 let delay = BASE_RETRY_DELAY * 2u32.pow(attempt - 1);
53 sleep_fn(delay);
54 }
55 }
56 }
57 }
58
59 Err(TelegramError::Send {
60 attempts: MAX_SEND_RETRIES,
61 reason: last_error,
62 })
63}
64
/// Optional details appended to a periodic check-in message.
///
/// Every field is optional in effect: `send_checkin` only renders a line for a field
/// when it carries a non-default value.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CheckinContext {
    /// Name of the currently active hat, if any.
    pub current_hat: Option<String>,
    /// Number of tasks still open.
    pub open_tasks: usize,
    /// Number of tasks already closed.
    pub closed_tasks: usize,
    /// Cost accumulated so far; rendered with a `$` prefix in check-ins.
    pub cumulative_cost: f64,
}
80
81pub struct TelegramService {
86 workspace_root: PathBuf,
87 bot_token: String,
88 api_url: Option<String>,
89 timeout_secs: u64,
90 loop_id: String,
91 state_manager: StateManager,
92 handler: MessageHandler,
93 bot: TelegramBot,
94 shutdown: Arc<AtomicBool>,
95}
96
97impl TelegramService {
98 pub fn new(
104 workspace_root: PathBuf,
105 bot_token: Option<String>,
106 api_url: Option<String>,
107 timeout_secs: u64,
108 loop_id: String,
109 ) -> TelegramResult<Self> {
110 let resolved_token = bot_token
111 .or_else(|| std::env::var("RALPH_TELEGRAM_BOT_TOKEN").ok())
112 .ok_or(TelegramError::MissingBotToken)?;
113
114 let state_path = workspace_root.join(".ralph/telegram-state.json");
115 let state_manager = StateManager::new(&state_path);
116 let handler_state_manager = StateManager::new(&state_path);
117 let handler = MessageHandler::new(handler_state_manager, &workspace_root);
118 let bot = TelegramBot::new(&resolved_token, api_url.as_deref());
119 let shutdown = Arc::new(AtomicBool::new(false));
120
121 Ok(Self {
122 workspace_root,
123 bot_token: resolved_token,
124 api_url,
125 timeout_secs,
126 loop_id,
127 state_manager,
128 handler,
129 bot,
130 shutdown,
131 })
132 }
133
134 pub fn workspace_root(&self) -> &PathBuf {
136 &self.workspace_root
137 }
138
139 pub fn timeout_secs(&self) -> u64 {
141 self.timeout_secs
142 }
143
144 pub fn bot_token_masked(&self) -> String {
146 if self.bot_token.len() > 8 {
147 format!(
148 "{}...{}",
149 &self.bot_token[..4],
150 &self.bot_token[self.bot_token.len() - 4..]
151 )
152 } else {
153 "****".to_string()
154 }
155 }
156
157 pub fn state_manager(&self) -> &StateManager {
159 &self.state_manager
160 }
161
162 pub fn handler(&mut self) -> &mut MessageHandler {
164 &mut self.handler
165 }
166
167 pub fn loop_id(&self) -> &str {
169 &self.loop_id
170 }
171
172 pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
177 self.shutdown.clone()
178 }
179
180 pub fn start(&self) -> TelegramResult<()> {
185 info!(
186 bot_token = %self.bot_token_masked(),
187 workspace = %self.workspace_root.display(),
188 timeout_secs = self.timeout_secs,
189 "Telegram service starting"
190 );
191
192 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
194 TelegramError::Startup("no tokio runtime available for polling".to_string())
195 })?;
196
197 let raw_bot =
198 crate::apply_api_url(teloxide::Bot::new(&self.bot_token), self.api_url.as_deref());
199 let workspace_root = self.workspace_root.clone();
200 let state_path = self.workspace_root.join(".ralph/telegram-state.json");
201 let shutdown = self.shutdown.clone();
202 let loop_id = self.loop_id.clone();
203
204 handle.spawn(async move {
205 Self::poll_updates(raw_bot, workspace_root, state_path, shutdown, loop_id).await;
206 });
207
208 if let Ok(state) = self.state_manager.load_or_default()
210 && let Some(chat_id) = state.chat_id
211 {
212 let greeting = crate::bot::TelegramBot::format_greeting(&self.loop_id);
213 match self.send_with_retry(chat_id, &greeting) {
214 Ok(_) => info!("Sent greeting to chat {}", chat_id),
215 Err(e) => warn!(error = %e, "Failed to send greeting"),
216 }
217 }
218
219 info!("Telegram service started — polling for incoming messages");
220 Ok(())
221 }
222
223 async fn poll_updates(
228 bot: teloxide::Bot,
229 workspace_root: PathBuf,
230 state_path: PathBuf,
231 shutdown: Arc<AtomicBool>,
232 loop_id: String,
233 ) {
234 use teloxide::payloads::{GetUpdatesSetters, SetMessageReactionSetters};
235 use teloxide::requests::Requester;
236
237 let state_manager = StateManager::new(&state_path);
238 let handler_state_manager = StateManager::new(&state_path);
239 let handler = MessageHandler::new(handler_state_manager, &workspace_root);
240 let mut offset: i32 = 0;
241
242 if let Ok(state) = state_manager.load_or_default()
243 && let Some(last_update_id) = state.last_update_id
244 {
245 offset = last_update_id + 1;
246 }
247
248 Self::register_commands(&bot).await;
250
251 info!(loop_id = %loop_id, "Telegram polling task started");
252
253 while !shutdown.load(Ordering::Relaxed) {
254 let request = bot.get_updates().offset(offset).timeout(10);
255 match request.await {
256 Ok(updates) => {
257 for update in updates {
258 #[allow(clippy::cast_possible_wrap)]
260 {
261 offset = update.id.0 as i32 + 1;
262 }
263
264 let msg = match update.kind {
266 teloxide::types::UpdateKind::Message(msg) => msg,
267 _ => continue,
268 };
269
270 let text = match msg.text() {
271 Some(t) => t,
272 None => continue,
273 };
274
275 let chat_id = msg.chat.id.0;
276 let reply_to: Option<i32> = msg.reply_to_message().map(|r| r.id.0);
277
278 info!(
279 chat_id = chat_id,
280 text = %text,
281 "Received Telegram message"
282 );
283
284 if crate::commands::is_command(text) {
287 let response = crate::commands::handle_command(text, &workspace_root)
288 .unwrap_or_else(|| {
289 "Unknown command. Use /help for the supported commands."
290 .to_string()
291 });
292
293 use teloxide::payloads::SendMessageSetters;
294 let send_result = bot
295 .send_message(teloxide::types::ChatId(chat_id), &response)
296 .parse_mode(teloxide::types::ParseMode::Html)
297 .await;
298 if let Err(e) = send_result {
299 warn!(error = %e, "Failed to send command response");
300 }
301 continue;
302 }
303
304 let mut state = match state_manager.load_or_default() {
305 Ok(s) => s,
306 Err(e) => {
307 warn!(error = %e, "Failed to load Telegram state");
308 continue;
309 }
310 };
311
312 match handler.handle_message(&mut state, text, chat_id, reply_to) {
313 Ok(topic) => {
314 let emoji = if topic == "human.response" {
315 "👍"
316 } else {
317 "👀"
318 };
319 let react_result = bot
320 .set_message_reaction(teloxide::types::ChatId(chat_id), msg.id)
321 .reaction(vec![teloxide::types::ReactionType::Emoji {
322 emoji: emoji.to_string(),
323 }])
324 .await;
325 if let Err(e) = react_result {
326 warn!(error = %e, "Failed to react to message");
327 }
328
329 if topic == "human.guidance" {
331 let _ = bot
332 .send_message(
333 teloxide::types::ChatId(chat_id),
334 "📝 <b>Guidance received</b> — will apply next iteration.",
335 )
336 .await;
337 }
338 }
339 Err(e) => {
340 warn!(
341 error = %e,
342 text = %text,
343 "Failed to handle incoming Telegram message"
344 );
345 }
346 }
347
348 state.last_seen = Some(Utc::now());
349 state.last_update_id = Some(offset.saturating_sub(1));
350 if let Err(e) = state_manager.save(&state) {
351 warn!(error = %e, "Failed to persist Telegram state");
352 }
353 }
354 }
355 Err(e) => {
356 if !shutdown.load(Ordering::Relaxed) {
357 warn!(error = %e, "Telegram polling error — retrying in 5s");
358 tokio::time::sleep(Duration::from_secs(5)).await;
359 }
360 }
361 }
362 }
363
364 info!(loop_id = %loop_id, "Telegram polling task stopped");
365 }
366
367 async fn register_commands(bot: &teloxide::Bot) {
369 use teloxide::requests::Requester;
370 use teloxide::types::BotCommand;
371
372 let commands = vec![
373 BotCommand::new("status", "Current loop status"),
374 BotCommand::new("tasks", "Open tasks"),
375 BotCommand::new("memories", "Recent memories"),
376 BotCommand::new("tail", "Last 20 events"),
377 BotCommand::new("model", "Show current backend/model"),
378 BotCommand::new("models", "Show configured model options"),
379 BotCommand::new("restart", "Restart the loop"),
380 BotCommand::new("stop", "Stop the loop"),
381 BotCommand::new("help", "List available commands"),
382 ];
383
384 match bot.set_my_commands(commands).await {
385 Ok(_) => info!("Registered bot commands with Telegram API"),
386 Err(e) => warn!(error = %e, "Failed to register bot commands"),
387 }
388 }
389
390 pub fn stop(self) {
394 if let Ok(state) = self.state_manager.load_or_default()
396 && let Some(chat_id) = state.chat_id
397 {
398 let farewell = crate::bot::TelegramBot::format_farewell(&self.loop_id);
399 match self.send_with_retry(chat_id, &farewell) {
400 Ok(_) => info!("Sent farewell to chat {}", chat_id),
401 Err(e) => warn!(error = %e, "Failed to send farewell"),
402 }
403 }
404
405 self.shutdown.store(true, Ordering::Relaxed);
406 info!(
407 workspace = %self.workspace_root.display(),
408 "Telegram service stopped"
409 );
410 }
411
412 pub fn send_question(&self, payload: &str) -> TelegramResult<i32> {
422 let mut state = self.state_manager.load_or_default()?;
423
424 let message_id = if let Some(chat_id) = state.chat_id {
425 self.send_with_retry(chat_id, payload)?
426 } else {
427 warn!(
428 loop_id = %self.loop_id,
429 "No chat ID configured — human.interact question logged but not sent: {}",
430 payload
431 );
432 0
433 };
434
435 self.state_manager
436 .add_pending_question(&mut state, &self.loop_id, message_id)?;
437
438 debug!(
439 loop_id = %self.loop_id,
440 message_id = message_id,
441 "Stored pending question"
442 );
443
444 Ok(message_id)
445 }
446
447 pub fn send_checkin(
457 &self,
458 iteration: u32,
459 elapsed: Duration,
460 context: Option<&CheckinContext>,
461 ) -> TelegramResult<i32> {
462 let state = self.state_manager.load_or_default()?;
463 let Some(chat_id) = state.chat_id else {
464 debug!(
465 loop_id = %self.loop_id,
466 "No chat ID configured — skipping check-in"
467 );
468 return Ok(0);
469 };
470
471 let elapsed_secs = elapsed.as_secs();
472 let minutes = elapsed_secs / 60;
473 let seconds = elapsed_secs % 60;
474 let elapsed_str = if minutes > 0 {
475 format!("{}m {}s", minutes, seconds)
476 } else {
477 format!("{}s", seconds)
478 };
479
480 let msg = match context {
481 Some(ctx) => {
482 let mut lines = vec![format!(
483 "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
484 iteration, elapsed_str
485 )];
486
487 if let Some(hat) = &ctx.current_hat {
488 lines.push(format!(
489 "Hat: <code>{}</code>",
490 crate::bot::escape_html(hat)
491 ));
492 }
493
494 if ctx.open_tasks > 0 || ctx.closed_tasks > 0 {
495 lines.push(format!(
496 "Tasks: <b>{}</b> open, {} closed",
497 ctx.open_tasks, ctx.closed_tasks
498 ));
499 }
500
501 if ctx.cumulative_cost > 0.0 {
502 lines.push(format!("Cost: <code>${:.4}</code>", ctx.cumulative_cost));
503 }
504
505 lines.join("\n")
506 }
507 None => format!(
508 "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
509 iteration, elapsed_str
510 ),
511 };
512 self.send_with_retry(chat_id, &msg)
513 }
514
515 pub fn send_document(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
520 let state = self.state_manager.load_or_default()?;
521 let Some(chat_id) = state.chat_id else {
522 warn!(
523 loop_id = %self.loop_id,
524 file = %file_path.display(),
525 "No chat ID configured — document not sent"
526 );
527 return Ok(0);
528 };
529
530 self.send_document_with_retry(chat_id, file_path, caption)
531 }
532
533 pub fn send_photo(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
538 let state = self.state_manager.load_or_default()?;
539 let Some(chat_id) = state.chat_id else {
540 warn!(
541 loop_id = %self.loop_id,
542 file = %file_path.display(),
543 "No chat ID configured — photo not sent"
544 );
545 return Ok(0);
546 };
547
548 self.send_photo_with_retry(chat_id, file_path, caption)
549 }
550
551 fn send_with_retry(&self, chat_id: i64, payload: &str) -> TelegramResult<i32> {
556 use crate::bot::BotApi;
557
558 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
559 attempts: 0,
560 reason: "no tokio runtime available for sending".to_string(),
561 })?;
562
563 retry_with_backoff(
564 |_attempt| {
565 tokio::task::block_in_place(|| {
566 handle.block_on(self.bot.send_message(chat_id, payload))
567 })
568 },
569 |delay| std::thread::sleep(delay),
570 )
571 }
572
573 fn send_document_with_retry(
575 &self,
576 chat_id: i64,
577 file_path: &Path,
578 caption: Option<&str>,
579 ) -> TelegramResult<i32> {
580 use crate::bot::BotApi;
581
582 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
583 attempts: 0,
584 reason: "no tokio runtime available for sending".to_string(),
585 })?;
586
587 retry_with_backoff(
588 |_attempt| {
589 tokio::task::block_in_place(|| {
590 handle.block_on(self.bot.send_document(chat_id, file_path, caption))
591 })
592 },
593 |delay| std::thread::sleep(delay),
594 )
595 }
596
597 fn send_photo_with_retry(
599 &self,
600 chat_id: i64,
601 file_path: &Path,
602 caption: Option<&str>,
603 ) -> TelegramResult<i32> {
604 use crate::bot::BotApi;
605
606 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
607 attempts: 0,
608 reason: "no tokio runtime available for sending".to_string(),
609 })?;
610
611 retry_with_backoff(
612 |_attempt| {
613 tokio::task::block_in_place(|| {
614 handle.block_on(self.bot.send_photo(chat_id, file_path, caption))
615 })
616 },
617 |delay| std::thread::sleep(delay),
618 )
619 }
620
621 pub fn wait_for_response(&self, events_path: &Path) -> TelegramResult<Option<String>> {
629 let timeout = Duration::from_secs(self.timeout_secs);
630 let poll_interval = Duration::from_millis(250);
631 let deadline = Instant::now() + timeout;
632
633 let initial_pos = if events_path.exists() {
635 std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0)
636 } else {
637 0
638 };
639 let mut file_pos = initial_pos;
640
641 info!(
642 loop_id = %self.loop_id,
643 timeout_secs = self.timeout_secs,
644 events_path = %events_path.display(),
645 "Waiting for human.response"
646 );
647
648 loop {
649 if Instant::now() >= deadline {
650 warn!(
651 loop_id = %self.loop_id,
652 timeout_secs = self.timeout_secs,
653 "Timed out waiting for human.response"
654 );
655
656 if let Ok(mut state) = self.state_manager.load_or_default() {
658 let _ = self
659 .state_manager
660 .remove_pending_question(&mut state, &self.loop_id);
661 }
662
663 return Ok(None);
664 }
665
666 if self.shutdown.load(Ordering::Relaxed) {
668 info!(loop_id = %self.loop_id, "Interrupted while waiting for human.response");
669 if let Ok(mut state) = self.state_manager.load_or_default() {
670 let _ = self
671 .state_manager
672 .remove_pending_question(&mut state, &self.loop_id);
673 }
674 return Ok(None);
675 }
676
677 if let Some(response) = Self::check_for_response(events_path, &mut file_pos)? {
679 info!(
680 loop_id = %self.loop_id,
681 "Received human.response: {}",
682 response
683 );
684
685 if let Ok(mut state) = self.state_manager.load_or_default() {
687 let _ = self
688 .state_manager
689 .remove_pending_question(&mut state, &self.loop_id);
690 }
691
692 return Ok(Some(response));
693 }
694
695 std::thread::sleep(poll_interval);
696 }
697 }
698
699 fn check_for_response(
702 events_path: &Path,
703 file_pos: &mut u64,
704 ) -> TelegramResult<Option<String>> {
705 use std::io::{BufRead, BufReader, Seek, SeekFrom};
706
707 if !events_path.exists() {
708 return Ok(None);
709 }
710
711 let mut file = std::fs::File::open(events_path)?;
712 file.seek(SeekFrom::Start(*file_pos))?;
713
714 let reader = BufReader::new(file);
715 for line in reader.lines() {
716 let line = line?;
717 let line_bytes = line.len() as u64 + 1; *file_pos += line_bytes;
719
720 if line.trim().is_empty() {
721 continue;
722 }
723
724 if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line)
726 && event.get("topic").and_then(|t| t.as_str()) == Some("human.response")
727 {
728 let message = event
729 .get("payload")
730 .and_then(|p| p.as_str())
731 .unwrap_or("")
732 .to_string();
733 return Ok(Some(message));
734 }
735
736 if line.contains("EVENT: human.response") {
738 let message = line
741 .split('|')
742 .find(|part| part.trim().starts_with("message:"))
743 .and_then(|part| {
744 let value = part.trim().strip_prefix("message:")?;
745 let trimmed = value.trim().trim_matches('"');
746 Some(trimmed.to_string())
747 })
748 .unwrap_or_default();
749 return Ok(Some(message));
750 }
751 }
752
753 Ok(None)
754 }
755}
756
757impl ralph_proto::RobotService for TelegramService {
758 fn send_question(&self, payload: &str) -> anyhow::Result<i32> {
759 Ok(TelegramService::send_question(self, payload)?)
760 }
761
762 fn wait_for_response(&self, events_path: &Path) -> anyhow::Result<Option<String>> {
763 Ok(TelegramService::wait_for_response(self, events_path)?)
764 }
765
766 fn send_checkin(
767 &self,
768 iteration: u32,
769 elapsed: Duration,
770 context: Option<&ralph_proto::CheckinContext>,
771 ) -> anyhow::Result<i32> {
772 let local_context = context.map(|ctx| CheckinContext {
774 current_hat: ctx.current_hat.clone(),
775 open_tasks: ctx.open_tasks,
776 closed_tasks: ctx.closed_tasks,
777 cumulative_cost: ctx.cumulative_cost,
778 });
779 Ok(TelegramService::send_checkin(
780 self,
781 iteration,
782 elapsed,
783 local_context.as_ref(),
784 )?)
785 }
786
787 fn timeout_secs(&self) -> u64 {
788 self.timeout_secs
789 }
790
791 fn shutdown_flag(&self) -> Arc<AtomicBool> {
792 self.shutdown.clone()
793 }
794
795 fn stop(self: Box<Self>) {
796 TelegramService::stop(*self);
797 }
798}
799
800impl fmt::Debug for TelegramService {
801 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
802 f.debug_struct("TelegramService")
803 .field("workspace_root", &self.workspace_root)
804 .field("bot_token", &self.bot_token_masked())
805 .field("timeout_secs", &self.timeout_secs)
806 .finish_non_exhaustive()
807 }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    const BUILD_DONE: &str =
        r#"{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}"#;

    /// Builds a service rooted in `dir` with an explicit token and no API URL override.
    fn service_with(
        dir: &TempDir,
        token: &str,
        timeout_secs: u64,
        loop_id: &str,
    ) -> TelegramService {
        TelegramService::new(
            dir.path().to_path_buf(),
            Some(token.to_string()),
            None,
            timeout_secs,
            loop_id.to_string(),
        )
        .unwrap()
    }

    fn test_service(dir: &TempDir) -> TelegramService {
        service_with(dir, "test-token-12345", 300, "main")
    }

    /// Creates (or truncates) `path` and writes each entry as its own line.
    fn write_events(path: &Path, lines: &[&str]) {
        let mut file = std::fs::File::create(path).unwrap();
        for line in lines {
            writeln!(file, "{line}").unwrap();
        }
        file.flush().unwrap();
    }

    /// Appends each entry to the existing file at `path` as its own line.
    fn append_events(path: &Path, lines: &[&str]) {
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .unwrap();
        for line in lines {
            writeln!(file, "{line}").unwrap();
        }
        file.flush().unwrap();
    }

    fn send_failure(attempts: u32, reason: &str) -> TelegramError {
        TelegramError::Send {
            attempts,
            reason: reason.to_string(),
        }
    }

    #[test]
    fn new_with_explicit_token() {
        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            Some("test-token-12345".to_string()),
            None,
            300,
            "main".to_string(),
        );
        assert!(service.is_ok());
    }

    #[test]
    fn new_without_token_fails() {
        // The environment fallback would make construction succeed; nothing to check then.
        if std::env::var("RALPH_TELEGRAM_BOT_TOKEN").is_ok() {
            return;
        }

        let dir = TempDir::new().unwrap();
        let service = TelegramService::new(
            dir.path().to_path_buf(),
            None,
            None,
            300,
            "main".to_string(),
        );
        assert!(service.is_err());
        assert!(matches!(
            service.unwrap_err(),
            TelegramError::MissingBotToken
        ));
    }

    #[test]
    fn bot_token_masked_works() {
        let dir = TempDir::new().unwrap();
        let service = service_with(&dir, "abcd1234efgh5678", 300, "main");
        assert_eq!(service.bot_token_masked(), "abcd...5678");
    }

    #[test]
    fn loop_id_accessor() {
        let dir = TempDir::new().unwrap();
        let service = service_with(&dir, "token", 60, "feature-auth");
        assert_eq!(service.loop_id(), "feature-auth");
    }

    #[test]
    fn send_question_stores_pending_question() {
        let dir = TempDir::new().unwrap();
        let service = test_service(&dir);

        service.send_question("Which DB to use?").unwrap();

        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            state.pending_questions.contains_key("main"),
            "pending question should be stored for loop_id 'main'"
        );
    }

    #[test]
    fn send_question_returns_message_id() {
        let dir = TempDir::new().unwrap();
        let service = test_service(&dir);

        // No chat ID is stored, so nothing is sent and the placeholder ID comes back.
        let msg_id = service.send_question("async or sync?").unwrap();
        assert_eq!(msg_id, 0);
    }

    #[test]
    fn check_for_response_json_format() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");
        write_events(
            &events_path,
            &[
                r#"{"topic":"build.done","payload":"tests: pass, lint: pass, typecheck: pass, audit: pass, coverage: pass","ts":"2026-01-30T00:00:00Z"}"#,
                r#"{"topic":"human.response","payload":"Use async","ts":"2026-01-30T00:01:00Z"}"#,
            ],
        );

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("Use async".to_string()));
    }

    #[test]
    fn check_for_response_pipe_format() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");
        write_events(
            &events_path,
            &[r#"EVENT: human.response | message: "Use sync" | timestamp: "2026-01-30T00:01:00Z""#],
        );

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("Use sync".to_string()));
    }

    #[test]
    fn check_for_response_skips_non_response_events() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");
        write_events(
            &events_path,
            &[
                BUILD_DONE,
                r#"{"topic":"human.guidance","payload":"check errors","ts":"2026-01-30T00:01:00Z"}"#,
            ],
        );

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn check_for_response_missing_file() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("does-not-exist.jsonl");

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn check_for_response_tracks_position() {
        let dir = TempDir::new().unwrap();
        let events_path = dir.path().join("events.jsonl");
        write_events(&events_path, &[BUILD_DONE]);

        let mut pos = 0;
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, None);
        assert!(pos > 0, "position should advance after reading");

        let pos_after_first = pos;

        append_events(
            &events_path,
            &[r#"{"topic":"human.response","payload":"yes","ts":"2026-01-30T00:02:00Z"}"#],
        );

        // The second scan resumes from the saved offset and sees only the new line.
        let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
        assert_eq!(result, Some("yes".to_string()));
        assert!(pos > pos_after_first, "position should advance further");
    }

    #[test]
    fn wait_for_response_returns_on_response() {
        let dir = TempDir::new().unwrap();
        let service = service_with(&dir, "token", 5, "main");

        let events_path = dir.path().join("events.jsonl");
        std::fs::File::create(&events_path).unwrap();

        service.send_question("Which plan?").unwrap();

        // Write the response from another thread while the main thread is waiting.
        let writer_path = events_path.clone();
        let writer = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(200));
            append_events(
                &writer_path,
                &[r#"{"topic":"human.response","payload":"Go with plan A","ts":"2026-01-30T00:00:00Z"}"#],
            );
        });

        let result = service.wait_for_response(&events_path).unwrap();
        writer.join().unwrap();

        assert_eq!(result, Some("Go with plan A".to_string()));

        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            !state.pending_questions.contains_key("main"),
            "pending question should be removed after response"
        );
    }

    #[test]
    fn wait_for_response_returns_none_on_timeout() {
        let dir = TempDir::new().unwrap();
        let service = service_with(&dir, "token", 1, "main");

        let events_path = dir.path().join("events.jsonl");
        std::fs::File::create(&events_path).unwrap();

        service.send_question("Will this timeout?").unwrap();

        let result = service.wait_for_response(&events_path).unwrap();
        assert_eq!(result, None, "should return None on timeout");

        let state = service.state_manager().load_or_default().unwrap();
        assert!(
            !state.pending_questions.contains_key("main"),
            "pending question should be removed on timeout"
        );
    }

    #[test]
    fn retry_with_backoff_succeeds_on_first_attempt() {
        let mut attempts = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts.push(attempt);
                Ok(42)
            },
            |_delay| {},
        );

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempts, vec![1]);
    }

    #[test]
    fn retry_with_backoff_succeeds_on_second_attempt() {
        let mut attempts = Vec::new();
        let mut delays = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts.push(attempt);
                if attempt < 2 {
                    Err(send_failure(attempt, "transient failure"))
                } else {
                    Ok(99)
                }
            },
            |delay| delays.push(delay),
        );

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 99);
        assert_eq!(attempts, vec![1, 2]);
        assert_eq!(delays, vec![Duration::from_secs(1)]);
    }

    #[test]
    fn retry_with_backoff_succeeds_on_third_attempt() {
        let mut attempts = Vec::new();
        let mut delays = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts.push(attempt);
                if attempt < 3 {
                    Err(send_failure(attempt, "transient failure"))
                } else {
                    Ok(7)
                }
            },
            |delay| delays.push(delay),
        );

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 7);
        assert_eq!(attempts, vec![1, 2, 3]);
        assert_eq!(
            delays,
            vec![Duration::from_secs(1), Duration::from_secs(2)]
        );
    }

    #[test]
    fn retry_with_backoff_fails_after_all_retries() {
        let mut attempts = Vec::new();
        let mut delays = Vec::new();

        let result = retry_with_backoff(
            |attempt| {
                attempts.push(attempt);
                Err(send_failure(
                    attempt,
                    &format!("failure on attempt {}", attempt),
                ))
            },
            |delay| delays.push(delay),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(
            err,
            TelegramError::Send {
                attempts: 3,
                reason: _
            }
        ));
        if let TelegramError::Send { reason, .. } = &err {
            assert!(reason.contains("failure on attempt 3"));
        }
        assert_eq!(attempts, vec![1, 2, 3]);
        assert_eq!(
            delays,
            vec![Duration::from_secs(1), Duration::from_secs(2)]
        );
    }

    #[test]
    fn retry_with_backoff_exponential_delays_are_correct() {
        let mut delays = Vec::new();

        let _ = retry_with_backoff(
            |_attempt| Err(send_failure(1, "always fail")),
            |delay| delays.push(delay),
        );

        // Three attempts mean two waits: 1s after the first, 2s after the second.
        assert_eq!(delays.len(), 2);
        assert_eq!(delays[0], Duration::from_secs(1));
        assert_eq!(delays[1], Duration::from_secs(2));
    }

    #[test]
    fn checkin_context_default() {
        let ctx = CheckinContext::default();
        assert!(ctx.current_hat.is_none());
        assert_eq!(ctx.open_tasks, 0);
        assert_eq!(ctx.closed_tasks, 0);
        assert!(ctx.cumulative_cost.abs() < f64::EPSILON);
    }

    #[test]
    fn checkin_context_with_hat_and_tasks() {
        let ctx = CheckinContext {
            current_hat: Some("executor".to_string()),
            open_tasks: 3,
            closed_tasks: 5,
            cumulative_cost: 1.2345,
        };
        assert_eq!(ctx.current_hat.as_deref(), Some("executor"));
        assert_eq!(ctx.open_tasks, 3);
        assert_eq!(ctx.closed_tasks, 5);
        assert!((ctx.cumulative_cost - 1.2345).abs() < f64::EPSILON);
    }

    #[test]
    fn wait_for_response_returns_none_on_shutdown() {
        let dir = TempDir::new().unwrap();
        let service = service_with(&dir, "token", 60, "main");

        let events_path = dir.path().join("events.jsonl");
        std::fs::File::create(&events_path).unwrap();

        // With the flag already set, the wait must end on its first pass.
        service.shutdown_flag().store(true, Ordering::Relaxed);

        let start = Instant::now();
        let result = service.wait_for_response(&events_path).unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, None, "should return None when shutdown flag is set");
        assert!(
            elapsed < Duration::from_secs(2),
            "should return quickly, not wait for timeout (elapsed: {:?})",
            elapsed
        );
    }
}