Skip to main content

sparrow/gateway/
telegram.rs

1use tokio::sync::mpsc;
2
3use super::{GatewayMessage, GatewayResponse, GatewayTransport};
4
5// ─── Telegram Bot transport ─────────────────────────────────────────────────────
6
7pub struct TelegramTransport {
8    bot_token: String,
9    api_url: String,
10    client: reqwest::Client,
11    allowed_users: Vec<String>,
12}
13
14impl TelegramTransport {
15    pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self {
16        let api_url = format!("https://api.telegram.org/bot{}", bot_token);
17        Self {
18            bot_token,
19            api_url,
20            client: reqwest::Client::new(),
21            allowed_users,
22        }
23    }
24
25    /// Send a message via Telegram Bot API
26    async fn send_message(&self, chat_id: &str, text: &str) -> anyhow::Result<()> {
27        self.client
28            .post(format!("{}/sendMessage", self.api_url))
29            .json(&serde_json::json!({
30                "chat_id": chat_id,
31                "text": text,
32                "parse_mode": "Markdown",
33            }))
34            .send()
35            .await?;
36        Ok(())
37    }
38
39    /// Send a message with inline keyboard buttons
40    async fn send_message_with_buttons(
41        &self,
42        chat_id: &str,
43        text: &str,
44        buttons: &[Vec<String>],
45    ) -> anyhow::Result<()> {
46        let inline_keyboard: Vec<Vec<serde_json::Value>> = buttons
47            .iter()
48            .map(|row| {
49                row.iter()
50                    .map(|b| {
51                        serde_json::json!({
52                            "text": b,
53                            "callback_data": b,
54                        })
55                    })
56                    .collect()
57            })
58            .collect();
59
60        self.client
61            .post(format!("{}/sendMessage", self.api_url))
62            .json(&serde_json::json!({
63                "chat_id": chat_id,
64                "text": text,
65                "parse_mode": "Markdown",
66                "reply_markup": {
67                    "inline_keyboard": inline_keyboard,
68                }
69            }))
70            .send()
71            .await?;
72        Ok(())
73    }
74}
75
76#[async_trait::async_trait]
77impl GatewayTransport for TelegramTransport {
78    fn name(&self) -> &str {
79        "telegram"
80    }
81
82    async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
83        let token = self.bot_token.clone();
84        let api_url = self.api_url.clone();
85        let client = self.client.clone();
86        let allowed = self.allowed_users.clone();
87
88        tracing::info!("Telegram gateway starting (bot token: {}...)", &token[..8]);
89
90        tokio::spawn(async move {
91            let mut offset: i64 = 0;
92
93            loop {
94                // Long polling: getUpdates
95                let resp = client
96                    .post(format!("{}/getUpdates", api_url))
97                    .json(&serde_json::json!({
98                        "offset": offset,
99                        "timeout": 30,
100                        "allowed_updates": ["message"],
101                    }))
102                    .send()
103                    .await;
104
105                match resp {
106                    Ok(r) => {
107                        if let Ok(json) = r.json::<serde_json::Value>().await {
108                            if let Some(updates) = json["result"].as_array() {
109                                for update in updates {
110                                    if let Some(update_id) = update["update_id"].as_i64() {
111                                        offset = update_id + 1;
112                                    }
113
114                                    if let Some(msg) = update["message"].as_object() {
115                                        let chat = &msg["chat"];
116                                        let chat_id = chat["id"]
117                                            .as_i64()
118                                            .map(|i| i.to_string())
119                                            .unwrap_or_default();
120                                        let user_id = msg["from"]["id"]
121                                            .as_i64()
122                                            .map(|i| i.to_string())
123                                            .unwrap_or_default();
124                                        if !allowed.is_empty() && !allowed.contains(&user_id) {
125                                            continue;
126                                        }
127                                        let text = msg["text"].as_str().unwrap_or("").to_string();
128                                        let message_id =
129                                            msg["message_id"].as_i64().map(|i| i.to_string());
130
131                                        if !text.is_empty() {
132                                            let _ = tx.send(GatewayMessage {
133                                                surface: "telegram".into(),
134                                                user_id,
135                                                chat_id,
136                                                text,
137                                                message_id,
138                                            });
139                                        }
140                                    }
141                                }
142                            }
143                        }
144                    }
145                    Err(e) => {
146                        tracing::error!("Telegram poll error: {}", e);
147                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
148                    }
149                }
150            }
151        });
152
153        Ok(())
154    }
155
156    async fn send(&self, response: GatewayResponse) -> anyhow::Result<()> {
157        if response.buttons.is_empty() {
158            self.send_message(&response.chat_id, &response.text).await
159        } else {
160            self.send_message_with_buttons(&response.chat_id, &response.text, &response.buttons)
161                .await
162        }
163    }
164
165    async fn stop(&self) -> anyhow::Result<()> {
166        tracing::info!("Telegram gateway stopped");
167        Ok(())
168    }
169}