use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use tracing::{error, info, warn};
use ralph_proto::daemon::{DaemonAdapter, StartLoopFn};
use crate::bot::{BotApi, TelegramBot, escape_html};
use crate::loop_lock::{LockState, lock_path, lock_state};
use crate::state::StateManager;
async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
while !shutdown.load(Ordering::Relaxed) {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
pub struct TelegramDaemon {
bot_token: String,
chat_id: i64,
}
impl TelegramDaemon {
pub fn new(bot_token: String, chat_id: i64) -> Self {
Self { bot_token, chat_id }
}
}
#[async_trait]
impl DaemonAdapter for TelegramDaemon {
async fn run_daemon(
&self,
workspace_root: PathBuf,
start_loop: StartLoopFn,
) -> anyhow::Result<()> {
let bot = TelegramBot::new(&self.bot_token);
let chat_id = self.chat_id;
let state_manager = StateManager::new(workspace_root.join(".ralph/telegram-state.json"));
let _ = bot.send_message(chat_id, "Ralph daemon online 🤖").await;
let shutdown = Arc::new(AtomicBool::new(false));
{
let flag = shutdown.clone();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
flag.store(true, Ordering::Relaxed);
});
}
#[cfg(unix)]
{
let flag = shutdown.clone();
tokio::spawn(async move {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sigterm) => {
sigterm.recv().await;
flag.store(true, Ordering::Relaxed);
}
Err(e) => {
error!(error = %e, "Failed to register SIGTERM handler");
flag.store(true, Ordering::Relaxed);
}
}
});
}
let mut offset: i32 = 0;
'daemon: while !shutdown.load(Ordering::Relaxed) {
let updates = match tokio::select! {
_ = wait_for_shutdown(shutdown.clone()) => {
break 'daemon;
}
updates = poll_updates(&self.bot_token, 30, offset) => updates,
} {
Ok(u) => u,
Err(e) => {
warn!(error = %e, "Telegram poll failed, retrying");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
for update in updates {
offset = update.update_id + 1;
let text = match update.text.as_deref() {
Some(t) => t,
None => continue,
};
if let Ok(mut state) = state_manager.load_or_default() {
if state.chat_id.is_none() {
state.chat_id = Some(chat_id);
}
state.last_seen = Some(chrono::Utc::now());
state.last_update_id = Some(update.update_id);
if let Err(e) = state_manager.save(&state) {
warn!(error = %e, "Failed to persist Telegram state");
}
} else {
warn!("Failed to load Telegram state");
}
info!(text = %text, "Daemon received message");
if text.starts_with('/') {
let response = crate::commands::handle_command(text, &workspace_root)
.unwrap_or_else(|| {
"Unknown command. Use /help for the supported commands.".to_string()
});
let _ = bot.send_message(chat_id, &response).await;
continue;
}
let lock_path = lock_path(&workspace_root);
let state = match lock_state(&workspace_root) {
Ok(state) => state,
Err(e) => {
warn!(error = %e, "Failed to check loop lock state");
let _ = bot
.send_message(
chat_id,
"Failed to check loop state; try again in a moment.",
)
.await;
continue;
}
};
if state == LockState::Active {
let _ = bot
.send_message(
chat_id,
"A loop is already running — it will receive your messages directly.",
)
.await;
continue;
}
if state == LockState::Stale {
warn!(
lock_path = %lock_path.display(),
"Found stale loop lock; starting new loop"
);
}
let escaped = escape_html(text);
let ack = format!("Starting loop: <i>{}</i>", escaped);
let _ = bot.send_message(chat_id, &ack).await;
let prompt = text.to_string();
let mut loop_handle = tokio::spawn(start_loop(prompt));
let result = tokio::select! {
_ = wait_for_shutdown(shutdown.clone()) => {
loop_handle.abort();
let _ = loop_handle.await;
break 'daemon;
}
result = &mut loop_handle => result,
};
match result {
Ok(Ok(description)) => {
let notification =
format!("Loop complete ({}).", escape_html(&description));
let _ = bot.send_message(chat_id, ¬ification).await;
}
Ok(Err(e)) => {
let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
let _ = bot.send_message(chat_id, ¬ification).await;
}
Err(e) => {
let notification = format!("Loop failed: {}", escape_html(&e.to_string()));
let _ = bot.send_message(chat_id, ¬ification).await;
}
}
}
}
let _ = bot.send_message(chat_id, "Ralph daemon offline 👋").await;
Ok(())
}
}
struct DaemonUpdate {
update_id: i32,
text: Option<String>,
}
async fn poll_updates(
token: &str,
timeout_secs: u64,
offset: i32,
) -> anyhow::Result<Vec<DaemonUpdate>> {
use teloxide::payloads::GetUpdatesSetters;
use teloxide::requests::Requester;
let bot = teloxide::Bot::new(token);
let request = bot
.get_updates()
.offset(offset)
.timeout(timeout_secs as u32);
let updates = request
.await
.map_err(|e| anyhow::anyhow!("Telegram getUpdates failed: {}", e))?;
let mut results = Vec::new();
for update in updates {
#[allow(clippy::cast_possible_wrap)]
let id = update.id.0 as i32;
let text = match update.kind {
teloxide::types::UpdateKind::Message(ref msg) => msg.text().map(String::from),
_ => None,
};
results.push(DaemonUpdate {
update_id: id,
text,
});
}
Ok(results)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_telegram_daemon_creation() {
let daemon = TelegramDaemon::new("test-token".to_string(), 12345);
assert_eq!(daemon.bot_token, "test-token");
assert_eq!(daemon.chat_id, 12345);
}
}