Skip to main content

ralph_telegram/
daemon.rs

1//! Telegram daemon adapter.
2//!
3//! Implements [`DaemonAdapter`] for Telegram, providing a persistent process
4//! that listens for messages and starts orchestration loops on demand.
5//!
6//! Uses a **turn-taking model**: the daemon polls Telegram while idle, but
7//! stops polling when a loop starts — the loop's own [`TelegramService`]
8//! takes over for the full Telegram feature set (commands, guidance,
9//! responses, check-ins). When the loop finishes, the daemon resumes.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14
15use async_trait::async_trait;
16use tracing::{error, info, warn};
17
18use ralph_proto::daemon::{DaemonAdapter, StartLoopFn};
19
20use crate::bot::{BotApi, TelegramBot, escape_html};
21use crate::loop_lock::{LockState, lock_path, lock_state};
22use crate::state::StateManager;
23
24async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
25    while !shutdown.load(Ordering::Relaxed) {
26        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
27    }
28}
29
/// Daemon adapter that uses a Telegram chat as its control channel.
///
/// While no loop is running the daemon long-polls Telegram, answers
/// `/status`, and hands any other text message to the [`StartLoopFn`]
/// callback as the prompt for a new loop. It shuts down gracefully on
/// `SIGINT`/`SIGTERM`.
pub struct TelegramDaemon {
    /// Telegram Bot API token used for every request.
    bot_token: String,
    /// Chat that receives the daemon's notifications.
    chat_id: i64,
}

impl TelegramDaemon {
    /// Build a daemon bound to one bot and one chat.
    ///
    /// * `bot_token` — Telegram Bot API token.
    /// * `chat_id` — the Telegram chat to communicate with.
    pub fn new(bot_token: String, chat_id: i64) -> Self {
        TelegramDaemon { bot_token, chat_id }
    }
}
49
50#[async_trait]
51impl DaemonAdapter for TelegramDaemon {
52    async fn run_daemon(
53        &self,
54        workspace_root: PathBuf,
55        start_loop: StartLoopFn,
56    ) -> anyhow::Result<()> {
57        let bot = TelegramBot::new(&self.bot_token);
58        let chat_id = self.chat_id;
59
60        let state_manager = StateManager::new(workspace_root.join(".ralph/telegram-state.json"));
61
62        // Send greeting
63        let _ = bot.send_message(chat_id, "Ralph daemon online 🤖").await;
64
65        // Install signal handlers for graceful shutdown
66        let shutdown = Arc::new(AtomicBool::new(false));
67        {
68            let flag = shutdown.clone();
69            tokio::spawn(async move {
70                let _ = tokio::signal::ctrl_c().await;
71                flag.store(true, Ordering::Relaxed);
72            });
73        }
74        #[cfg(unix)]
75        {
76            let flag = shutdown.clone();
77            tokio::spawn(async move {
78                match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
79                    Ok(mut sigterm) => {
80                        sigterm.recv().await;
81                        flag.store(true, Ordering::Relaxed);
82                    }
83                    Err(e) => {
84                        error!(error = %e, "Failed to register SIGTERM handler");
85                        flag.store(true, Ordering::Relaxed);
86                    }
87                }
88            });
89        }
90
91        let mut offset: i32 = 0;
92
93        // Main daemon loop
94        'daemon: while !shutdown.load(Ordering::Relaxed) {
95            // ── Idle: poll Telegram for messages ──
96            let updates = match tokio::select! {
97                _ = wait_for_shutdown(shutdown.clone()) => {
98                    break 'daemon;
99                }
100                updates = poll_updates(&self.bot_token, 30, offset) => updates,
101            } {
102                Ok(u) => u,
103                Err(e) => {
104                    warn!(error = %e, "Telegram poll failed, retrying");
105                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
106                    continue;
107                }
108            };
109
110            for update in updates {
111                offset = update.update_id + 1;
112
113                let text = match update.text.as_deref() {
114                    Some(t) => t,
115                    None => continue,
116                };
117
118                if let Ok(mut state) = state_manager.load_or_default() {
119                    if state.chat_id.is_none() {
120                        state.chat_id = Some(chat_id);
121                    }
122                    state.last_seen = Some(chrono::Utc::now());
123                    state.last_update_id = Some(update.update_id);
124                    if let Err(e) = state_manager.save(&state) {
125                        warn!(error = %e, "Failed to persist Telegram state");
126                    }
127                } else {
128                    warn!("Failed to load Telegram state");
129                }
130
131                info!(text = %text, "Daemon received message");
132
133                // Handle daemon-only commands
134                if text.starts_with('/') {
135                    match text.split_whitespace().next().unwrap_or("") {
136                        "/status" => {
137                            let msg = match lock_state(&workspace_root) {
138                                Ok(LockState::Active) => "A loop is running.".to_string(),
139                                Ok(LockState::Stale) => {
140                                    "No active loop (stale lock file found).".to_string()
141                                }
142                                Ok(LockState::Inactive) => {
143                                    "Idle — waiting for messages.".to_string()
144                                }
145                                Err(e) => format!("Failed to check lock state: {}", e),
146                            };
147                            let _ = bot.send_message(chat_id, &msg).await;
148                        }
149                        _ => {
150                            let _ = bot
151                                .send_message(
152                                    chat_id,
153                                    "Unknown command. I only handle /status while idle.",
154                                )
155                                .await;
156                        }
157                    }
158                    continue;
159                }
160
161                // Regular message → check lock state
162                let lock_path = lock_path(&workspace_root);
163                let state = match lock_state(&workspace_root) {
164                    Ok(state) => state,
165                    Err(e) => {
166                        warn!(error = %e, "Failed to check loop lock state");
167                        let _ = bot
168                            .send_message(
169                                chat_id,
170                                "Failed to check loop state; try again in a moment.",
171                            )
172                            .await;
173                        continue;
174                    }
175                };
176                if state == LockState::Active {
177                    let _ = bot
178                        .send_message(
179                            chat_id,
180                            "A loop is already running — it will receive your messages directly.",
181                        )
182                        .await;
183                    continue;
184                }
185
186                if state == LockState::Stale {
187                    warn!(
188                        lock_path = %lock_path.display(),
189                        "Found stale loop lock; starting new loop"
190                    );
191                }
192
193                // No loop running — start one with this message as prompt
194                let escaped = escape_html(text);
195                let ack = format!("Starting loop: <i>{}</i>", escaped);
196                let _ = bot.send_message(chat_id, &ack).await;
197
198                // ── Loop Running: hand off Telegram to the loop ──
199                // The loop's TelegramService polls getUpdates, handles commands,
200                // guidance, responses, check-ins. We just await completion.
201                let prompt = text.to_string();
202                let mut loop_handle = tokio::spawn(start_loop(prompt));
203                let result = tokio::select! {
204                    _ = wait_for_shutdown(shutdown.clone()) => {
205                        loop_handle.abort();
206                        let _ = loop_handle.await;
207                        break 'daemon;
208                    }
209                    result = &mut loop_handle => result,
210                };
211
212                // Loop finished — daemon resumes polling.
213                match result {
214                    Ok(Ok(description)) => {
215                        let notification =
216                            format!("Loop complete ({}).", escape_html(&description));
217                        let _ = bot.send_message(chat_id, &notification).await;
218                    }
219                    Ok(Err(e)) => {
220                        let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
221                        let _ = bot.send_message(chat_id, &notification).await;
222                    }
223                    Err(e) => {
224                        let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
225                        let _ = bot.send_message(chat_id, &notification).await;
226                    }
227                }
228            }
229        }
230
231        // Farewell
232        let _ = bot.send_message(chat_id, "Ralph daemon offline 👋").await;
233
234        Ok(())
235    }
236}
237
238// ─────────────────────────────────────────────────────────────────────────────
239// Lightweight Telegram polling (teloxide Bot client)
240// ─────────────────────────────────────────────────────────────────────────────
241
242/// A minimal parsed update for daemon idle polling.
243struct DaemonUpdate {
244    update_id: i32,
245    text: Option<String>,
246}
247
248/// Long-poll `getUpdates` using the teloxide Bot client.
249///
250/// Uses teloxide's built-in HTTP client rather than raw `reqwest`
251/// since `ralph-telegram` already depends on teloxide.
252async fn poll_updates(
253    token: &str,
254    timeout_secs: u64,
255    offset: i32,
256) -> anyhow::Result<Vec<DaemonUpdate>> {
257    use teloxide::payloads::GetUpdatesSetters;
258    use teloxide::requests::Requester;
259
260    let bot = teloxide::Bot::new(token);
261    let request = bot
262        .get_updates()
263        .offset(offset)
264        .timeout(timeout_secs as u32);
265
266    let updates = request
267        .await
268        .map_err(|e| anyhow::anyhow!("Telegram getUpdates failed: {}", e))?;
269
270    let mut results = Vec::new();
271    for update in updates {
272        #[allow(clippy::cast_possible_wrap)]
273        let id = update.id.0 as i32;
274
275        let text = match update.kind {
276            teloxide::types::UpdateKind::Message(ref msg) => msg.text().map(String::from),
277            _ => None,
278        };
279
280        results.push(DaemonUpdate {
281            update_id: id,
282            text,
283        });
284    }
285
286    Ok(results)
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// `TelegramDaemon::new` must store both arguments unchanged.
    #[test]
    fn test_telegram_daemon_creation() {
        let token = String::from("test-token");
        let daemon = TelegramDaemon::new(token, 12345);

        assert_eq!(daemon.chat_id, 12345);
        assert_eq!(daemon.bot_token, "test-token");
    }
}