1use crate::platform::{
4 ApprovalRequest, ChatPlatform, OutgoingMessage, ProgressStatus, ProgressUpdate,
5 WorkflowNotification,
6};
7use anyhow::{Context, Result};
8use serde::Deserialize;
9use tracing::warn;
10
11#[derive(Debug, Clone)]
13pub struct TelegramConfig {
14 pub bot_token: String,
16 pub default_chat_id: Option<String>,
18}
19
20impl TelegramConfig {
21 pub fn from_env() -> Result<Self> {
23 let bot_token =
24 std::env::var("TELEGRAM_BOT_TOKEN").context("TELEGRAM_BOT_TOKEN not set")?;
25 let default_chat_id = std::env::var("TELEGRAM_CHAT_ID").ok();
26
27 Ok(Self {
28 bot_token,
29 default_chat_id,
30 })
31 }
32}
33
34pub struct TelegramBot {
36 config: TelegramConfig,
37 client: reqwest::Client,
38}
39
40#[derive(Deserialize)]
42struct TelegramResponse {
43 ok: bool,
44 #[serde(default)]
45 description: Option<String>,
46 result: Option<serde_json::Value>,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum TelegramCommand {
55 Run {
57 workflow: String,
58 shadow: bool,
59 variables: std::collections::HashMap<String, String>,
60 },
61 Workflows,
63 Audit { workflow_id: Option<String> },
65 Status,
67 Help,
69 Unknown { text: String },
71}
72
73impl TelegramBot {
74 pub fn new(config: TelegramConfig) -> Self {
75 Self {
76 config,
77 client: reqwest::Client::builder().connect_timeout(std::time::Duration::from_secs(10)).timeout(std::time::Duration::from_secs(30)).build().unwrap_or_else(|_| reqwest::Client::new()),
78 }
79 }
80
81 fn api_url(&self, method: &str) -> String {
87 format!(
88 "https://api.telegram.org/bot{}/{}",
89 self.config.bot_token, method
90 )
91 }
92
93 async fn api_call(
95 &self,
96 method: &str,
97 body: serde_json::Value,
98 ) -> Result<TelegramResponse> {
99 let url = self.api_url(method);
100 let response = self
101 .client
102 .post(&url)
103 .json(&body)
104 .send()
105 .await
106 .context("Telegram API request failed")?;
107
108 let api_response: TelegramResponse = response.json().await?;
109 if !api_response.ok {
110 anyhow::bail!(
111 "Telegram API error: {}",
112 api_response.description.unwrap_or_default()
113 );
114 }
115 Ok(api_response)
116 }
117
118 pub fn parse_command(text: &str) -> TelegramCommand {
120 let text = text.trim();
121
122 let parts: Vec<&str> = text.splitn(2, ' ').collect();
124 let cmd = parts[0].split('@').next().unwrap_or(parts[0]);
125 let args = parts.get(1).unwrap_or(&"").trim();
126
127 match cmd {
128 "/run" => {
129 let mut workflow = String::new();
130 let mut shadow = false;
131 let mut variables = std::collections::HashMap::new();
132
133 for token in args.split_whitespace() {
134 if token == "--shadow" {
135 shadow = true;
136 } else if let Some(var) = token.strip_prefix("--var=") {
137 if let Some((k, v)) = var.split_once('=') {
138 variables.insert(k.to_string(), v.to_string());
139 }
140 } else if workflow.is_empty() {
141 workflow = token.to_string();
142 }
143 }
144
145 if workflow.is_empty() {
146 TelegramCommand::Unknown {
147 text: text.to_string(),
148 }
149 } else {
150 TelegramCommand::Run {
151 workflow,
152 shadow,
153 variables,
154 }
155 }
156 }
157 "/workflows" | "/wf" => TelegramCommand::Workflows,
158 "/audit" => TelegramCommand::Audit {
159 workflow_id: if args.is_empty() {
160 None
161 } else {
162 Some(args.to_string())
163 },
164 },
165 "/status" => TelegramCommand::Status,
166 "/help" | "/start" => TelegramCommand::Help,
167 _ => TelegramCommand::Unknown {
168 text: text.to_string(),
169 },
170 }
171 }
172
173 fn approval_keyboard(execution_id: &str) -> serde_json::Value {
175 serde_json::json!({
176 "inline_keyboard": [[
177 {
178 "text": "Approve",
179 "callback_data": format!("approve:{}", execution_id)
180 },
181 {
182 "text": "Deny",
183 "callback_data": format!("deny:{}", execution_id)
184 }
185 ]]
186 })
187 }
188}
189
190impl ChatPlatform for TelegramBot {
191 async fn send_message(&self, msg: &OutgoingMessage) -> Result<String> {
192 let mut body = serde_json::json!({
193 "chat_id": msg.channel_id,
194 "text": msg.text,
195 "parse_mode": "Markdown",
196 });
197
198 if let Some(ref thread) = msg.thread_id {
199 if let Ok(msg_id) = thread.parse::<i64>() {
200 body["reply_to_message_id"] = serde_json::json!(msg_id);
201 }
202 }
203
204 if let Some(ref blocks) = msg.blocks {
205 body["reply_markup"] = blocks.clone();
207 }
208
209 let response = self.api_call("sendMessage", body).await?;
210
211 let message_id = response
213 .result
214 .as_ref()
215 .and_then(|r| r.get("message_id"))
216 .and_then(|id| id.as_i64())
217 .map(|id| id.to_string())
218 .unwrap_or_default();
219
220 Ok(message_id)
221 }
222
223 async fn send_approval(&self, channel_id: &str, request: &ApprovalRequest) -> Result<String> {
224 let text = format!(
225 "*Approval Required*\n\nStep: `{}`\n{}\n\nCommand: `{}`",
226 request.step_name, request.description, request.action
227 );
228
229 let keyboard = Self::approval_keyboard(&request.execution_id);
230
231 let msg = OutgoingMessage {
232 channel_id: channel_id.to_string(),
233 text,
234 thread_id: None,
235 blocks: Some(keyboard),
236 };
237
238 self.send_message(&msg).await
239 }
240
241 async fn update_message(&self, channel_id: &str, message_id: &str, text: &str) -> Result<()> {
242 let parsed_msg_id = message_id.parse::<i64>().unwrap_or_else(|e| {
243 warn!(
244 message_id = message_id,
245 error = %e,
246 "Failed to parse Telegram message_id as i64, defaulting to 0"
247 );
248 0
249 });
250 self.api_call(
251 "editMessageText",
252 serde_json::json!({
253 "chat_id": channel_id,
254 "message_id": parsed_msg_id,
255 "text": text,
256 "parse_mode": "Markdown",
257 }),
258 )
259 .await?;
260 Ok(())
261 }
262
263 async fn add_reaction(&self, channel_id: &str, message_id: &str, emoji: &str) -> Result<()> {
264 let parsed_msg_id = message_id.parse::<i64>().unwrap_or_else(|e| {
265 warn!(
266 message_id = message_id,
267 error = %e,
268 "Failed to parse Telegram message_id as i64, defaulting to 0"
269 );
270 0
271 });
272 self.api_call(
274 "setMessageReaction",
275 serde_json::json!({
276 "chat_id": channel_id,
277 "message_id": parsed_msg_id,
278 "reaction": [{
279 "type": "emoji",
280 "emoji": emoji
281 }]
282 }),
283 )
284 .await?;
285 Ok(())
286 }
287
288 async fn send_progress(
289 &self,
290 channel_id: &str,
291 thread_id: &str,
292 progress: &ProgressUpdate,
293 ) -> Result<String> {
294 let icon = match progress.status {
295 ProgressStatus::Started => "🚀",
296 ProgressStatus::StepRunning => "⏳",
297 ProgressStatus::StepDone => "✅",
298 ProgressStatus::StepFailed => "❌",
299 ProgressStatus::Completed => "🎉",
300 ProgressStatus::Failed => "💥",
301 };
302
303 let duration = progress
304 .duration_ms
305 .map(|ms| format!(" ({}ms)", ms))
306 .unwrap_or_default();
307
308 let text = format!(
309 "{} [{}/{}] `{}`{}",
310 icon,
311 progress.step_index + 1,
312 progress.total_steps,
313 progress.step_name,
314 duration
315 );
316
317 let msg = OutgoingMessage {
318 channel_id: channel_id.to_string(),
319 text,
320 thread_id: Some(thread_id.to_string()),
321 blocks: None,
322 };
323
324 self.send_message(&msg).await
325 }
326
327 async fn send_notification(
328 &self,
329 channel_id: &str,
330 thread_id: Option<&str>,
331 notification: &WorkflowNotification,
332 ) -> Result<String> {
333 let (emoji, title) = if notification.success {
334 ("✅", "Workflow Completed")
335 } else {
336 ("❌", "Workflow Failed")
337 };
338
339 let error_line = notification
340 .error
341 .as_ref()
342 .map(|e| format!("\nError: `{}`", e))
343 .unwrap_or_default();
344
345 let text = format!(
346 "{} *{}*\nWorkflow: `{}`\nSteps: {}/{}\nDuration: {}ms{}",
347 emoji,
348 title,
349 notification.workflow_id,
350 notification.steps_completed,
351 notification.total_steps,
352 notification.duration_ms,
353 error_line,
354 );
355
356 let msg = OutgoingMessage {
357 channel_id: channel_id.to_string(),
358 text,
359 thread_id: thread_id.map(String::from),
360 blocks: None,
361 };
362
363 self.send_message(&msg).await
364 }
365
366 async fn start_thread(
367 &self,
368 channel_id: &str,
369 execution_id: &str,
370 workflow_id: &str,
371 total_steps: usize,
372 shadow: bool,
373 ) -> Result<String> {
374 let mode = if shadow { "👻 Shadow" } else { "▶️ Live" };
375 let text = format!(
376 "{} Running: `{}` ({} steps)\nExecution: `{}`",
377 mode, workflow_id, total_steps, execution_id
378 );
379
380 let msg = OutgoingMessage {
381 channel_id: channel_id.to_string(),
382 text,
383 thread_id: None,
384 blocks: None,
385 };
386
387 self.send_message(&msg).await
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394
395 #[test]
396 fn test_telegram_config_fields() {
397 let config = TelegramConfig {
398 bot_token: "123456:ABC-DEF".into(),
399 default_chat_id: Some("12345".into()),
400 };
401 assert_eq!(config.bot_token, "123456:ABC-DEF");
402 }
403
404 #[test]
405 fn test_parse_run_command() {
406 let cmd = TelegramBot::parse_command("/run deploy --shadow");
407 assert_eq!(
408 cmd,
409 TelegramCommand::Run {
410 workflow: "deploy".into(),
411 shadow: true,
412 variables: std::collections::HashMap::new(),
413 }
414 );
415 }
416
417 #[test]
418 fn test_parse_run_with_vars() {
419 let cmd = TelegramBot::parse_command("/run deploy --var=env=prod");
420 match cmd {
421 TelegramCommand::Run {
422 workflow,
423 shadow,
424 variables,
425 } => {
426 assert_eq!(workflow, "deploy");
427 assert!(!shadow);
428 assert_eq!(variables.get("env").unwrap(), "prod");
429 }
430 _ => panic!("Expected Run command"),
431 }
432 }
433
434 #[test]
435 fn test_parse_workflows_command() {
436 assert_eq!(
437 TelegramBot::parse_command("/workflows"),
438 TelegramCommand::Workflows
439 );
440 assert_eq!(
441 TelegramBot::parse_command("/wf"),
442 TelegramCommand::Workflows
443 );
444 }
445
446 #[test]
447 fn test_parse_audit_command() {
448 assert_eq!(
449 TelegramBot::parse_command("/audit"),
450 TelegramCommand::Audit { workflow_id: None }
451 );
452 assert_eq!(
453 TelegramBot::parse_command("/audit my-workflow"),
454 TelegramCommand::Audit {
455 workflow_id: Some("my-workflow".into())
456 }
457 );
458 }
459
460 #[test]
461 fn test_parse_status_command() {
462 assert_eq!(
463 TelegramBot::parse_command("/status"),
464 TelegramCommand::Status
465 );
466 }
467
468 #[test]
469 fn test_parse_help_command() {
470 assert_eq!(TelegramBot::parse_command("/help"), TelegramCommand::Help);
471 assert_eq!(TelegramBot::parse_command("/start"), TelegramCommand::Help);
472 }
473
474 #[test]
475 fn test_parse_unknown_command() {
476 let cmd = TelegramBot::parse_command("/unknown");
477 assert!(matches!(cmd, TelegramCommand::Unknown { .. }));
478 }
479
480 #[test]
481 fn test_parse_command_with_bot_name() {
482 let cmd = TelegramBot::parse_command("/run@mybot deploy");
483 assert_eq!(
484 cmd,
485 TelegramCommand::Run {
486 workflow: "deploy".into(),
487 shadow: false,
488 variables: std::collections::HashMap::new(),
489 }
490 );
491 }
492
493 #[test]
494 fn test_approval_keyboard() {
495 let keyboard = TelegramBot::approval_keyboard("exec-123");
496 let buttons = keyboard["inline_keyboard"][0].as_array().unwrap();
497 assert_eq!(buttons.len(), 2);
498 assert_eq!(buttons[0]["text"], "Approve");
499 assert_eq!(buttons[0]["callback_data"], "approve:exec-123");
500 assert_eq!(buttons[1]["text"], "Deny");
501 assert_eq!(buttons[1]["callback_data"], "deny:exec-123");
502 }
503}