Skip to main content

rain_engine_channels/
telegram.rs

//! Telegram Bot API adapter that long-polls `getUpdates`.
//!
//! Requires `TELEGRAM_BOT_TOKEN` to be set.
4
5use crate::{ChannelAdapter, ChannelConfig};
6use async_trait::async_trait;
7use rain_engine_client::RainEngineClient;
8use serde::Deserialize;
9use tracing::{error, info, warn};
10
11#[derive(Debug, Clone)]
12pub struct TelegramAdapter {
13    token: String,
14    client: reqwest::Client,
15    engine_client: RainEngineClient,
16    config: ChannelConfig,
17}
18
19impl TelegramAdapter {
20    pub fn new(token: String, config: ChannelConfig) -> Self {
21        Self {
22            engine_client: RainEngineClient::new(&config.runtime_url)
23                .expect("failed to init client"),
24            client: reqwest::Client::new(),
25            token,
26            config,
27        }
28    }
29
30    fn api_url(&self, method: &str) -> String {
31        format!("https://api.telegram.org/bot{}/{}", self.token, method)
32    }
33
34    fn session_id(&self, chat_id: i64) -> String {
35        format!(
36            "{}-telegram-{}",
37            self.config.default_session_prefix, chat_id
38        )
39    }
40
41    async fn send_message(&self, chat_id: i64, text: &str) -> Result<(), reqwest::Error> {
42        self.client
43            .post(self.api_url("sendMessage"))
44            .json(&serde_json::json!({
45                "chat_id": chat_id,
46                "text": text,
47                "parse_mode": "Markdown"
48            }))
49            .send()
50            .await?;
51        Ok(())
52    }
53}
54
55#[derive(Debug, Deserialize)]
56struct TelegramResponse {
57    ok: bool,
58    result: Option<Vec<TelegramUpdate>>,
59}
60
61#[derive(Debug, Deserialize)]
62struct TelegramUpdate {
63    update_id: i64,
64    message: Option<TelegramMessage>,
65}
66
67#[derive(Debug, Deserialize)]
68struct TelegramMessage {
69    chat: TelegramChat,
70    from: Option<TelegramUser>,
71    text: Option<String>,
72}
73
74#[derive(Debug, Deserialize)]
75struct TelegramChat {
76    id: i64,
77}
78
79#[derive(Debug, Deserialize)]
80struct TelegramUser {
81    id: i64,
82    #[allow(dead_code)]
83    first_name: String,
84}
85
86#[async_trait]
87impl ChannelAdapter for TelegramAdapter {
88    fn name(&self) -> &str {
89        "telegram"
90    }
91
92    async fn run(&self, cancel: tokio_util::sync::CancellationToken) {
93        info!("Telegram adapter started");
94        let mut offset: Option<i64> = None;
95
96        loop {
97            if cancel.is_cancelled() {
98                info!("Telegram adapter shutting down");
99                return;
100            }
101
102            let mut url = self.api_url("getUpdates");
103            url.push_str("?timeout=30");
104            if let Some(off) = offset {
105                url.push_str(&format!("&offset={off}"));
106            }
107
108            let response = tokio::select! {
109                _ = cancel.cancelled() => return,
110                result = self.client.get(&url).send() => {
111                    match result {
112                        Ok(resp) => resp,
113                        Err(err) => {
114                            warn!("Telegram poll error: {err}");
115                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
116                            continue;
117                        }
118                    }
119                }
120            };
121
122            let updates: TelegramResponse = match response.json().await {
123                Ok(parsed) => parsed,
124                Err(err) => {
125                    warn!("Telegram parse error: {err}");
126                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
127                    continue;
128                }
129            };
130
131            if !updates.ok {
132                warn!("Telegram API returned ok=false");
133                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
134                continue;
135            }
136
137            let Some(results) = updates.result else {
138                continue;
139            };
140
141            for update in results {
142                offset = Some(update.update_id + 1);
143
144                let Some(message) = update.message else {
145                    continue;
146                };
147                let Some(text) = message.text else {
148                    continue;
149                };
150
151                let actor_id = message
152                    .from
153                    .as_ref()
154                    .map(|u| format!("telegram:{}", u.id))
155                    .unwrap_or_else(|| format!("telegram:{}", message.chat.id));
156                let session_id = self.session_id(message.chat.id);
157
158                info!(
159                    chat_id = message.chat.id,
160                    actor = %actor_id,
161                    "Telegram message received"
162                );
163
164                match self
165                    .engine_client
166                    .send_human_input(&actor_id, &session_id, &text)
167                    .await
168                {
169                    Ok(result) => {
170                        let reply = result
171                            .outcome
172                            .response
173                            .as_deref()
174                            .unwrap_or("*(no response)*");
175                        if let Err(err) = self.send_message(message.chat.id, reply).await {
176                            error!("Failed to send Telegram reply: {err}");
177                        }
178                    }
179                    Err(err) => {
180                        error!("Engine request failed: {err}");
181                        let _ = self
182                            .send_message(message.chat.id, "⚠️ Engine error, please try again.")
183                            .await;
184                    }
185                }
186            }
187        }
188    }
189}