1use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14
15use async_trait::async_trait;
16use tracing::{error, info, warn};
17
18use ralph_proto::daemon::{DaemonAdapter, StartLoopFn};
19
20use crate::bot::{BotApi, TelegramBot, escape_html};
21use crate::loop_lock::{LockState, lock_path, lock_state};
22use crate::state::StateManager;
23
24async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
25 while !shutdown.load(Ordering::Relaxed) {
26 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
27 }
28}
29
30pub struct TelegramDaemon {
36 bot_token: String,
37 chat_id: i64,
38}
39
40impl TelegramDaemon {
41 pub fn new(bot_token: String, chat_id: i64) -> Self {
46 Self { bot_token, chat_id }
47 }
48}
49
50#[async_trait]
51impl DaemonAdapter for TelegramDaemon {
52 async fn run_daemon(
53 &self,
54 workspace_root: PathBuf,
55 start_loop: StartLoopFn,
56 ) -> anyhow::Result<()> {
57 let bot = TelegramBot::new(&self.bot_token);
58 let chat_id = self.chat_id;
59
60 let state_manager = StateManager::new(workspace_root.join(".ralph/telegram-state.json"));
61
62 let _ = bot.send_message(chat_id, "Ralph daemon online 🤖").await;
64
65 let shutdown = Arc::new(AtomicBool::new(false));
67 {
68 let flag = shutdown.clone();
69 tokio::spawn(async move {
70 let _ = tokio::signal::ctrl_c().await;
71 flag.store(true, Ordering::Relaxed);
72 });
73 }
74 #[cfg(unix)]
75 {
76 let flag = shutdown.clone();
77 tokio::spawn(async move {
78 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
79 Ok(mut sigterm) => {
80 sigterm.recv().await;
81 flag.store(true, Ordering::Relaxed);
82 }
83 Err(e) => {
84 error!(error = %e, "Failed to register SIGTERM handler");
85 flag.store(true, Ordering::Relaxed);
86 }
87 }
88 });
89 }
90
91 let mut offset: i32 = 0;
92
93 'daemon: while !shutdown.load(Ordering::Relaxed) {
95 let updates = match tokio::select! {
97 _ = wait_for_shutdown(shutdown.clone()) => {
98 break 'daemon;
99 }
100 updates = poll_updates(&self.bot_token, 30, offset) => updates,
101 } {
102 Ok(u) => u,
103 Err(e) => {
104 warn!(error = %e, "Telegram poll failed, retrying");
105 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
106 continue;
107 }
108 };
109
110 for update in updates {
111 offset = update.update_id + 1;
112
113 let text = match update.text.as_deref() {
114 Some(t) => t,
115 None => continue,
116 };
117
118 if let Ok(mut state) = state_manager.load_or_default() {
119 if state.chat_id.is_none() {
120 state.chat_id = Some(chat_id);
121 }
122 state.last_seen = Some(chrono::Utc::now());
123 state.last_update_id = Some(update.update_id);
124 if let Err(e) = state_manager.save(&state) {
125 warn!(error = %e, "Failed to persist Telegram state");
126 }
127 } else {
128 warn!("Failed to load Telegram state");
129 }
130
131 info!(text = %text, "Daemon received message");
132
133 if text.starts_with('/') {
135 match text.split_whitespace().next().unwrap_or("") {
136 "/status" => {
137 let msg = match lock_state(&workspace_root) {
138 Ok(LockState::Active) => "A loop is running.".to_string(),
139 Ok(LockState::Stale) => {
140 "No active loop (stale lock file found).".to_string()
141 }
142 Ok(LockState::Inactive) => {
143 "Idle — waiting for messages.".to_string()
144 }
145 Err(e) => format!("Failed to check lock state: {}", e),
146 };
147 let _ = bot.send_message(chat_id, &msg).await;
148 }
149 _ => {
150 let _ = bot
151 .send_message(
152 chat_id,
153 "Unknown command. I only handle /status while idle.",
154 )
155 .await;
156 }
157 }
158 continue;
159 }
160
161 let lock_path = lock_path(&workspace_root);
163 let state = match lock_state(&workspace_root) {
164 Ok(state) => state,
165 Err(e) => {
166 warn!(error = %e, "Failed to check loop lock state");
167 let _ = bot
168 .send_message(
169 chat_id,
170 "Failed to check loop state; try again in a moment.",
171 )
172 .await;
173 continue;
174 }
175 };
176 if state == LockState::Active {
177 let _ = bot
178 .send_message(
179 chat_id,
180 "A loop is already running — it will receive your messages directly.",
181 )
182 .await;
183 continue;
184 }
185
186 if state == LockState::Stale {
187 warn!(
188 lock_path = %lock_path.display(),
189 "Found stale loop lock; starting new loop"
190 );
191 }
192
193 let escaped = escape_html(text);
195 let ack = format!("Starting loop: <i>{}</i>", escaped);
196 let _ = bot.send_message(chat_id, &ack).await;
197
198 let prompt = text.to_string();
202 let mut loop_handle = tokio::spawn(start_loop(prompt));
203 let result = tokio::select! {
204 _ = wait_for_shutdown(shutdown.clone()) => {
205 loop_handle.abort();
206 let _ = loop_handle.await;
207 break 'daemon;
208 }
209 result = &mut loop_handle => result,
210 };
211
212 match result {
214 Ok(Ok(description)) => {
215 let notification =
216 format!("Loop complete ({}).", escape_html(&description));
217 let _ = bot.send_message(chat_id, ¬ification).await;
218 }
219 Ok(Err(e)) => {
220 let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
221 let _ = bot.send_message(chat_id, ¬ification).await;
222 }
223 Err(e) => {
224 let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
225 let _ = bot.send_message(chat_id, ¬ification).await;
226 }
227 }
228 }
229 }
230
231 let _ = bot.send_message(chat_id, "Ralph daemon offline 👋").await;
233
234 Ok(())
235 }
236}
237
238struct DaemonUpdate {
244 update_id: i32,
245 text: Option<String>,
246}
247
248async fn poll_updates(
253 token: &str,
254 timeout_secs: u64,
255 offset: i32,
256) -> anyhow::Result<Vec<DaemonUpdate>> {
257 use teloxide::payloads::GetUpdatesSetters;
258 use teloxide::requests::Requester;
259
260 let bot = teloxide::Bot::new(token);
261 let request = bot
262 .get_updates()
263 .offset(offset)
264 .timeout(timeout_secs as u32);
265
266 let updates = request
267 .await
268 .map_err(|e| anyhow::anyhow!("Telegram getUpdates failed: {}", e))?;
269
270 let mut results = Vec::new();
271 for update in updates {
272 #[allow(clippy::cast_possible_wrap)]
273 let id = update.id.0 as i32;
274
275 let text = match update.kind {
276 teloxide::types::UpdateKind::Message(ref msg) => msg.text().map(String::from),
277 _ => None,
278 };
279
280 results.push(DaemonUpdate {
281 update_id: id,
282 text,
283 });
284 }
285
286 Ok(results)
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 #[test]
294 fn test_telegram_daemon_creation() {
295 let daemon = TelegramDaemon::new("test-token".to_string(), 12345);
296 assert_eq!(daemon.bot_token, "test-token");
297 assert_eq!(daemon.chat_id, 12345);
298 }
299}