rain_engine_channels/
telegram.rs1use crate::{ChannelAdapter, ChannelConfig};
6use async_trait::async_trait;
7use rain_engine_client::RainEngineClient;
8use serde::Deserialize;
9use tracing::{error, info, warn};
10
11#[derive(Debug, Clone)]
12pub struct TelegramAdapter {
13 token: String,
14 client: reqwest::Client,
15 engine_client: RainEngineClient,
16 config: ChannelConfig,
17}
18
19impl TelegramAdapter {
20 pub fn new(token: String, config: ChannelConfig) -> Self {
21 Self {
22 engine_client: RainEngineClient::new(&config.runtime_url)
23 .expect("failed to init client"),
24 client: reqwest::Client::new(),
25 token,
26 config,
27 }
28 }
29
30 fn api_url(&self, method: &str) -> String {
31 format!("https://api.telegram.org/bot{}/{}", self.token, method)
32 }
33
34 fn session_id(&self, chat_id: i64) -> String {
35 format!(
36 "{}-telegram-{}",
37 self.config.default_session_prefix, chat_id
38 )
39 }
40
41 async fn send_message(&self, chat_id: i64, text: &str) -> Result<(), reqwest::Error> {
42 self.client
43 .post(self.api_url("sendMessage"))
44 .json(&serde_json::json!({
45 "chat_id": chat_id,
46 "text": text,
47 "parse_mode": "Markdown"
48 }))
49 .send()
50 .await?;
51 Ok(())
52 }
53}
54
55#[derive(Debug, Deserialize)]
56struct TelegramResponse {
57 ok: bool,
58 result: Option<Vec<TelegramUpdate>>,
59}
60
61#[derive(Debug, Deserialize)]
62struct TelegramUpdate {
63 update_id: i64,
64 message: Option<TelegramMessage>,
65}
66
67#[derive(Debug, Deserialize)]
68struct TelegramMessage {
69 chat: TelegramChat,
70 from: Option<TelegramUser>,
71 text: Option<String>,
72}
73
74#[derive(Debug, Deserialize)]
75struct TelegramChat {
76 id: i64,
77}
78
79#[derive(Debug, Deserialize)]
80struct TelegramUser {
81 id: i64,
82 #[allow(dead_code)]
83 first_name: String,
84}
85
86#[async_trait]
87impl ChannelAdapter for TelegramAdapter {
88 fn name(&self) -> &str {
89 "telegram"
90 }
91
92 async fn run(&self, cancel: tokio_util::sync::CancellationToken) {
93 info!("Telegram adapter started");
94 let mut offset: Option<i64> = None;
95
96 loop {
97 if cancel.is_cancelled() {
98 info!("Telegram adapter shutting down");
99 return;
100 }
101
102 let mut url = self.api_url("getUpdates");
103 url.push_str("?timeout=30");
104 if let Some(off) = offset {
105 url.push_str(&format!("&offset={off}"));
106 }
107
108 let response = tokio::select! {
109 _ = cancel.cancelled() => return,
110 result = self.client.get(&url).send() => {
111 match result {
112 Ok(resp) => resp,
113 Err(err) => {
114 warn!("Telegram poll error: {err}");
115 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
116 continue;
117 }
118 }
119 }
120 };
121
122 let updates: TelegramResponse = match response.json().await {
123 Ok(parsed) => parsed,
124 Err(err) => {
125 warn!("Telegram parse error: {err}");
126 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
127 continue;
128 }
129 };
130
131 if !updates.ok {
132 warn!("Telegram API returned ok=false");
133 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
134 continue;
135 }
136
137 let Some(results) = updates.result else {
138 continue;
139 };
140
141 for update in results {
142 offset = Some(update.update_id + 1);
143
144 let Some(message) = update.message else {
145 continue;
146 };
147 let Some(text) = message.text else {
148 continue;
149 };
150
151 let actor_id = message
152 .from
153 .as_ref()
154 .map(|u| format!("telegram:{}", u.id))
155 .unwrap_or_else(|| format!("telegram:{}", message.chat.id));
156 let session_id = self.session_id(message.chat.id);
157
158 info!(
159 chat_id = message.chat.id,
160 actor = %actor_id,
161 "Telegram message received"
162 );
163
164 match self
165 .engine_client
166 .send_human_input(&actor_id, &session_id, &text)
167 .await
168 {
169 Ok(result) => {
170 let reply = result
171 .outcome
172 .response
173 .as_deref()
174 .unwrap_or("*(no response)*");
175 if let Err(err) = self.send_message(message.chat.id, reply).await {
176 error!("Failed to send Telegram reply: {err}");
177 }
178 }
179 Err(err) => {
180 error!("Engine request failed: {err}");
181 let _ = self
182 .send_message(message.chat.id, "⚠️ Engine error, please try again.")
183 .await;
184 }
185 }
186 }
187 }
188 }
189}