use async_trait::async_trait;
use futures::FutureExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::config::TelegramConfig;
use crate::error::{Result, ZeptoError};
use super::{BaseChannelConfig, Channel};
/// Channel implementation that connects a Telegram bot to the message bus.
///
/// Inbound text messages are published to `bus`; outbound messages are sent
/// through `bot` once the channel has been started.
pub struct TelegramChannel {
/// Telegram-specific settings (enabled flag, bot token, allowed sender IDs).
config: TelegramConfig,
/// Channel name and allowlist; consulted by `is_allowed`.
base_config: BaseChannelConfig,
/// Bus that receives inbound messages from the update handler.
bus: Arc<MessageBus>,
/// Running flag, shared with the spawned polling task, which clears it on exit.
running: Arc<AtomicBool>,
/// Sender used by `stop` to signal the polling task; `None` until `start` sets it.
shutdown_tx: Option<mpsc::Sender<()>>,
/// Bot handle used by `send`; set by `start` and cleared by `stop`.
bot: Option<teloxide::Bot>,
}
impl TelegramChannel {
    /// Creates a stopped channel from `config`, wired to `bus`.
    ///
    /// The base channel configuration is derived here: its name is
    /// `"telegram"` and its allowlist is a copy of `config.allow_from`.
    /// No bot is created and no task is spawned until `start` is called.
    pub fn new(config: TelegramConfig, bus: Arc<MessageBus>) -> Self {
        // Copy the allowlist out before `config` is moved into the struct.
        let allowlist = config.allow_from.clone();
        Self {
            base_config: BaseChannelConfig {
                name: String::from("telegram"),
                allowlist,
            },
            config,
            bus,
            running: Arc::new(AtomicBool::new(false)),
            shutdown_tx: None,
            bot: None,
        }
    }

    /// Borrows the Telegram configuration this channel was built with.
    pub fn telegram_config(&self) -> &TelegramConfig {
        &self.config
    }

    /// Reports whether the configuration has the channel enabled.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Builds a `teloxide::Bot` for `token` on top of a dedicated HTTP client.
    ///
    /// The client starts from teloxide's default reqwest settings with
    /// `no_proxy()` applied.
    ///
    /// # Errors
    ///
    /// Returns `ZeptoError::Channel` if the HTTP client cannot be built.
    fn build_bot(token: &str) -> Result<teloxide::Bot> {
        let builder = teloxide::net::default_reqwest_settings().no_proxy();
        match builder.build() {
            Ok(client) => Ok(teloxide::Bot::with_client(token.to_string(), client)),
            Err(e) => Err(ZeptoError::Channel(format!(
                "Failed to build Telegram HTTP client: {}",
                e
            ))),
        }
    }
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
async fn start(&mut self) -> Result<()> {
if self.running.swap(true, Ordering::SeqCst) {
info!("Telegram channel already running");
return Ok(());
}
if !self.config.enabled {
warn!("Telegram channel is disabled in configuration");
self.running.store(false, Ordering::SeqCst);
return Ok(());
}
if self.config.token.is_empty() {
error!("Telegram bot token is empty");
self.running.store(false, Ordering::SeqCst);
return Err(ZeptoError::Config("Telegram bot token is empty".into()));
}
info!("Starting Telegram channel");
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
self.shutdown_tx = Some(shutdown_tx);
let token = self.config.token.clone();
let bus = self.bus.clone();
let allowlist = self.config.allow_from.clone();
let running_clone = Arc::clone(&self.running);
let bot = match Self::build_bot(&token) {
Ok(bot) => bot,
Err(e) => {
self.running.store(false, Ordering::SeqCst);
return Err(e);
}
};
self.bot = Some(bot.clone());
tokio::spawn(async move {
use teloxide::prelude::*;
let task_result = std::panic::AssertUnwindSafe(async move {
if let Err(e) = bot.get_me().await {
error!("Telegram startup check failed: {}", e);
return;
}
let handler =
Update::filter_message().endpoint(
|_bot: Bot,
msg: Message,
bus: Arc<MessageBus>,
allowlist: Vec<String>| async move {
let user_id = msg
.from()
.map(|u| u.id.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
if !allowlist.is_empty() && !allowlist.contains(&user_id) {
info!(
"Telegram: User {} not in allowlist, ignoring message",
user_id
);
return Ok(());
}
if let Some(text) = msg.text() {
let chat_id = msg.chat.id.0.to_string();
info!(
"Telegram: Received message from user {} in chat {}: {}",
user_id,
chat_id,
if text.len() > 50 {
format!("{}...", &text[..50])
} else {
text.to_string()
}
);
let inbound =
InboundMessage::new("telegram", &user_id, &chat_id, text);
if let Err(e) = bus.publish_inbound(inbound).await {
error!("Failed to publish inbound message to bus: {}", e);
}
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
},
);
let mut dispatcher = Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![bus, allowlist])
.build();
info!("Telegram bot dispatcher started, waiting for messages...");
tokio::select! {
_ = dispatcher.dispatch() => {
info!("Telegram dispatcher completed");
}
_ = shutdown_rx.recv() => {
info!("Telegram channel shutdown signal received");
}
}
})
.catch_unwind()
.await;
if task_result.is_err() {
error!("Telegram polling task panicked");
}
running_clone.store(false, Ordering::SeqCst);
info!("Telegram polling task stopped");
});
Ok(())
}
async fn stop(&mut self) -> Result<()> {
if !self.running.swap(false, Ordering::SeqCst) {
info!("Telegram channel already stopped");
return Ok(());
}
info!("Stopping Telegram channel");
if let Some(tx) = self.shutdown_tx.take() {
if tx.send(()).await.is_err() {
warn!("Telegram shutdown channel already closed");
}
}
self.bot = None;
info!("Telegram channel stopped");
Ok(())
}
async fn send(&self, msg: OutboundMessage) -> Result<()> {
use teloxide::prelude::*;
use teloxide::types::ChatId;
if !self.running.load(Ordering::SeqCst) {
warn!("Telegram channel not running, cannot send message");
return Err(ZeptoError::Channel(
"Telegram channel not running".to_string(),
));
}
let chat_id: i64 = msg.chat_id.parse().map_err(|_| {
ZeptoError::Channel(format!("Invalid Telegram chat ID: {}", msg.chat_id))
})?;
info!("Telegram: Sending message to chat {}", chat_id);
let bot = self
.bot
.as_ref()
.ok_or_else(|| ZeptoError::Channel("Telegram bot not initialized".to_string()))?;
bot.send_message(ChatId(chat_id), &msg.content)
.await
.map_err(|e| ZeptoError::Channel(format!("Failed to send Telegram message: {}", e)))?;
info!("Telegram: Message sent successfully to chat {}", chat_id);
Ok(())
}
fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
fn is_allowed(&self, user_id: &str) -> bool {
self.base_config.is_allowed(user_id)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a channel on a fresh message bus from the given settings.
    fn channel_with(enabled: bool, token: &str, allow_from: &[&str]) -> TelegramChannel {
        let config = TelegramConfig {
            enabled,
            token: token.to_string(),
            allow_from: allow_from.iter().map(|id| id.to_string()).collect(),
        };
        TelegramChannel::new(config, Arc::new(MessageBus::new()))
    }

    /// A new channel reports its name, is stopped, and honours its allowlist.
    #[test]
    fn test_telegram_channel_creation() {
        let channel = channel_with(true, "test-token", &["user1"]);
        assert_eq!(channel.name(), "telegram");
        assert!(!channel.is_running());
        assert!(channel.is_allowed("user1"));
        assert!(!channel.is_allowed("user2"));
    }

    /// With an empty allowlist every user is allowed.
    #[test]
    fn test_telegram_empty_allowlist() {
        let channel = channel_with(true, "test-token", &[]);
        for user in ["anyone", "user1", "random_user_123"] {
            assert!(channel.is_allowed(user));
        }
    }

    /// The configuration accessors expose the values passed to `new`.
    #[test]
    fn test_telegram_config_access() {
        let channel = channel_with(true, "my-bot-token", &["admin"]);
        assert!(channel.is_enabled());
        assert_eq!(channel.telegram_config().token, "my-bot-token");
        assert_eq!(channel.telegram_config().allow_from, vec!["admin"]);
    }

    /// A channel configured as disabled reports itself as not enabled.
    #[test]
    fn test_telegram_disabled_channel() {
        let channel = channel_with(false, "test-token", &[]);
        assert!(!channel.is_enabled());
    }

    /// Only the listed users pass the allowlist check.
    #[test]
    fn test_telegram_multiple_allowed_users() {
        let channel = channel_with(true, "test-token", &["user1", "user2", "admin"]);
        for user in ["user1", "user2", "admin"] {
            assert!(channel.is_allowed(user));
        }
        for user in ["user3", "hacker"] {
            assert!(!channel.is_allowed(user));
        }
    }

    /// Starting with an empty token fails and leaves the channel stopped.
    #[tokio::test]
    async fn test_telegram_start_without_token() {
        let mut channel = channel_with(true, "", &[]);
        let outcome = channel.start().await;
        assert!(outcome.is_err());
        assert!(!channel.is_running());
    }

    /// Starting a disabled channel succeeds but does not mark it running.
    #[tokio::test]
    async fn test_telegram_start_disabled() {
        let mut channel = channel_with(false, "test-token", &[]);
        let outcome = channel.start().await;
        assert!(outcome.is_ok());
        assert!(!channel.is_running());
    }

    /// Stopping a channel that was never started is not an error.
    #[tokio::test]
    async fn test_telegram_stop_not_running() {
        let mut channel = channel_with(true, "test-token", &[]);
        let outcome = channel.stop().await;
        assert!(outcome.is_ok());
    }

    /// Sending through a channel that is not running is rejected.
    #[tokio::test]
    async fn test_telegram_send_not_running() {
        let channel = channel_with(true, "test-token", &[]);
        let outbound = OutboundMessage::new("telegram", "12345", "Hello");
        let outcome = channel.send(outbound).await;
        assert!(outcome.is_err());
    }

    /// The base configuration carries the channel name and a copy of the allowlist.
    #[tokio::test]
    async fn test_telegram_base_config() {
        let channel = channel_with(true, "test-token", &["allowed_user"]);
        assert_eq!(channel.base_config.name, "telegram");
        assert_eq!(channel.base_config.allowlist, vec!["allowed_user"]);
    }
}