1use std::fmt;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use chrono::Utc;
8use tracing::{debug, info, warn};
9
10use crate::bot::TelegramBot;
11use crate::error::{TelegramError, TelegramResult};
12use crate::handler::MessageHandler;
13use crate::state::StateManager;
14
15pub const MAX_SEND_RETRIES: u32 = 3;
17
18pub const BASE_RETRY_DELAY: Duration = Duration::from_secs(1);
20
21pub fn retry_with_backoff<F, S>(mut send_fn: F, mut sleep_fn: S) -> TelegramResult<i32>
29where
30 F: FnMut(u32) -> TelegramResult<i32>,
31 S: FnMut(Duration),
32{
33 let mut last_error = String::new();
34
35 for attempt in 1..=MAX_SEND_RETRIES {
36 match send_fn(attempt) {
37 Ok(msg_id) => return Ok(msg_id),
38 Err(e) => {
39 last_error = e.to_string();
40 warn!(
41 attempt = attempt,
42 max_retries = MAX_SEND_RETRIES,
43 error = %last_error,
44 "Telegram send failed, {}",
45 if attempt < MAX_SEND_RETRIES {
46 "retrying with backoff"
47 } else {
48 "all retries exhausted"
49 }
50 );
51 if attempt < MAX_SEND_RETRIES {
52 let delay = BASE_RETRY_DELAY * 2u32.pow(attempt - 1);
53 sleep_fn(delay);
54 }
55 }
56 }
57 }
58
59 Err(TelegramError::Send {
60 attempts: MAX_SEND_RETRIES,
61 reason: last_error,
62 })
63}
64
65#[derive(Debug, Default)]
70pub struct CheckinContext {
71 pub current_hat: Option<String>,
73 pub open_tasks: usize,
75 pub closed_tasks: usize,
77 pub cumulative_cost: f64,
79}
80
81pub struct TelegramService {
86 workspace_root: PathBuf,
87 bot_token: String,
88 timeout_secs: u64,
89 loop_id: String,
90 state_manager: StateManager,
91 handler: MessageHandler,
92 bot: TelegramBot,
93 shutdown: Arc<AtomicBool>,
94}
95
96impl TelegramService {
97 pub fn new(
101 workspace_root: PathBuf,
102 bot_token: Option<String>,
103 timeout_secs: u64,
104 loop_id: String,
105 ) -> TelegramResult<Self> {
106 let resolved_token = bot_token
107 .or_else(|| std::env::var("RALPH_TELEGRAM_BOT_TOKEN").ok())
108 .ok_or(TelegramError::MissingBotToken)?;
109
110 let state_path = workspace_root.join(".ralph/telegram-state.json");
111 let state_manager = StateManager::new(&state_path);
112 let handler_state_manager = StateManager::new(&state_path);
113 let handler = MessageHandler::new(handler_state_manager, &workspace_root);
114 let bot = TelegramBot::new(&resolved_token);
115 let shutdown = Arc::new(AtomicBool::new(false));
116
117 Ok(Self {
118 workspace_root,
119 bot_token: resolved_token,
120 timeout_secs,
121 loop_id,
122 state_manager,
123 handler,
124 bot,
125 shutdown,
126 })
127 }
128
129 pub fn workspace_root(&self) -> &PathBuf {
131 &self.workspace_root
132 }
133
134 pub fn timeout_secs(&self) -> u64 {
136 self.timeout_secs
137 }
138
139 pub fn bot_token_masked(&self) -> String {
141 if self.bot_token.len() > 8 {
142 format!(
143 "{}...{}",
144 &self.bot_token[..4],
145 &self.bot_token[self.bot_token.len() - 4..]
146 )
147 } else {
148 "****".to_string()
149 }
150 }
151
152 pub fn state_manager(&self) -> &StateManager {
154 &self.state_manager
155 }
156
157 pub fn handler(&mut self) -> &mut MessageHandler {
159 &mut self.handler
160 }
161
162 pub fn loop_id(&self) -> &str {
164 &self.loop_id
165 }
166
167 pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
172 self.shutdown.clone()
173 }
174
175 pub fn start(&self) -> TelegramResult<()> {
180 info!(
181 bot_token = %self.bot_token_masked(),
182 workspace = %self.workspace_root.display(),
183 timeout_secs = self.timeout_secs,
184 "Telegram service starting"
185 );
186
187 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
189 TelegramError::Startup("no tokio runtime available for polling".to_string())
190 })?;
191
192 let raw_bot = teloxide::Bot::new(&self.bot_token);
193 let workspace_root = self.workspace_root.clone();
194 let state_path = self.workspace_root.join(".ralph/telegram-state.json");
195 let shutdown = self.shutdown.clone();
196 let loop_id = self.loop_id.clone();
197
198 handle.spawn(async move {
199 Self::poll_updates(raw_bot, workspace_root, state_path, shutdown, loop_id).await;
200 });
201
202 if let Ok(state) = self.state_manager.load_or_default()
204 && let Some(chat_id) = state.chat_id
205 {
206 let greeting = crate::bot::TelegramBot::format_greeting(&self.loop_id);
207 match self.send_with_retry(chat_id, &greeting) {
208 Ok(_) => info!("Sent greeting to chat {}", chat_id),
209 Err(e) => warn!(error = %e, "Failed to send greeting"),
210 }
211 }
212
213 info!("Telegram service started — polling for incoming messages");
214 Ok(())
215 }
216
217 async fn poll_updates(
222 bot: teloxide::Bot,
223 workspace_root: PathBuf,
224 state_path: PathBuf,
225 shutdown: Arc<AtomicBool>,
226 loop_id: String,
227 ) {
228 use teloxide::payloads::{GetUpdatesSetters, SetMessageReactionSetters};
229 use teloxide::requests::Requester;
230
231 let state_manager = StateManager::new(&state_path);
232 let handler_state_manager = StateManager::new(&state_path);
233 let handler = MessageHandler::new(handler_state_manager, &workspace_root);
234 let mut offset: i32 = 0;
235
236 if let Ok(state) = state_manager.load_or_default()
237 && let Some(last_update_id) = state.last_update_id
238 {
239 offset = last_update_id + 1;
240 }
241
242 Self::register_commands(&bot).await;
244
245 info!(loop_id = %loop_id, "Telegram polling task started");
246
247 while !shutdown.load(Ordering::Relaxed) {
248 let request = bot.get_updates().offset(offset).timeout(10);
249 match request.await {
250 Ok(updates) => {
251 for update in updates {
252 #[allow(clippy::cast_possible_wrap)]
254 {
255 offset = update.id.0 as i32 + 1;
256 }
257
258 let msg = match update.kind {
260 teloxide::types::UpdateKind::Message(msg) => msg,
261 _ => continue,
262 };
263
264 let text = match msg.text() {
265 Some(t) => t,
266 None => continue,
267 };
268
269 let chat_id = msg.chat.id.0;
270 let reply_to: Option<i32> = msg.reply_to_message().map(|r| r.id.0);
271
272 info!(
273 chat_id = chat_id,
274 text = %text,
275 "Received Telegram message"
276 );
277
278 if crate::commands::is_command(text)
280 && let Some(response) =
281 crate::commands::handle_command(text, &workspace_root)
282 {
283 use teloxide::payloads::SendMessageSetters;
284 let send_result = bot
285 .send_message(teloxide::types::ChatId(chat_id), &response)
286 .parse_mode(teloxide::types::ParseMode::Html)
287 .await;
288 if let Err(e) = send_result {
289 warn!(error = %e, "Failed to send command response");
290 }
291 continue;
292 }
293
294 let mut state = match state_manager.load_or_default() {
295 Ok(s) => s,
296 Err(e) => {
297 warn!(error = %e, "Failed to load Telegram state");
298 continue;
299 }
300 };
301
302 match handler.handle_message(&mut state, text, chat_id, reply_to) {
303 Ok(topic) => {
304 let emoji = if topic == "human.response" {
305 "👍"
306 } else {
307 "👀"
308 };
309 let react_result = bot
310 .set_message_reaction(teloxide::types::ChatId(chat_id), msg.id)
311 .reaction(vec![teloxide::types::ReactionType::Emoji {
312 emoji: emoji.to_string(),
313 }])
314 .await;
315 if let Err(e) = react_result {
316 warn!(error = %e, "Failed to react to message");
317 }
318
319 if topic == "human.guidance" {
321 let _ = bot
322 .send_message(
323 teloxide::types::ChatId(chat_id),
324 "📝 <b>Guidance received</b> — will apply next iteration.",
325 )
326 .await;
327 }
328 }
329 Err(e) => {
330 warn!(
331 error = %e,
332 text = %text,
333 "Failed to handle incoming Telegram message"
334 );
335 }
336 }
337
338 state.last_seen = Some(Utc::now());
339 state.last_update_id = Some(offset.saturating_sub(1));
340 if let Err(e) = state_manager.save(&state) {
341 warn!(error = %e, "Failed to persist Telegram state");
342 }
343 }
344 }
345 Err(e) => {
346 if !shutdown.load(Ordering::Relaxed) {
347 warn!(error = %e, "Telegram polling error — retrying in 5s");
348 tokio::time::sleep(Duration::from_secs(5)).await;
349 }
350 }
351 }
352 }
353
354 info!(loop_id = %loop_id, "Telegram polling task stopped");
355 }
356
357 async fn register_commands(bot: &teloxide::Bot) {
359 use teloxide::requests::Requester;
360 use teloxide::types::BotCommand;
361
362 let commands = vec![
363 BotCommand::new("status", "Current loop status"),
364 BotCommand::new("tasks", "Open tasks"),
365 BotCommand::new("memories", "Recent memories"),
366 BotCommand::new("tail", "Last 20 events"),
367 BotCommand::new("stop", "Stop the loop"),
368 BotCommand::new("help", "List available commands"),
369 ];
370
371 match bot.set_my_commands(commands).await {
372 Ok(_) => info!("Registered bot commands with Telegram API"),
373 Err(e) => warn!(error = %e, "Failed to register bot commands"),
374 }
375 }
376
377 pub fn stop(self) {
381 if let Ok(state) = self.state_manager.load_or_default()
383 && let Some(chat_id) = state.chat_id
384 {
385 let farewell = crate::bot::TelegramBot::format_farewell(&self.loop_id);
386 match self.send_with_retry(chat_id, &farewell) {
387 Ok(_) => info!("Sent farewell to chat {}", chat_id),
388 Err(e) => warn!(error = %e, "Failed to send farewell"),
389 }
390 }
391
392 self.shutdown.store(true, Ordering::Relaxed);
393 info!(
394 workspace = %self.workspace_root.display(),
395 "Telegram service stopped"
396 );
397 }
398
399 pub fn send_question(&self, payload: &str) -> TelegramResult<i32> {
409 let mut state = self.state_manager.load_or_default()?;
410
411 let message_id = if let Some(chat_id) = state.chat_id {
412 self.send_with_retry(chat_id, payload)?
413 } else {
414 warn!(
415 loop_id = %self.loop_id,
416 "No chat ID configured — human.interact question logged but not sent: {}",
417 payload
418 );
419 0
420 };
421
422 self.state_manager
423 .add_pending_question(&mut state, &self.loop_id, message_id)?;
424
425 debug!(
426 loop_id = %self.loop_id,
427 message_id = message_id,
428 "Stored pending question"
429 );
430
431 Ok(message_id)
432 }
433
434 pub fn send_checkin(
444 &self,
445 iteration: u32,
446 elapsed: Duration,
447 context: Option<&CheckinContext>,
448 ) -> TelegramResult<i32> {
449 let state = self.state_manager.load_or_default()?;
450 let Some(chat_id) = state.chat_id else {
451 debug!(
452 loop_id = %self.loop_id,
453 "No chat ID configured — skipping check-in"
454 );
455 return Ok(0);
456 };
457
458 let elapsed_secs = elapsed.as_secs();
459 let minutes = elapsed_secs / 60;
460 let seconds = elapsed_secs % 60;
461 let elapsed_str = if minutes > 0 {
462 format!("{}m {}s", minutes, seconds)
463 } else {
464 format!("{}s", seconds)
465 };
466
467 let msg = match context {
468 Some(ctx) => {
469 let mut lines = vec![format!(
470 "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
471 iteration, elapsed_str
472 )];
473
474 if let Some(hat) = &ctx.current_hat {
475 lines.push(format!(
476 "Hat: <code>{}</code>",
477 crate::bot::escape_html(hat)
478 ));
479 }
480
481 if ctx.open_tasks > 0 || ctx.closed_tasks > 0 {
482 lines.push(format!(
483 "Tasks: <b>{}</b> open, {} closed",
484 ctx.open_tasks, ctx.closed_tasks
485 ));
486 }
487
488 if ctx.cumulative_cost > 0.0 {
489 lines.push(format!("Cost: <code>${:.4}</code>", ctx.cumulative_cost));
490 }
491
492 lines.join("\n")
493 }
494 None => format!(
495 "Still working — iteration <b>{}</b>, <code>{}</code> elapsed.",
496 iteration, elapsed_str
497 ),
498 };
499 self.send_with_retry(chat_id, &msg)
500 }
501
502 pub fn send_document(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
507 let state = self.state_manager.load_or_default()?;
508 let Some(chat_id) = state.chat_id else {
509 warn!(
510 loop_id = %self.loop_id,
511 file = %file_path.display(),
512 "No chat ID configured — document not sent"
513 );
514 return Ok(0);
515 };
516
517 self.send_document_with_retry(chat_id, file_path, caption)
518 }
519
520 pub fn send_photo(&self, file_path: &Path, caption: Option<&str>) -> TelegramResult<i32> {
525 let state = self.state_manager.load_or_default()?;
526 let Some(chat_id) = state.chat_id else {
527 warn!(
528 loop_id = %self.loop_id,
529 file = %file_path.display(),
530 "No chat ID configured — photo not sent"
531 );
532 return Ok(0);
533 };
534
535 self.send_photo_with_retry(chat_id, file_path, caption)
536 }
537
538 fn send_with_retry(&self, chat_id: i64, payload: &str) -> TelegramResult<i32> {
543 use crate::bot::BotApi;
544
545 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
546 attempts: 0,
547 reason: "no tokio runtime available for sending".to_string(),
548 })?;
549
550 retry_with_backoff(
551 |_attempt| {
552 tokio::task::block_in_place(|| {
553 handle.block_on(self.bot.send_message(chat_id, payload))
554 })
555 },
556 |delay| std::thread::sleep(delay),
557 )
558 }
559
560 fn send_document_with_retry(
562 &self,
563 chat_id: i64,
564 file_path: &Path,
565 caption: Option<&str>,
566 ) -> TelegramResult<i32> {
567 use crate::bot::BotApi;
568
569 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
570 attempts: 0,
571 reason: "no tokio runtime available for sending".to_string(),
572 })?;
573
574 retry_with_backoff(
575 |_attempt| {
576 tokio::task::block_in_place(|| {
577 handle.block_on(self.bot.send_document(chat_id, file_path, caption))
578 })
579 },
580 |delay| std::thread::sleep(delay),
581 )
582 }
583
584 fn send_photo_with_retry(
586 &self,
587 chat_id: i64,
588 file_path: &Path,
589 caption: Option<&str>,
590 ) -> TelegramResult<i32> {
591 use crate::bot::BotApi;
592
593 let handle = tokio::runtime::Handle::try_current().map_err(|_| TelegramError::Send {
594 attempts: 0,
595 reason: "no tokio runtime available for sending".to_string(),
596 })?;
597
598 retry_with_backoff(
599 |_attempt| {
600 tokio::task::block_in_place(|| {
601 handle.block_on(self.bot.send_photo(chat_id, file_path, caption))
602 })
603 },
604 |delay| std::thread::sleep(delay),
605 )
606 }
607
608 pub fn wait_for_response(&self, events_path: &Path) -> TelegramResult<Option<String>> {
616 let timeout = Duration::from_secs(self.timeout_secs);
617 let poll_interval = Duration::from_millis(250);
618 let deadline = Instant::now() + timeout;
619
620 let initial_pos = if events_path.exists() {
622 std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0)
623 } else {
624 0
625 };
626 let mut file_pos = initial_pos;
627
628 info!(
629 loop_id = %self.loop_id,
630 timeout_secs = self.timeout_secs,
631 events_path = %events_path.display(),
632 "Waiting for human.response"
633 );
634
635 loop {
636 if Instant::now() >= deadline {
637 warn!(
638 loop_id = %self.loop_id,
639 timeout_secs = self.timeout_secs,
640 "Timed out waiting for human.response"
641 );
642
643 if let Ok(mut state) = self.state_manager.load_or_default() {
645 let _ = self
646 .state_manager
647 .remove_pending_question(&mut state, &self.loop_id);
648 }
649
650 return Ok(None);
651 }
652
653 if self.shutdown.load(Ordering::Relaxed) {
655 info!(loop_id = %self.loop_id, "Interrupted while waiting for human.response");
656 if let Ok(mut state) = self.state_manager.load_or_default() {
657 let _ = self
658 .state_manager
659 .remove_pending_question(&mut state, &self.loop_id);
660 }
661 return Ok(None);
662 }
663
664 if let Some(response) = Self::check_for_response(events_path, &mut file_pos)? {
666 info!(
667 loop_id = %self.loop_id,
668 "Received human.response: {}",
669 response
670 );
671
672 if let Ok(mut state) = self.state_manager.load_or_default() {
674 let _ = self
675 .state_manager
676 .remove_pending_question(&mut state, &self.loop_id);
677 }
678
679 return Ok(Some(response));
680 }
681
682 std::thread::sleep(poll_interval);
683 }
684 }
685
686 fn check_for_response(
689 events_path: &Path,
690 file_pos: &mut u64,
691 ) -> TelegramResult<Option<String>> {
692 use std::io::{BufRead, BufReader, Seek, SeekFrom};
693
694 if !events_path.exists() {
695 return Ok(None);
696 }
697
698 let mut file = std::fs::File::open(events_path)?;
699 file.seek(SeekFrom::Start(*file_pos))?;
700
701 let reader = BufReader::new(file);
702 for line in reader.lines() {
703 let line = line?;
704 let line_bytes = line.len() as u64 + 1; *file_pos += line_bytes;
706
707 if line.trim().is_empty() {
708 continue;
709 }
710
711 if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line)
713 && event.get("topic").and_then(|t| t.as_str()) == Some("human.response")
714 {
715 let message = event
716 .get("payload")
717 .and_then(|p| p.as_str())
718 .unwrap_or("")
719 .to_string();
720 return Ok(Some(message));
721 }
722
723 if line.contains("EVENT: human.response") {
725 let message = line
728 .split('|')
729 .find(|part| part.trim().starts_with("message:"))
730 .and_then(|part| {
731 let value = part.trim().strip_prefix("message:")?;
732 let trimmed = value.trim().trim_matches('"');
733 Some(trimmed.to_string())
734 })
735 .unwrap_or_default();
736 return Ok(Some(message));
737 }
738 }
739
740 Ok(None)
741 }
742}
743
744impl ralph_proto::RobotService for TelegramService {
745 fn send_question(&self, payload: &str) -> anyhow::Result<i32> {
746 Ok(TelegramService::send_question(self, payload)?)
747 }
748
749 fn wait_for_response(&self, events_path: &Path) -> anyhow::Result<Option<String>> {
750 Ok(TelegramService::wait_for_response(self, events_path)?)
751 }
752
753 fn send_checkin(
754 &self,
755 iteration: u32,
756 elapsed: Duration,
757 context: Option<&ralph_proto::CheckinContext>,
758 ) -> anyhow::Result<i32> {
759 let local_context = context.map(|ctx| CheckinContext {
761 current_hat: ctx.current_hat.clone(),
762 open_tasks: ctx.open_tasks,
763 closed_tasks: ctx.closed_tasks,
764 cumulative_cost: ctx.cumulative_cost,
765 });
766 Ok(TelegramService::send_checkin(
767 self,
768 iteration,
769 elapsed,
770 local_context.as_ref(),
771 )?)
772 }
773
774 fn timeout_secs(&self) -> u64 {
775 self.timeout_secs
776 }
777
778 fn shutdown_flag(&self) -> Arc<AtomicBool> {
779 self.shutdown.clone()
780 }
781
782 fn stop(self: Box<Self>) {
783 TelegramService::stop(*self);
784 }
785}
786
787impl fmt::Debug for TelegramService {
788 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
789 f.debug_struct("TelegramService")
790 .field("workspace_root", &self.workspace_root)
791 .field("bot_token", &self.bot_token_masked())
792 .field("timeout_secs", &self.timeout_secs)
793 .finish_non_exhaustive()
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use std::io::Write;
801 use tempfile::TempDir;
802
803 fn test_service(dir: &TempDir) -> TelegramService {
804 TelegramService::new(
805 dir.path().to_path_buf(),
806 Some("test-token-12345".to_string()),
807 300,
808 "main".to_string(),
809 )
810 .unwrap()
811 }
812
813 #[test]
814 fn new_with_explicit_token() {
815 let dir = TempDir::new().unwrap();
816 let service = TelegramService::new(
817 dir.path().to_path_buf(),
818 Some("test-token-12345".to_string()),
819 300,
820 "main".to_string(),
821 );
822 assert!(service.is_ok());
823 }
824
825 #[test]
826 fn new_without_token_fails() {
827 if std::env::var("RALPH_TELEGRAM_BOT_TOKEN").is_ok() {
830 return;
831 }
832
833 let dir = TempDir::new().unwrap();
834 let service = TelegramService::new(dir.path().to_path_buf(), None, 300, "main".to_string());
835 assert!(service.is_err());
836 assert!(matches!(
837 service.unwrap_err(),
838 TelegramError::MissingBotToken
839 ));
840 }
841
842 #[test]
843 fn bot_token_masked_works() {
844 let dir = TempDir::new().unwrap();
845 let service = TelegramService::new(
846 dir.path().to_path_buf(),
847 Some("abcd1234efgh5678".to_string()),
848 300,
849 "main".to_string(),
850 )
851 .unwrap();
852 let masked = service.bot_token_masked();
853 assert_eq!(masked, "abcd...5678");
854 }
855
856 #[test]
857 fn loop_id_accessor() {
858 let dir = TempDir::new().unwrap();
859 let service = TelegramService::new(
860 dir.path().to_path_buf(),
861 Some("token".to_string()),
862 60,
863 "feature-auth".to_string(),
864 )
865 .unwrap();
866 assert_eq!(service.loop_id(), "feature-auth");
867 }
868
869 #[test]
870 fn send_question_stores_pending_question() {
871 let dir = TempDir::new().unwrap();
872 let service = test_service(&dir);
873
874 service.send_question("Which DB to use?").unwrap();
875
876 let state = service.state_manager().load_or_default().unwrap();
878 assert!(
879 state.pending_questions.contains_key("main"),
880 "pending question should be stored for loop_id 'main'"
881 );
882 }
883
884 #[test]
885 fn send_question_returns_message_id() {
886 let dir = TempDir::new().unwrap();
887 let service = test_service(&dir);
888
889 let msg_id = service.send_question("async or sync?").unwrap();
890 assert_eq!(msg_id, 0);
892 }
893
894 #[test]
895 fn check_for_response_json_format() {
896 let dir = TempDir::new().unwrap();
897 let events_path = dir.path().join("events.jsonl");
898
899 let mut file = std::fs::File::create(&events_path).unwrap();
901 writeln!(
902 file,
903 r#"{{"topic":"build.done","payload":"tests: pass, lint: pass, typecheck: pass, audit: pass, coverage: pass","ts":"2026-01-30T00:00:00Z"}}"#
904 )
905 .unwrap();
906 writeln!(
908 file,
909 r#"{{"topic":"human.response","payload":"Use async","ts":"2026-01-30T00:01:00Z"}}"#
910 )
911 .unwrap();
912 file.flush().unwrap();
913
914 let mut pos = 0;
915 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
916 assert_eq!(result, Some("Use async".to_string()));
917 }
918
919 #[test]
920 fn check_for_response_pipe_format() {
921 let dir = TempDir::new().unwrap();
922 let events_path = dir.path().join("events.jsonl");
923
924 let mut file = std::fs::File::create(&events_path).unwrap();
925 writeln!(
926 file,
927 r#"EVENT: human.response | message: "Use sync" | timestamp: "2026-01-30T00:01:00Z""#
928 )
929 .unwrap();
930 file.flush().unwrap();
931
932 let mut pos = 0;
933 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
934 assert_eq!(result, Some("Use sync".to_string()));
935 }
936
937 #[test]
938 fn check_for_response_skips_non_response_events() {
939 let dir = TempDir::new().unwrap();
940 let events_path = dir.path().join("events.jsonl");
941
942 let mut file = std::fs::File::create(&events_path).unwrap();
943 writeln!(
944 file,
945 r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
946 )
947 .unwrap();
948 writeln!(
949 file,
950 r#"{{"topic":"human.guidance","payload":"check errors","ts":"2026-01-30T00:01:00Z"}}"#
951 )
952 .unwrap();
953 file.flush().unwrap();
954
955 let mut pos = 0;
956 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
957 assert_eq!(result, None);
958 }
959
960 #[test]
961 fn check_for_response_missing_file() {
962 let dir = TempDir::new().unwrap();
963 let events_path = dir.path().join("does-not-exist.jsonl");
964
965 let mut pos = 0;
966 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
967 assert_eq!(result, None);
968 }
969
970 #[test]
971 fn check_for_response_tracks_position() {
972 let dir = TempDir::new().unwrap();
973 let events_path = dir.path().join("events.jsonl");
974
975 let mut file = std::fs::OpenOptions::new()
977 .create(true)
978 .truncate(true)
979 .write(true)
980 .open(&events_path)
981 .unwrap();
982 writeln!(
983 file,
984 r#"{{"topic":"build.done","payload":"done","ts":"2026-01-30T00:00:00Z"}}"#
985 )
986 .unwrap();
987 file.flush().unwrap();
988
989 let mut pos = 0;
990 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
991 assert_eq!(result, None);
992 assert!(pos > 0, "position should advance after reading");
993
994 let pos_after_first = pos;
995
996 let mut file = std::fs::OpenOptions::new()
998 .append(true)
999 .open(&events_path)
1000 .unwrap();
1001 writeln!(
1002 file,
1003 r#"{{"topic":"human.response","payload":"yes","ts":"2026-01-30T00:02:00Z"}}"#
1004 )
1005 .unwrap();
1006 file.flush().unwrap();
1007
1008 let result = TelegramService::check_for_response(&events_path, &mut pos).unwrap();
1010 assert_eq!(result, Some("yes".to_string()));
1011 assert!(pos > pos_after_first, "position should advance further");
1012 }
1013
1014 #[test]
1015 fn wait_for_response_returns_on_response() {
1016 let dir = TempDir::new().unwrap();
1017 let service = TelegramService::new(
1018 dir.path().to_path_buf(),
1019 Some("token".to_string()),
1020 5, "main".to_string(),
1022 )
1023 .unwrap();
1024
1025 let events_path = dir.path().join("events.jsonl");
1026 std::fs::File::create(&events_path).unwrap();
1028
1029 service.send_question("Which plan?").unwrap();
1031
1032 let writer_path = events_path.clone();
1034 let writer = std::thread::spawn(move || {
1035 std::thread::sleep(Duration::from_millis(200));
1036 let mut file = std::fs::OpenOptions::new()
1037 .append(true)
1038 .open(&writer_path)
1039 .unwrap();
1040 writeln!(
1041 file,
1042 r#"{{"topic":"human.response","payload":"Go with plan A","ts":"2026-01-30T00:00:00Z"}}"#
1043 )
1044 .unwrap();
1045 file.flush().unwrap();
1046 });
1047
1048 let result = service.wait_for_response(&events_path).unwrap();
1049 writer.join().unwrap();
1050
1051 assert_eq!(result, Some("Go with plan A".to_string()));
1052
1053 let state = service.state_manager().load_or_default().unwrap();
1055 assert!(
1056 !state.pending_questions.contains_key("main"),
1057 "pending question should be removed after response"
1058 );
1059 }
1060
1061 #[test]
1062 fn wait_for_response_returns_none_on_timeout() {
1063 let dir = TempDir::new().unwrap();
1064 let service = TelegramService::new(
1065 dir.path().to_path_buf(),
1066 Some("token".to_string()),
1067 1, "main".to_string(),
1069 )
1070 .unwrap();
1071
1072 let events_path = dir.path().join("events.jsonl");
1073 std::fs::File::create(&events_path).unwrap();
1075
1076 service.send_question("Will this timeout?").unwrap();
1078
1079 let result = service.wait_for_response(&events_path).unwrap();
1080 assert_eq!(result, None, "should return None on timeout");
1081
1082 let state = service.state_manager().load_or_default().unwrap();
1084 assert!(
1085 !state.pending_questions.contains_key("main"),
1086 "pending question should be removed on timeout"
1087 );
1088 }
1089
1090 #[test]
1091 fn retry_with_backoff_succeeds_on_first_attempt() {
1092 let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1093 let attempts_clone = attempts.clone();
1094
1095 let result = retry_with_backoff(
1096 |attempt| {
1097 attempts_clone.lock().unwrap().push(attempt);
1098 Ok(42)
1099 },
1100 |_delay| {},
1101 );
1102
1103 assert!(result.is_ok());
1104 assert_eq!(result.unwrap(), 42);
1105 assert_eq!(*attempts.lock().unwrap(), vec![1]);
1106 }
1107
1108 #[test]
1109 fn retry_with_backoff_succeeds_on_second_attempt() {
1110 let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1111 let attempts_clone = attempts.clone();
1112 let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1113 let delays_clone = delays.clone();
1114
1115 let result = retry_with_backoff(
1116 |attempt| {
1117 attempts_clone.lock().unwrap().push(attempt);
1118 if attempt < 2 {
1119 Err(TelegramError::Send {
1120 attempts: attempt,
1121 reason: "transient failure".to_string(),
1122 })
1123 } else {
1124 Ok(99)
1125 }
1126 },
1127 |delay| {
1128 delays_clone.lock().unwrap().push(delay);
1129 },
1130 );
1131
1132 assert!(result.is_ok());
1133 assert_eq!(result.unwrap(), 99);
1134 assert_eq!(*attempts.lock().unwrap(), vec![1, 2]);
1135 assert_eq!(*delays.lock().unwrap(), vec![Duration::from_secs(1)]);
1137 }
1138
1139 #[test]
1140 fn retry_with_backoff_succeeds_on_third_attempt() {
1141 let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1142 let attempts_clone = attempts.clone();
1143 let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1144 let delays_clone = delays.clone();
1145
1146 let result = retry_with_backoff(
1147 |attempt| {
1148 attempts_clone.lock().unwrap().push(attempt);
1149 if attempt < 3 {
1150 Err(TelegramError::Send {
1151 attempts: attempt,
1152 reason: "transient failure".to_string(),
1153 })
1154 } else {
1155 Ok(7)
1156 }
1157 },
1158 |delay| {
1159 delays_clone.lock().unwrap().push(delay);
1160 },
1161 );
1162
1163 assert!(result.is_ok());
1164 assert_eq!(result.unwrap(), 7);
1165 assert_eq!(*attempts.lock().unwrap(), vec![1, 2, 3]);
1166 assert_eq!(
1168 *delays.lock().unwrap(),
1169 vec![Duration::from_secs(1), Duration::from_secs(2)]
1170 );
1171 }
1172
1173 #[test]
1174 fn retry_with_backoff_fails_after_all_retries() {
1175 let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1176 let attempts_clone = attempts.clone();
1177 let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1178 let delays_clone = delays.clone();
1179
1180 let result = retry_with_backoff(
1181 |attempt| {
1182 attempts_clone.lock().unwrap().push(attempt);
1183 Err(TelegramError::Send {
1184 attempts: attempt,
1185 reason: format!("failure on attempt {}", attempt),
1186 })
1187 },
1188 |delay| {
1189 delays_clone.lock().unwrap().push(delay);
1190 },
1191 );
1192
1193 assert!(result.is_err());
1194 let err = result.unwrap_err();
1195 assert!(matches!(
1196 err,
1197 TelegramError::Send {
1198 attempts: 3,
1199 reason: _
1200 }
1201 ));
1202 if let TelegramError::Send { reason, .. } = &err {
1204 assert!(reason.contains("failure on attempt 3"));
1205 }
1206 assert_eq!(*attempts.lock().unwrap(), vec![1, 2, 3]);
1207 assert_eq!(
1209 *delays.lock().unwrap(),
1210 vec![Duration::from_secs(1), Duration::from_secs(2)]
1211 );
1212 }
1213
1214 #[test]
1215 fn retry_with_backoff_exponential_delays_are_correct() {
1216 let delays = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1217 let delays_clone = delays.clone();
1218
1219 let _ = retry_with_backoff(
1220 |_attempt| {
1221 Err(TelegramError::Send {
1222 attempts: 1,
1223 reason: "always fail".to_string(),
1224 })
1225 },
1226 |delay| {
1227 delays_clone.lock().unwrap().push(delay);
1228 },
1229 );
1230
1231 let recorded = delays.lock().unwrap().clone();
1232 assert_eq!(recorded.len(), 2);
1234 assert_eq!(recorded[0], Duration::from_secs(1));
1235 assert_eq!(recorded[1], Duration::from_secs(2));
1236 }
1237
1238 #[test]
1239 fn checkin_context_default() {
1240 let ctx = CheckinContext::default();
1241 assert!(ctx.current_hat.is_none());
1242 assert_eq!(ctx.open_tasks, 0);
1243 assert_eq!(ctx.closed_tasks, 0);
1244 assert!(ctx.cumulative_cost.abs() < f64::EPSILON);
1245 }
1246
1247 #[test]
1248 fn checkin_context_with_hat_and_tasks() {
1249 let ctx = CheckinContext {
1250 current_hat: Some("executor".to_string()),
1251 open_tasks: 3,
1252 closed_tasks: 5,
1253 cumulative_cost: 1.2345,
1254 };
1255 assert_eq!(ctx.current_hat.as_deref(), Some("executor"));
1256 assert_eq!(ctx.open_tasks, 3);
1257 assert_eq!(ctx.closed_tasks, 5);
1258 assert!((ctx.cumulative_cost - 1.2345).abs() < f64::EPSILON);
1259 }
1260
1261 #[test]
1262 fn wait_for_response_returns_none_on_shutdown() {
1263 let dir = TempDir::new().unwrap();
1264 let service = TelegramService::new(
1265 dir.path().to_path_buf(),
1266 Some("token".to_string()),
1267 60, "main".to_string(),
1269 )
1270 .unwrap();
1271
1272 let events_path = dir.path().join("events.jsonl");
1273 std::fs::File::create(&events_path).unwrap();
1274
1275 service.shutdown_flag().store(true, Ordering::Relaxed);
1277
1278 let start = Instant::now();
1279 let result = service.wait_for_response(&events_path).unwrap();
1280 let elapsed = start.elapsed();
1281
1282 assert_eq!(result, None, "should return None when shutdown flag is set");
1283 assert!(
1284 elapsed < Duration::from_secs(2),
1285 "should return quickly, not wait for timeout (elapsed: {:?})",
1286 elapsed
1287 );
1288 }
1289}