sparrow/gateway/
telegram.rs1use tokio::sync::mpsc;
2
3use super::{GatewayMessage, GatewayResponse, GatewayTransport};
4
5pub struct TelegramTransport {
8 bot_token: String,
9 api_url: String,
10 client: reqwest::Client,
11 allowed_users: Vec<String>,
12}
13
14impl TelegramTransport {
15 pub fn new(bot_token: String, allowed_users: Vec<String>) -> Self {
16 let api_url = format!("https://api.telegram.org/bot{}", bot_token);
17 Self {
18 bot_token,
19 api_url,
20 client: reqwest::Client::new(),
21 allowed_users,
22 }
23 }
24
25 async fn send_message(&self, chat_id: &str, text: &str) -> anyhow::Result<()> {
27 self.client
28 .post(format!("{}/sendMessage", self.api_url))
29 .json(&serde_json::json!({
30 "chat_id": chat_id,
31 "text": text,
32 "parse_mode": "Markdown",
33 }))
34 .send()
35 .await?;
36 Ok(())
37 }
38
39 async fn send_message_with_buttons(
41 &self,
42 chat_id: &str,
43 text: &str,
44 buttons: &[Vec<String>],
45 ) -> anyhow::Result<()> {
46 let inline_keyboard: Vec<Vec<serde_json::Value>> = buttons
47 .iter()
48 .map(|row| {
49 row.iter()
50 .map(|b| {
51 serde_json::json!({
52 "text": b,
53 "callback_data": b,
54 })
55 })
56 .collect()
57 })
58 .collect();
59
60 self.client
61 .post(format!("{}/sendMessage", self.api_url))
62 .json(&serde_json::json!({
63 "chat_id": chat_id,
64 "text": text,
65 "parse_mode": "Markdown",
66 "reply_markup": {
67 "inline_keyboard": inline_keyboard,
68 }
69 }))
70 .send()
71 .await?;
72 Ok(())
73 }
74}
75
76#[async_trait::async_trait]
77impl GatewayTransport for TelegramTransport {
78 fn name(&self) -> &str {
79 "telegram"
80 }
81
82 async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
83 let token = self.bot_token.clone();
84 let api_url = self.api_url.clone();
85 let client = self.client.clone();
86 let allowed = self.allowed_users.clone();
87
88 tracing::info!("Telegram gateway starting (bot token: {}...)", &token[..8]);
89
90 tokio::spawn(async move {
91 let mut offset: i64 = 0;
92
93 loop {
94 let resp = client
96 .post(format!("{}/getUpdates", api_url))
97 .json(&serde_json::json!({
98 "offset": offset,
99 "timeout": 30,
100 "allowed_updates": ["message"],
101 }))
102 .send()
103 .await;
104
105 match resp {
106 Ok(r) => {
107 if let Ok(json) = r.json::<serde_json::Value>().await {
108 if let Some(updates) = json["result"].as_array() {
109 for update in updates {
110 if let Some(update_id) = update["update_id"].as_i64() {
111 offset = update_id + 1;
112 }
113
114 if let Some(msg) = update["message"].as_object() {
115 let chat = &msg["chat"];
116 let chat_id = chat["id"]
117 .as_i64()
118 .map(|i| i.to_string())
119 .unwrap_or_default();
120 let user_id = msg["from"]["id"]
121 .as_i64()
122 .map(|i| i.to_string())
123 .unwrap_or_default();
124 if !allowed.is_empty() && !allowed.contains(&user_id) {
125 continue;
126 }
127 let text = msg["text"].as_str().unwrap_or("").to_string();
128 let message_id =
129 msg["message_id"].as_i64().map(|i| i.to_string());
130
131 if !text.is_empty() {
132 let _ = tx.send(GatewayMessage {
133 surface: "telegram".into(),
134 user_id,
135 chat_id,
136 text,
137 message_id,
138 });
139 }
140 }
141 }
142 }
143 }
144 }
145 Err(e) => {
146 tracing::error!("Telegram poll error: {}", e);
147 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
148 }
149 }
150 }
151 });
152
153 Ok(())
154 }
155
156 async fn send(&self, response: GatewayResponse) -> anyhow::Result<()> {
157 if response.buttons.is_empty() {
158 self.send_message(&response.chat_id, &response.text).await
159 } else {
160 self.send_message_with_buttons(&response.chat_id, &response.text, &response.buttons)
161 .await
162 }
163 }
164
165 async fn stop(&self) -> anyhow::Result<()> {
166 tracing::info!("Telegram gateway stopped");
167 Ok(())
168 }
169}