Skip to main content

ralph_telegram/
daemon.rs

1//! Telegram daemon adapter.
2//!
3//! Implements [`DaemonAdapter`] for Telegram, providing a persistent process
4//! that listens for messages and starts orchestration loops on demand.
5//!
6//! Uses a **turn-taking model**: the daemon polls Telegram while idle, but
7//! stops polling when a loop starts — the loop's own [`TelegramService`]
8//! takes over for the full Telegram feature set (commands, guidance,
9//! responses, check-ins). When the loop finishes, the daemon resumes.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14
15use async_trait::async_trait;
16use tracing::{error, info, warn};
17
18use ralph_proto::daemon::{DaemonAdapter, StartLoopFn};
19
20use crate::bot::{BotApi, TelegramBot, escape_html};
21use crate::loop_lock::{LockState, lock_path, lock_state};
22use crate::state::StateManager;
23
24async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
25    while !shutdown.load(Ordering::Relaxed) {
26        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
27    }
28}
29
/// Daemon adapter that talks to a single Telegram chat.
///
/// While idle it polls Telegram for incoming messages; when a loop has to be
/// started, execution is delegated to the [`StartLoopFn`] callback handed to
/// the daemon. Slash-commands are answered through the shared command
/// handler, and the daemon stops gracefully on `SIGINT`/`SIGTERM`.
pub struct TelegramDaemon {
    /// Telegram Bot API token used for every request.
    bot_token: String,
    /// Optional override for the Telegram Bot API base URL.
    api_url: Option<String>,
    /// Chat that receives all daemon messages.
    chat_id: i64,
}

impl TelegramDaemon {
    /// Build a daemon from its connection settings.
    ///
    /// * `bot_token` — Telegram Bot API token.
    /// * `api_url` — custom Telegram Bot API URL, or `None` for the default.
    /// * `chat_id` — the Telegram chat to communicate with.
    ///
    /// The values are stored as given; nothing is contacted until the daemon
    /// is run.
    pub fn new(bot_token: String, api_url: Option<String>, chat_id: i64) -> Self {
        TelegramDaemon {
            bot_token,
            api_url,
            chat_id,
        }
    }
}
56
57#[async_trait]
58impl DaemonAdapter for TelegramDaemon {
59    async fn run_daemon(
60        &self,
61        workspace_root: PathBuf,
62        start_loop: StartLoopFn,
63    ) -> anyhow::Result<()> {
64        let bot = TelegramBot::new(&self.bot_token, self.api_url.as_deref());
65        let chat_id = self.chat_id;
66
67        let state_manager = StateManager::new(workspace_root.join(".ralph/telegram-state.json"));
68
69        // Send greeting
70        let _ = bot.send_message(chat_id, "Ralph daemon online 🤖").await;
71
72        // Install signal handlers for graceful shutdown
73        let shutdown = Arc::new(AtomicBool::new(false));
74        {
75            let flag = shutdown.clone();
76            tokio::spawn(async move {
77                let _ = tokio::signal::ctrl_c().await;
78                flag.store(true, Ordering::Relaxed);
79            });
80        }
81        #[cfg(unix)]
82        {
83            let flag = shutdown.clone();
84            tokio::spawn(async move {
85                match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
86                    Ok(mut sigterm) => {
87                        sigterm.recv().await;
88                        flag.store(true, Ordering::Relaxed);
89                    }
90                    Err(e) => {
91                        error!(error = %e, "Failed to register SIGTERM handler");
92                        flag.store(true, Ordering::Relaxed);
93                    }
94                }
95            });
96        }
97
98        let mut offset: i32 = 0;
99
100        // Main daemon loop
101        'daemon: while !shutdown.load(Ordering::Relaxed) {
102            // ── Idle: poll Telegram for messages ──
103            let updates = match tokio::select! {
104                _ = wait_for_shutdown(shutdown.clone()) => {
105                    break 'daemon;
106                }
107                updates = poll_updates(&self.bot_token, self.api_url.as_deref(), 30, offset) => updates,
108            } {
109                Ok(u) => u,
110                Err(e) => {
111                    warn!(error = %e, "Telegram poll failed, retrying");
112                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
113                    continue;
114                }
115            };
116
117            for update in updates {
118                offset = update.update_id + 1;
119
120                let text = match update.text.as_deref() {
121                    Some(t) => t,
122                    None => continue,
123                };
124
125                if let Ok(mut state) = state_manager.load_or_default() {
126                    if state.chat_id.is_none() {
127                        state.chat_id = Some(chat_id);
128                    }
129                    state.last_seen = Some(chrono::Utc::now());
130                    state.last_update_id = Some(update.update_id);
131                    if let Err(e) = state_manager.save(&state) {
132                        warn!(error = %e, "Failed to persist Telegram state");
133                    }
134                } else {
135                    warn!("Failed to load Telegram state");
136                }
137
138                info!(text = %text, "Daemon received message");
139
140                // Handle slash-commands while idle using the shared command parser.
141                if text.starts_with('/') {
142                    let response = crate::commands::handle_command(text, &workspace_root)
143                        .unwrap_or_else(|| {
144                            "Unknown command. Use /help for the supported commands.".to_string()
145                        });
146                    let _ = bot.send_message(chat_id, &response).await;
147                    continue;
148                }
149
150                // Regular message → check lock state
151                let lock_path = lock_path(&workspace_root);
152                let state = match lock_state(&workspace_root) {
153                    Ok(state) => state,
154                    Err(e) => {
155                        warn!(error = %e, "Failed to check loop lock state");
156                        let _ = bot
157                            .send_message(
158                                chat_id,
159                                "Failed to check loop state; try again in a moment.",
160                            )
161                            .await;
162                        continue;
163                    }
164                };
165                if state == LockState::Active {
166                    let _ = bot
167                        .send_message(
168                            chat_id,
169                            "A loop is already running — it will receive your messages directly.",
170                        )
171                        .await;
172                    continue;
173                }
174
175                if state == LockState::Stale {
176                    warn!(
177                        lock_path = %lock_path.display(),
178                        "Found stale loop lock; starting new loop"
179                    );
180                }
181
182                // No loop running — start one with this message as prompt
183                let escaped = escape_html(text);
184                let ack = format!("Starting loop: <i>{}</i>", escaped);
185                let _ = bot.send_message(chat_id, &ack).await;
186
187                // ── Loop Running: hand off Telegram to the loop ──
188                // The loop's TelegramService polls getUpdates, handles commands,
189                // guidance, responses, check-ins. We just await completion.
190                let prompt = text.to_string();
191                let mut loop_handle = tokio::spawn(start_loop(prompt));
192                let result = tokio::select! {
193                    _ = wait_for_shutdown(shutdown.clone()) => {
194                        loop_handle.abort();
195                        let _ = loop_handle.await;
196                        break 'daemon;
197                    }
198                    result = &mut loop_handle => result,
199                };
200
201                // Loop finished — daemon resumes polling.
202                match result {
203                    Ok(Ok(description)) => {
204                        let notification =
205                            format!("Loop complete ({}).", escape_html(&description));
206                        let _ = bot.send_message(chat_id, &notification).await;
207                    }
208                    Ok(Err(e)) => {
209                        let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
210                        let _ = bot.send_message(chat_id, &notification).await;
211                    }
212                    Err(e) => {
213                        let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
214                        let _ = bot.send_message(chat_id, &notification).await;
215                    }
216                }
217            }
218        }
219
220        // Farewell
221        let _ = bot.send_message(chat_id, "Ralph daemon offline 👋").await;
222
223        Ok(())
224    }
225}
226
227// ─────────────────────────────────────────────────────────────────────────────
228// Lightweight Telegram polling (teloxide Bot client)
229// ─────────────────────────────────────────────────────────────────────────────
230
/// A minimal parsed update for daemon idle polling.
///
/// Only the fields the idle daemon acts on are kept; everything else in the
/// Telegram update is dropped by `poll_updates`.
struct DaemonUpdate {
    /// Telegram update identifier; the daemon requests `update_id + 1` as the
    /// next polling offset.
    update_id: i32,
    /// Text of the message, or `None` when the update is not a message or the
    /// message has no text.
    text: Option<String>,
}
236
237/// Long-poll `getUpdates` using the teloxide Bot client.
238///
239/// Uses teloxide's built-in HTTP client rather than raw `reqwest`
240/// since `ralph-telegram` already depends on teloxide.
241async fn poll_updates(
242    token: &str,
243    api_url: Option<&str>,
244    timeout_secs: u64,
245    offset: i32,
246) -> anyhow::Result<Vec<DaemonUpdate>> {
247    use teloxide::payloads::GetUpdatesSetters;
248    use teloxide::requests::Requester;
249
250    let bot = crate::apply_api_url(teloxide::Bot::new(token), api_url);
251    let request = bot
252        .get_updates()
253        .offset(offset)
254        .timeout(timeout_secs as u32);
255
256    let updates = request
257        .await
258        .map_err(|e| anyhow::anyhow!("Telegram getUpdates failed: {}", e))?;
259
260    let mut results = Vec::new();
261    for update in updates {
262        #[allow(clippy::cast_possible_wrap)]
263        let id = update.id.0 as i32;
264
265        let text = match update.kind {
266            teloxide::types::UpdateKind::Message(ref msg) => msg.text().map(String::from),
267            _ => None,
268        };
269
270        results.push(DaemonUpdate {
271            update_id: id,
272            text,
273        });
274    }
275
276    Ok(results)
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an API URL override, the token and chat id are stored as given.
    #[test]
    fn test_telegram_daemon_creation() {
        let token = String::from("test-token");
        let subject = TelegramDaemon::new(token, None, 12345);

        assert_eq!(subject.bot_token.as_str(), "test-token");
        assert!(subject.api_url.is_none());
        assert_eq!(subject.chat_id, 12345);
    }

    /// A custom API URL is kept alongside the token and chat id.
    #[test]
    fn test_telegram_daemon_with_api_url() {
        let custom_url = String::from("http://localhost:8081");
        let subject = TelegramDaemon::new(String::from("test-token"), Some(custom_url), 12345);

        assert_eq!(subject.bot_token.as_str(), "test-token");
        assert_eq!(subject.api_url.as_deref(), Some("http://localhost:8081"));
        assert_eq!(subject.chat_id, 12345);
    }
}