use async_trait::async_trait;
use futures::FutureExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tracing::{error, info, warn};
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::config::Config;
use crate::config::TelegramConfig;
use crate::error::{Result, ZeptoError};
use crate::memory::builtin_searcher::BuiltinSearcher;
use crate::memory::longterm::LongTermMemory;
/// Number of retries allowed for the startup `get_me` check before the polling task gives up.
const MAX_STARTUP_RETRIES: u32 = 10;
/// Base of the exponential startup backoff, in seconds (the delay used for attempt 0).
const BASE_RETRY_DELAY_SECS: u64 = 2;
/// Upper bound, in seconds, applied to the computed startup backoff delay.
const MAX_RETRY_DELAY_SECS: u64 = 120;
use super::model_switch::{
format_current_model, format_model_list, hydrate_overrides, new_override_store,
parse_model_command, persist_single, remove_single, ModelCommand, ModelOverrideStore,
};
use super::{BaseChannelConfig, Channel};
/// Newtype around the configured `allow_from` entries, handed to the message handler as a
/// dispatcher dependency.
// NOTE(review): presumably wrapped so the dependency has a type distinct from other
// `Vec<String>` values registered on the dispatcher — confirm against dptree's injection rules.
#[derive(Clone)]
struct Allowlist(Vec<String>);
/// Newtype around the default model name, handed to the message handler as a dispatcher
/// dependency and shown in replies to model commands.
#[derive(Clone)]
struct DefaultModel(String);
/// Newtype around the list of configured provider names, handed to the message handler as a
/// dispatcher dependency and used when formatting the model list.
#[derive(Clone)]
struct ConfiguredProviders(Vec<String>);
/// Renders outbound text as Telegram-flavoured HTML.
///
/// Two transformations are applied in a single pass over `content`:
///
/// * Every `||` toggles a spoiler: the first occurrence emits `<tg-spoiler>`, the next one
///   emits `</tg-spoiler>`, and so on. A spoiler still open at the end of the input is closed
///   automatically so the markup stays balanced.
/// * The characters `&`, `<` and `>` are replaced by `&amp;`, `&lt;` and `&gt;`. The result is
///   sent with `ParseMode::Html`, so raw occurrences would be read as markup.
///
/// A single `|` that is not followed by another `|` is copied through unchanged.
fn render_telegram_html(content: &str) -> String {
    // A little headroom for the tags/entities that get inserted.
    let mut out = String::with_capacity(content.len() + 16);
    let mut chars = content.chars().peekable();
    let mut spoiler_open = false;

    while let Some(ch) = chars.next() {
        if ch == '|' && chars.peek() == Some(&'|') {
            // Consume the second pipe of the `||` marker.
            let _ = chars.next();
            out.push_str(if spoiler_open {
                "</tg-spoiler>"
            } else {
                "<tg-spoiler>"
            });
            spoiler_open = !spoiler_open;
            continue;
        }

        match ch {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            _ => out.push(ch),
        }
    }

    // Close a dangling spoiler so the emitted tags are always balanced.
    if spoiler_open {
        out.push_str("</tg-spoiler>");
    }
    out
}
/// Telegram implementation of [`Channel`], built on teloxide.
///
/// Incoming text messages are published to the [`MessageBus`]; outgoing messages are delivered
/// through the stored bot handle. Model-switch commands are handled inside the channel itself.
pub struct TelegramChannel {
/// Telegram-specific settings (`enabled`, `token`, `allow_from`, `deny_by_default`).
config: TelegramConfig,
/// Generic channel settings derived from `config` in `new`; backs `is_allowed`.
base_config: BaseChannelConfig,
/// Bus that inbound messages are published to.
bus: Arc<MessageBus>,
/// Set while the channel is started; shared with the polling task, which clears it on exit.
running: Arc<AtomicBool>,
/// Sender used by `stop` to signal the polling task; `None` until `start` creates it.
shutdown_tx: Option<mpsc::Sender<()>>,
/// Bot handle used by `send`; populated by `start` and cleared by `stop`.
bot: Option<teloxide::Bot>,
/// In-memory model overrides, looked up by chat ID (as a string).
model_overrides: ModelOverrideStore,
/// Model name reported when a chat has no override.
default_model: String,
/// Provider names passed to `format_model_list` for the list command.
configured_providers: Vec<String>,
/// Optional long-term memory used to persist overrides; `None` when memory is disabled or
/// failed to initialize.
longterm_memory: Option<Arc<Mutex<LongTermMemory>>>,
}
impl TelegramChannel {
/// Creates a Telegram channel that is configured but not yet started.
///
/// * `config` - Telegram settings; `allow_from` and `deny_by_default` are also copied into the
///   generic [`BaseChannelConfig`].
/// * `bus` - message bus that inbound messages will be published to.
/// * `default_model` - model name reported when a chat has no override.
/// * `configured_providers` - provider names shown by the model list command.
/// * `memory_enabled` - when `true`, tries to open the long-term memory file
///   `memory/model_prefs.json` under `Config::dir()`. A failure to open it is logged as a
///   warning and the channel continues without long-term memory.
pub fn new(
    config: TelegramConfig,
    bus: Arc<MessageBus>,
    default_model: String,
    configured_providers: Vec<String>,
    memory_enabled: bool,
) -> Self {
    let base_config = BaseChannelConfig {
        name: String::from("telegram"),
        allowlist: config.allow_from.clone(),
        deny_by_default: config.deny_by_default,
    };

    // Only touch the filesystem when memory is enabled; an initialization error degrades to
    // "no long-term memory" instead of failing construction.
    let longterm_memory = memory_enabled
        .then(|| {
            let prefs_path = Config::dir().join("memory").join("model_prefs.json");
            match LongTermMemory::with_path_and_searcher(prefs_path, Arc::new(BuiltinSearcher)) {
                Ok(store) => Some(Arc::new(Mutex::new(store))),
                Err(err) => {
                    warn!(
                        "Failed to initialize long-term memory for Telegram model switching: {}",
                        err
                    );
                    None
                }
            }
        })
        .flatten();

    let running = Arc::new(AtomicBool::new(false));
    let model_overrides = new_override_store();

    Self {
        config,
        base_config,
        bus,
        running,
        shutdown_tx: None,
        bot: None,
        model_overrides,
        default_model,
        configured_providers,
        longterm_memory,
    }
}
/// Returns a reference to the Telegram-specific configuration this channel was built with.
pub fn telegram_config(&self) -> &TelegramConfig {
&self.config
}
/// Returns the `enabled` flag from the Telegram configuration.
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
/// Computes the delay before startup retry number `attempt` (zero-based).
///
/// The delay is `BASE_RETRY_DELAY_SECS * 2^attempt` seconds, capped at
/// `MAX_RETRY_DELAY_SECS`. Both the power and the product saturate, so arbitrarily large
/// `attempt` values yield the cap rather than overflowing.
fn startup_backoff_delay(attempt: u32) -> Duration {
    let multiplier = 2u64.saturating_pow(attempt);
    let uncapped_secs = BASE_RETRY_DELAY_SECS.saturating_mul(multiplier);
    let capped_secs = u64::min(uncapped_secs, MAX_RETRY_DELAY_SECS);
    Duration::from_secs(capped_secs)
}
/// Builds a teloxide bot for `token` on top of a custom HTTP client.
///
/// The client starts from teloxide's default reqwest settings with `no_proxy()` applied.
///
/// # Errors
///
/// Returns `ZeptoError::Channel` if the HTTP client cannot be built.
fn build_bot(token: &str) -> Result<teloxide::Bot> {
    let builder = teloxide::net::default_reqwest_settings().no_proxy();
    match builder.build() {
        Ok(client) => Ok(teloxide::Bot::with_client(token.to_string(), client)),
        Err(e) => Err(ZeptoError::Channel(format!(
            "Failed to build Telegram HTTP client: {}",
            e
        ))),
    }
}
}
#[async_trait]
impl Channel for TelegramChannel {
/// Returns the fixed channel name, `"telegram"`.
fn name(&self) -> &str {
"telegram"
}
/// Starts the channel and spawns the background task that polls Telegram.
///
/// Returns `Ok(())` without starting anything when the channel is already running or is
/// disabled in configuration.
///
/// Note that `Ok(())` is returned as soon as the task has been spawned. The `get_me` startup
/// check runs inside the task, so a failure there is only logged and clears the running flag.
///
/// # Errors
///
/// Returns `ZeptoError::Config` when the bot token is empty, or the error from
/// `Self::build_bot` when the bot cannot be constructed. In both cases the running flag is
/// reset to `false` before returning.
async fn start(&mut self) -> Result<()> {
// Atomically claim the running flag; a previous value of `true` means a start is already
// in effect.
if self.running.swap(true, Ordering::SeqCst) {
info!("Telegram channel already running");
return Ok(());
}
if !self.config.enabled {
warn!("Telegram channel is disabled in configuration");
self.running.store(false, Ordering::SeqCst);
return Ok(());
}
if self.config.token.is_empty() {
error!("Telegram bot token is empty");
self.running.store(false, Ordering::SeqCst);
return Err(ZeptoError::Config("Telegram bot token is empty".into()));
}
info!("Starting Telegram channel");
// Capacity-1 channel over which `stop` signals the spawned task.
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
self.shutdown_tx = Some(shutdown_tx);
// Clone everything the spawned task needs so the `async move` block can own it.
let token = self.config.token.clone();
let bus = self.bus.clone();
let allowlist = Allowlist(self.config.allow_from.clone());
let deny_by_default = self.config.deny_by_default;
let model_overrides = self.model_overrides.clone();
let default_model = DefaultModel(self.default_model.clone());
let configured_providers = ConfiguredProviders(self.configured_providers.clone());
let longterm_memory = self.longterm_memory.clone();
let running_clone = Arc::clone(&self.running);
let bot = match Self::build_bot(&token) {
Ok(bot) => bot,
Err(e) => {
self.running.store(false, Ordering::SeqCst);
return Err(e);
}
};
// Keep a handle for `send`; the spawned task gets its own clone.
self.bot = Some(bot.clone());
// NOTE(review): `hydrate_overrides` is defined in `model_switch`; presumably it loads
// persisted overrides into the in-memory store — confirm there.
if let Some(ltm) = self.longterm_memory.as_ref() {
hydrate_overrides(&self.model_overrides, ltm).await;
}
tokio::spawn(async move {
use teloxide::prelude::*;
// The polling future is wrapped so that a panic inside it is caught below and the
// running flag is still cleared.
let task_result = std::panic::AssertUnwindSafe(async move {
let mut attempt: u32 = 0;
// Startup check: call `get_me` until it succeeds, retrying transient failures.
loop {
match bot.get_me().await {
Ok(_) => break,
Err(e) => {
use teloxide::RequestError;
let is_transient = matches!(
&e,
RequestError::Network(_)
| RequestError::Io(_)
| RequestError::RetryAfter(_)
);
// Give up on non-transient errors or once the retry budget is spent; returning
// here ends the whole polling future.
if !is_transient || attempt >= MAX_STARTUP_RETRIES {
error!(
"Telegram startup check failed after {} attempt(s): {}",
attempt + 1,
e
);
return;
}
// Use the delay carried by `RetryAfter` when present, otherwise exponential backoff.
let delay = if let RequestError::RetryAfter(d) = &e {
*d
} else {
TelegramChannel::startup_backoff_delay(attempt)
};
warn!(
"Telegram startup check failed (attempt {}/{}), retrying in {}s: {}",
attempt + 1,
MAX_STARTUP_RETRIES,
delay.as_secs(),
e
);
// Wait out the delay, but abort immediately if the shutdown receiver fires first.
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Telegram channel shutdown during startup retry");
return;
}
_ = tokio::time::sleep(delay) => {}
}
attempt += 1;
}
}
}
// Message handler. Its arguments are the bot, the message, and the values registered
// as dispatcher dependencies further down.
let handler =
Update::filter_message().endpoint(
|bot: Bot,
msg: Message,
bus: Arc<MessageBus>,
Allowlist(allowlist): Allowlist,
deny_by_default: bool,
model_overrides: ModelOverrideStore,
DefaultModel(default_model): DefaultModel,
ConfiguredProviders(configured_providers): ConfiguredProviders,
longterm_memory: Option<Arc<Mutex<LongTermMemory>>>| async move {
// Identify the sender; a message without a sender is attributed to "unknown".
let user = msg.from();
let user_id = user
.map(|u| u.id.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
let username = user
.and_then(|u| u.username.clone())
.unwrap_or_default();
// Authorization: with an empty allowlist the outcome is decided by `deny_by_default`.
// Otherwise the sender must match an entry by numeric ID, or by username compared
// case-insensitively with or without a leading '@'.
let allowed = if allowlist.is_empty() {
!deny_by_default
} else {
allowlist.contains(&user_id)
|| (!username.is_empty()
&& allowlist.iter().any(|entry| {
let entry_lower = entry.to_lowercase();
let user_lower = username.to_lowercase();
entry_lower == user_lower
|| entry_lower == format!("@{user_lower}")
|| format!("@{entry_lower}") == user_lower
}))
};
// Unauthorized messages are logged and dropped; the sender gets no reply.
if !allowed {
if allowlist.is_empty() {
info!(
"Telegram: User {} blocked — deny_by_default=true and allow_from is empty. \
Add their numeric user ID to channels.telegram.allow_from in config.json",
user_id
);
} else {
info!(
"Telegram: User {} (@{}) not in allow_from list ({} entries configured), ignoring message",
user_id,
if username.is_empty() { "no_username" } else { &username },
allowlist.len()
);
}
return Ok(());
}
// Only text messages are processed; anything else falls through to `Ok(())`.
if let Some(text) = msg.text() {
let chat_id = msg.chat.id.0.to_string();
let chat_id_num = msg.chat.id.0;
info!(
"Telegram: Received message from user {} in chat {}: {}",
user_id,
chat_id,
crate::utils::string::preview(text, 50)
);
// Model commands are answered directly in the chat and never reach the bus.
// Replies are best-effort: errors from `send_message` are discarded.
if let Some(cmd) = parse_model_command(text) {
match cmd {
ModelCommand::Show => {
let current = {
let overrides = model_overrides.read().await;
overrides.get(&chat_id).cloned()
};
let reply =
format_current_model(current.as_ref(), &default_model);
let _ = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
)
.await;
}
ModelCommand::Set(ov) => {
let reply = format!(
"Switched to {}:{}",
ov.provider.as_deref().unwrap_or("auto"),
ov.model
);
// The write guard is scoped so it is released before persisting and replying.
{
let mut overrides = model_overrides.write().await;
overrides.insert(chat_id.clone(), ov.clone());
}
if let Some(ref ltm) = longterm_memory {
persist_single(&chat_id, &ov, ltm).await;
}
let _ = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
)
.await;
}
ModelCommand::Reset => {
{
let mut overrides = model_overrides.write().await;
overrides.remove(&chat_id);
}
if let Some(ref ltm) = longterm_memory {
remove_single(&chat_id, ltm).await;
}
let reply = format!("Reset to default: {}", default_model);
let _ = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
)
.await;
}
ModelCommand::List => {
let current = {
let overrides = model_overrides.read().await;
overrides.get(&chat_id).cloned()
};
let reply = format_model_list(
&configured_providers,
current.as_ref(),
);
let _ = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
)
.await;
}
}
return Ok(());
}
// Regular message: forward it to the bus, tagged with this chat's override if any.
let mut inbound =
InboundMessage::new("telegram", &user_id, &chat_id, text);
let override_entry = {
let overrides = model_overrides.read().await;
overrides.get(&chat_id).cloned()
};
if let Some(ov) = override_entry {
inbound = inbound.with_metadata("model_override", &ov.model);
if let Some(provider) = ov.provider {
inbound =
inbound.with_metadata("provider_override", &provider);
}
}
// A publish failure is logged; the handler still returns `Ok(())`.
if let Err(e) = bus.publish_inbound(inbound).await {
error!("Failed to publish inbound message to bus: {}", e);
}
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
},
);
// Register the values the handler receives as arguments.
let mut dispatcher = Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![
bus,
allowlist,
deny_by_default,
model_overrides,
default_model,
configured_providers,
longterm_memory
])
.build();
info!("Telegram bot dispatcher started, waiting for messages...");
// Run until the dispatcher finishes on its own or the shutdown receiver fires.
tokio::select! {
_ = dispatcher.dispatch() => {
info!("Telegram dispatcher completed");
}
_ = shutdown_rx.recv() => {
info!("Telegram channel shutdown signal received");
}
}
})
.catch_unwind()
.await;
if task_result.is_err() {
error!("Telegram polling task panicked");
}
// Clear the running flag however the polling future ended (return, completion or panic).
running_clone.store(false, Ordering::SeqCst);
info!("Telegram polling task stopped");
});
Ok(())
}
/// Stops the channel.
///
/// Clears the running flag, sends a shutdown signal to the polling task if a sender is
/// stored, and drops the bot handle. Calling this on a channel that is not running only
/// logs and returns `Ok(())`. A failed shutdown send (receiver already gone) is logged as a
/// warning and does not produce an error.
async fn stop(&mut self) -> Result<()> {
    let was_running = self.running.swap(false, Ordering::SeqCst);
    if !was_running {
        info!("Telegram channel already stopped");
        return Ok(());
    }

    info!("Stopping Telegram channel");

    // Taking the sender out means a later `stop` cannot signal twice.
    let pending_sender = self.shutdown_tx.take();
    if let Some(sender) = pending_sender {
        let delivered = sender.send(()).await.is_ok();
        if !delivered {
            warn!("Telegram shutdown channel already closed");
        }
    }

    self.bot = None;
    info!("Telegram channel stopped");
    Ok(())
}
/// Sends `msg.content` to the Telegram chat identified by `msg.chat_id`.
///
/// The content is passed through `render_telegram_html` and sent with `ParseMode::Html`.
///
/// # Errors
///
/// Returns `ZeptoError::Channel` when the channel is not running, when `msg.chat_id` does
/// not parse as an `i64`, when no bot handle is stored, or when the Telegram request fails.
async fn send(&self, msg: OutboundMessage) -> Result<()> {
    use teloxide::prelude::*;
    use teloxide::types::{ChatId, ParseMode};

    let is_running = self.running.load(Ordering::SeqCst);
    if !is_running {
        warn!("Telegram channel not running, cannot send message");
        return Err(ZeptoError::Channel(
            "Telegram channel not running".to_string(),
        ));
    }

    // Chat IDs travel through the bus as strings; Telegram needs the numeric form.
    let chat_id = match msg.chat_id.parse::<i64>() {
        Ok(id) => id,
        Err(_) => {
            return Err(ZeptoError::Channel(format!(
                "Invalid Telegram chat ID: {}",
                msg.chat_id
            )));
        }
    };
    info!("Telegram: Sending message to chat {}", chat_id);

    let bot = match self.bot.as_ref() {
        Some(bot) => bot,
        None => {
            return Err(ZeptoError::Channel(
                "Telegram bot not initialized".to_string(),
            ));
        }
    };

    let html = render_telegram_html(&msg.content);
    let outcome = bot
        .send_message(ChatId(chat_id), html)
        .parse_mode(ParseMode::Html)
        .await;
    if let Err(e) = outcome {
        return Err(ZeptoError::Channel(format!(
            "Failed to send Telegram message: {}",
            e
        )));
    }

    info!("Telegram: Message sent successfully to chat {}", chat_id);
    Ok(())
}
/// Returns the current value of the shared running flag.
fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
/// Delegates the allowlist decision for `user_id` to the generic `BaseChannelConfig`.
// NOTE(review): the message handler in `start` performs its own check that additionally
// matches usernames; this method only forwards `user_id` — confirm the two are meant to differ.
fn is_allowed(&self, user_id: &str) -> bool {
self.base_config.is_allowed(user_id)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a channel around `config` with a fresh bus, the model name `"default-model"`,
    /// no configured providers, and long-term memory disabled.
    fn make_channel(config: TelegramConfig) -> TelegramChannel {
        let bus = Arc::new(MessageBus::new());
        TelegramChannel::new(config, bus, "default-model".to_string(), vec![], false)
    }

    /// A freshly built channel reports its name, is not running, and honours the allowlist.
    #[test]
    fn test_telegram_channel_creation() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec!["user1".to_string()],
            ..Default::default()
        });
        assert_eq!(channel.name(), "telegram");
        assert!(!channel.is_running());
        assert!(channel.is_allowed("user1"));
        assert!(!channel.is_allowed("user2"));
    }

    /// With an empty allowlist and default settings, every user is allowed.
    #[test]
    fn test_telegram_empty_allowlist() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec![],
            ..Default::default()
        });
        assert!(channel.is_allowed("anyone"));
        assert!(channel.is_allowed("user1"));
        assert!(channel.is_allowed("random_user_123"));
    }

    /// The configuration passed to `new` is readable through the accessors.
    #[test]
    fn test_telegram_config_access() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "my-bot-token".to_string(),
            allow_from: vec!["admin".to_string()],
            ..Default::default()
        });
        assert!(channel.is_enabled());
        assert_eq!(channel.telegram_config().token, "my-bot-token");
        assert_eq!(channel.telegram_config().allow_from, vec!["admin"]);
    }

    /// `is_enabled` reflects a disabled configuration.
    #[test]
    fn test_telegram_disabled_channel() {
        let channel = make_channel(TelegramConfig {
            enabled: false,
            token: "test-token".to_string(),
            allow_from: vec![],
            ..Default::default()
        });
        assert!(!channel.is_enabled());
    }

    /// Every allowlist entry is accepted and everything else is rejected.
    #[test]
    fn test_telegram_multiple_allowed_users() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec![
                "user1".to_string(),
                "user2".to_string(),
                "admin".to_string(),
            ],
            ..Default::default()
        });
        assert!(channel.is_allowed("user1"));
        assert!(channel.is_allowed("user2"));
        assert!(channel.is_allowed("admin"));
        assert!(!channel.is_allowed("user3"));
        assert!(!channel.is_allowed("hacker"));
    }

    /// `&`, `<` and `>` must come out as HTML entities, not as raw characters.
    #[test]
    fn test_render_telegram_html_escapes_html() {
        let rendered = render_telegram_html("5 < 7 & 9 > 2");
        assert_eq!(rendered, "5 &lt; 7 &amp; 9 &gt; 2");
    }

    /// A matched `||…||` pair becomes a spoiler element.
    #[test]
    fn test_render_telegram_html_spoiler_pairs() {
        let rendered = render_telegram_html("Secret: ||classified|| data");
        assert_eq!(rendered, "Secret: <tg-spoiler>classified</tg-spoiler> data");
    }

    /// A spoiler that is never closed in the input is closed at the end of the output.
    #[test]
    fn test_render_telegram_html_unmatched_spoiler() {
        let rendered = render_telegram_html("Dangling ||spoiler");
        assert_eq!(rendered, "Dangling <tg-spoiler>spoiler</tg-spoiler>");
    }

    /// Escaping also applies to text inside a spoiler.
    #[test]
    fn test_render_telegram_html_escapes_inside_spoiler() {
        let rendered = render_telegram_html("||a<b||");
        assert_eq!(rendered, "<tg-spoiler>a&lt;b</tg-spoiler>");
    }

    /// A lone `|` is not a spoiler marker and is copied through unchanged.
    #[test]
    fn test_render_telegram_html_single_pipe() {
        let rendered = render_telegram_html("a | b");
        assert_eq!(rendered, "a | b");
    }

    /// Starting with an empty token fails and leaves the channel stopped.
    #[tokio::test]
    async fn test_telegram_start_without_token() {
        let mut channel = make_channel(TelegramConfig {
            enabled: true,
            token: String::new(),
            allow_from: vec![],
            ..Default::default()
        });
        let result = channel.start().await;
        assert!(result.is_err());
        assert!(!channel.is_running());
    }

    /// Starting a disabled channel succeeds but does not mark it as running.
    #[tokio::test]
    async fn test_telegram_start_disabled() {
        let mut channel = make_channel(TelegramConfig {
            enabled: false,
            token: "test-token".to_string(),
            allow_from: vec![],
            ..Default::default()
        });
        let result = channel.start().await;
        assert!(result.is_ok());
        assert!(!channel.is_running());
    }

    /// Stopping a channel that was never started is not an error.
    #[tokio::test]
    async fn test_telegram_stop_not_running() {
        let mut channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec![],
            ..Default::default()
        });
        let result = channel.stop().await;
        assert!(result.is_ok());
    }

    /// Sending through a channel that is not running is rejected.
    #[tokio::test]
    async fn test_telegram_send_not_running() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec![],
            ..Default::default()
        });
        let msg = OutboundMessage::new("telegram", "12345", "Hello");
        let result = channel.send(msg).await;
        assert!(result.is_err());
    }

    /// The generic base configuration mirrors the Telegram configuration.
    #[tokio::test]
    async fn test_telegram_base_config() {
        let channel = make_channel(TelegramConfig {
            enabled: true,
            token: "test-token".to_string(),
            allow_from: vec!["allowed_user".to_string()],
            ..Default::default()
        });
        assert_eq!(channel.base_config.name, "telegram");
        assert_eq!(channel.base_config.allowlist, vec!["allowed_user"]);
    }

    /// The backoff doubles with each attempt, starting from the base delay.
    #[test]
    fn test_startup_backoff_delay_increases() {
        let d0 = TelegramChannel::startup_backoff_delay(0);
        let d1 = TelegramChannel::startup_backoff_delay(1);
        let d2 = TelegramChannel::startup_backoff_delay(2);
        assert_eq!(d0, Duration::from_secs(2));
        assert_eq!(d1, Duration::from_secs(4));
        assert_eq!(d2, Duration::from_secs(8));
        assert!(d1 > d0);
        assert!(d2 > d1);
    }

    /// Large attempt numbers are clamped to the maximum delay.
    #[test]
    fn test_startup_backoff_delay_caps_at_max() {
        let d_high = TelegramChannel::startup_backoff_delay(20);
        assert_eq!(d_high, Duration::from_secs(MAX_RETRY_DELAY_SECS));
    }

    /// Even `u32::MAX` attempts must saturate instead of overflowing.
    #[test]
    fn test_startup_backoff_delay_no_overflow() {
        let d = TelegramChannel::startup_backoff_delay(u32::MAX);
        assert_eq!(d, Duration::from_secs(MAX_RETRY_DELAY_SECS));
    }
}