1use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14
15use async_trait::async_trait;
16use tracing::{error, info, warn};
17
18use ralph_proto::daemon::{DaemonAdapter, StartLoopFn};
19
20use crate::bot::{BotApi, TelegramBot, escape_html};
21use crate::loop_lock::{LockState, lock_path, lock_state};
22use crate::state::StateManager;
23
24async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
25 while !shutdown.load(Ordering::Relaxed) {
26 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
27 }
28}
29
30pub struct TelegramDaemon {
37 bot_token: String,
38 api_url: Option<String>,
39 chat_id: i64,
40}
41
42impl TelegramDaemon {
43 pub fn new(bot_token: String, api_url: Option<String>, chat_id: i64) -> Self {
49 Self {
50 bot_token,
51 api_url,
52 chat_id,
53 }
54 }
55}
56
57#[async_trait]
58impl DaemonAdapter for TelegramDaemon {
59 async fn run_daemon(
60 &self,
61 workspace_root: PathBuf,
62 start_loop: StartLoopFn,
63 ) -> anyhow::Result<()> {
64 let bot = TelegramBot::new(&self.bot_token, self.api_url.as_deref());
65 let chat_id = self.chat_id;
66
67 let state_manager = StateManager::new(workspace_root.join(".ralph/telegram-state.json"));
68
69 let _ = bot.send_message(chat_id, "Ralph daemon online 🤖").await;
71
72 let shutdown = Arc::new(AtomicBool::new(false));
74 {
75 let flag = shutdown.clone();
76 tokio::spawn(async move {
77 let _ = tokio::signal::ctrl_c().await;
78 flag.store(true, Ordering::Relaxed);
79 });
80 }
81 #[cfg(unix)]
82 {
83 let flag = shutdown.clone();
84 tokio::spawn(async move {
85 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
86 Ok(mut sigterm) => {
87 sigterm.recv().await;
88 flag.store(true, Ordering::Relaxed);
89 }
90 Err(e) => {
91 error!(error = %e, "Failed to register SIGTERM handler");
92 flag.store(true, Ordering::Relaxed);
93 }
94 }
95 });
96 }
97
98 let mut offset: i32 = 0;
99
100 'daemon: while !shutdown.load(Ordering::Relaxed) {
102 let updates = match tokio::select! {
104 _ = wait_for_shutdown(shutdown.clone()) => {
105 break 'daemon;
106 }
107 updates = poll_updates(&self.bot_token, self.api_url.as_deref(), 30, offset) => updates,
108 } {
109 Ok(u) => u,
110 Err(e) => {
111 warn!(error = %e, "Telegram poll failed, retrying");
112 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
113 continue;
114 }
115 };
116
117 for update in updates {
118 offset = update.update_id + 1;
119
120 let text = match update.text.as_deref() {
121 Some(t) => t,
122 None => continue,
123 };
124
125 if let Ok(mut state) = state_manager.load_or_default() {
126 if state.chat_id.is_none() {
127 state.chat_id = Some(chat_id);
128 }
129 state.last_seen = Some(chrono::Utc::now());
130 state.last_update_id = Some(update.update_id);
131 if let Err(e) = state_manager.save(&state) {
132 warn!(error = %e, "Failed to persist Telegram state");
133 }
134 } else {
135 warn!("Failed to load Telegram state");
136 }
137
138 info!(text = %text, "Daemon received message");
139
140 if text.starts_with('/') {
142 let response = crate::commands::handle_command(text, &workspace_root)
143 .unwrap_or_else(|| {
144 "Unknown command. Use /help for the supported commands.".to_string()
145 });
146 let _ = bot.send_message(chat_id, &response).await;
147 continue;
148 }
149
150 let lock_path = lock_path(&workspace_root);
152 let state = match lock_state(&workspace_root) {
153 Ok(state) => state,
154 Err(e) => {
155 warn!(error = %e, "Failed to check loop lock state");
156 let _ = bot
157 .send_message(
158 chat_id,
159 "Failed to check loop state; try again in a moment.",
160 )
161 .await;
162 continue;
163 }
164 };
165 if state == LockState::Active {
166 let _ = bot
167 .send_message(
168 chat_id,
169 "A loop is already running — it will receive your messages directly.",
170 )
171 .await;
172 continue;
173 }
174
175 if state == LockState::Stale {
176 warn!(
177 lock_path = %lock_path.display(),
178 "Found stale loop lock; starting new loop"
179 );
180 }
181
182 let escaped = escape_html(text);
184 let ack = format!("Starting loop: <i>{}</i>", escaped);
185 let _ = bot.send_message(chat_id, &ack).await;
186
187 let prompt = text.to_string();
191 let mut loop_handle = tokio::spawn(start_loop(prompt));
192 let result = tokio::select! {
193 _ = wait_for_shutdown(shutdown.clone()) => {
194 loop_handle.abort();
195 let _ = loop_handle.await;
196 break 'daemon;
197 }
198 result = &mut loop_handle => result,
199 };
200
201 match result {
203 Ok(Ok(description)) => {
204 let notification =
205 format!("Loop complete ({}).", escape_html(&description));
206 let _ = bot.send_message(chat_id, ¬ification).await;
207 }
208 Ok(Err(e)) => {
209 let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
210 let _ = bot.send_message(chat_id, ¬ification).await;
211 }
212 Err(e) => {
213 let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
214 let _ = bot.send_message(chat_id, ¬ification).await;
215 }
216 }
217 }
218 }
219
220 let _ = bot.send_message(chat_id, "Ralph daemon offline 👋").await;
222
223 Ok(())
224 }
225}
226
227struct DaemonUpdate {
233 update_id: i32,
234 text: Option<String>,
235}
236
237async fn poll_updates(
242 token: &str,
243 api_url: Option<&str>,
244 timeout_secs: u64,
245 offset: i32,
246) -> anyhow::Result<Vec<DaemonUpdate>> {
247 use teloxide::payloads::GetUpdatesSetters;
248 use teloxide::requests::Requester;
249
250 let bot = crate::apply_api_url(teloxide::Bot::new(token), api_url);
251 let request = bot
252 .get_updates()
253 .offset(offset)
254 .timeout(timeout_secs as u32);
255
256 let updates = request
257 .await
258 .map_err(|e| anyhow::anyhow!("Telegram getUpdates failed: {}", e))?;
259
260 let mut results = Vec::new();
261 for update in updates {
262 #[allow(clippy::cast_possible_wrap)]
263 let id = update.id.0 as i32;
264
265 let text = match update.kind {
266 teloxide::types::UpdateKind::Message(ref msg) => msg.text().map(String::from),
267 _ => None,
268 };
269
270 results.push(DaemonUpdate {
271 update_id: id,
272 text,
273 });
274 }
275
276 Ok(results)
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282
283 #[test]
284 fn test_telegram_daemon_creation() {
285 let daemon = TelegramDaemon::new("test-token".to_string(), None, 12345);
286 assert_eq!(daemon.bot_token, "test-token");
287 assert_eq!(daemon.api_url, None);
288 assert_eq!(daemon.chat_id, 12345);
289 }
290
291 #[test]
292 fn test_telegram_daemon_with_api_url() {
293 let daemon = TelegramDaemon::new(
294 "test-token".to_string(),
295 Some("http://localhost:8081".to_string()),
296 12345,
297 );
298 assert_eq!(daemon.bot_token, "test-token");
299 assert_eq!(daemon.api_url, Some("http://localhost:8081".to_string()));
300 assert_eq!(daemon.chat_id, 12345);
301 }
302}