use async_trait::async_trait;
use dashmap::DashMap;
use futures::FutureExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use once_cell::sync::Lazy;
use regex::Regex;
use crate::bus::{InboundMessage, MediaAttachment, MediaType, MessageBus, OutboundMessage};
use crate::config::Config;
use crate::config::TelegramConfig;
use crate::error::{Result, ZeptoError};
use crate::memory::builtin_searcher::BuiltinSearcher;
use crate::memory::longterm::LongTermMemory;
const BARE_PHOTO_PLACEHOLDER: &str = "Please analyze this image.";
const MAX_STARTUP_RETRIES: u32 = 10;
const BASE_RETRY_DELAY_SECS: u64 = 2;
const MAX_RETRY_DELAY_SECS: u64 = 120;
use super::model_switch::{
format_current_model, format_model_list, hydrate_overrides, new_override_store,
parse_model_command, persist_single, remove_single, ModelCommand, ModelOverrideStore,
};
use super::persona_switch::{self, PersonaCommand, PersonaOverrideStore};
use super::{BaseChannelConfig, Channel};
#[derive(Clone)]
struct Allowlist(Vec<String>);
#[derive(Clone, Copy)]
struct AllowUsernames(bool);
#[derive(Clone, Copy)]
struct ReactionsEnabled(bool);
#[derive(Clone)]
struct DefaultModel(String);
#[derive(Clone)]
struct ConfiguredProviders {
names: Vec<String>,
models: Vec<(String, String)>,
}
type TypingMap = Arc<DashMap<String, CancellationToken>>;
#[derive(Clone)]
struct OverridesDep {
model: ModelOverrideStore,
persona: PersonaOverrideStore,
typing: TypingMap,
}
static RE_FENCED_CODE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?s)```[^\n]*\n(.*?)```").unwrap());
static RE_INLINE_CODE: Lazy<Regex> = Lazy::new(|| Regex::new(r"`([^`\n]+)`").unwrap());
static RE_BOLD_ITALIC: Lazy<Regex> = Lazy::new(|| Regex::new(r"\*\*\*(.+?)\*\*\*").unwrap());
static RE_BOLD: Lazy<Regex> = Lazy::new(|| Regex::new(r"\*\*(.+?)\*\*").unwrap());
static RE_ITALIC: Lazy<Regex> = Lazy::new(|| Regex::new(r"\*([^\*\n]+?)\*").unwrap());
static RE_ITALIC_UNDERSCORE: Lazy<Regex> = Lazy::new(|| {
Regex::new(r#"(?:^|(?P<pre>\s))_(?P<body>[^_\n]+?)_(?P<suf>[\s.,;:!?]|$)"#).unwrap()
});
static RE_STRIKETHROUGH: Lazy<Regex> = Lazy::new(|| Regex::new(r"~~(.+?)~~").unwrap());
static RE_LINK: Lazy<Regex> = Lazy::new(|| Regex::new(r"\[([^\]]+)\]\(([^)]+)\)").unwrap());
static RE_SPOILER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\|\|(.+?)\|\|").unwrap());
static RE_HEADER: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^#{1,6}\s+(.+)$").unwrap());
static RE_BULLET: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^[ \t]*[-*]\s+").unwrap());
static RE_NUMBERED_LIST: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^[ \t]*\d+\.\s+").unwrap());
static RE_BLOCKQUOTE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^>\s?(.*)$").unwrap());
static RE_HR: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^-{3,}\s*$").unwrap());
fn escape_html(s: &str) -> String {
s.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
}
fn html_tags_valid(html: &str) -> bool {
static RE_TAG: Lazy<Regex> = Lazy::new(|| Regex::new(r"<(/?)(\w[\w-]*)(?:\s[^>]*)?>").unwrap());
let mut stack: Vec<String> = Vec::new();
for caps in RE_TAG.captures_iter(html) {
let closing = &caps[1] == "/";
let tag = caps[2].to_lowercase();
if closing {
if stack.last().map(|s| s.as_str()) != Some(tag.as_str()) {
return false;
}
stack.pop();
} else {
stack.push(tag);
}
}
stack.is_empty()
}
fn strip_html_tags(html: &str) -> String {
static RE_STRIP: Lazy<Regex> = Lazy::new(|| Regex::new(r"<[^>]+>").unwrap());
let text = RE_STRIP.replace_all(html, "");
text.replace("&", "&")
.replace("<", "<")
.replace(">", ">")
}
fn render_telegram_html(content: &str) -> String {
let mut code_blocks: Vec<String> = Vec::new();
let mut text = RE_FENCED_CODE
.replace_all(content, |caps: ®ex::Captures| {
let idx = code_blocks.len();
let body = caps.get(1).map_or("", |m| m.as_str());
code_blocks.push(body.to_string());
format!("\x00CODEBLOCK{idx}\x00")
})
.into_owned();
let mut inline_codes: Vec<String> = Vec::new();
text = RE_INLINE_CODE
.replace_all(&text, |caps: ®ex::Captures| {
let idx = inline_codes.len();
inline_codes.push(caps[1].to_string());
format!("\x00INLINE{idx}\x00")
})
.into_owned();
text = escape_html(&text);
text = text
.replace("<u>", "<u>")
.replace("</u>", "</u>")
.replace("<ins>", "<u>")
.replace("</ins>", "</u>");
text = RE_HR.replace_all(&text, "").into_owned();
text = RE_HEADER.replace_all(&text, "<b>$1</b>\n").into_owned();
text = RE_BLOCKQUOTE
.replace_all(&text, "<blockquote>$1</blockquote>")
.into_owned();
text = RE_BULLET.replace_all(&text, "• ").into_owned();
text = RE_NUMBERED_LIST
.replace_all(&text, |caps: ®ex::Captures| {
let m = caps.get(0).unwrap().as_str().trim_start();
m.to_string()
})
.into_owned();
text = RE_BOLD_ITALIC
.replace_all(&text, "<b><i>$1</i></b>")
.into_owned();
text = RE_BOLD.replace_all(&text, "<b>$1</b>").into_owned();
text = RE_ITALIC.replace_all(&text, "<i>$1</i>").into_owned();
text = RE_ITALIC_UNDERSCORE
.replace_all(&text, |caps: ®ex::Captures| {
let pre = caps.name("pre").map_or("", |m| m.as_str());
let body = &caps["body"];
let suf = caps.name("suf").map_or("", |m| m.as_str());
format!("{pre}<i>{body}</i>{suf}")
})
.into_owned();
text = RE_STRIKETHROUGH
.replace_all(&text, "<s>$1</s>")
.into_owned();
text = RE_LINK
.replace_all(&text, |caps: ®ex::Captures| {
format!("<a href=\"{}\">{}</a>", &caps[2], &caps[1])
})
.into_owned();
text = RE_SPOILER
.replace_all(&text, "<tg-spoiler>$1</tg-spoiler>")
.into_owned();
for (idx, block) in code_blocks.iter().enumerate() {
let tag = format!("<pre>{}</pre>", escape_html(block.trim_end()));
text = text.replace(&format!("\x00CODEBLOCK{idx}\x00"), &tag);
}
for (idx, code) in inline_codes.iter().enumerate() {
let tag = format!("<code>{}</code>", escape_html(code));
text = text.replace(&format!("\x00INLINE{idx}\x00"), &tag);
}
if !html_tags_valid(&text) {
return strip_html_tags(&text);
}
text
}
fn is_numeric_allowlist_entry(entry: &str) -> bool {
let trimmed = entry.trim();
!trimmed.is_empty() && trimmed.bytes().all(|b| b.is_ascii_digit())
}
fn allowlist_has_username_entries(allowlist: &[String]) -> bool {
allowlist
.iter()
.any(|entry| !is_numeric_allowlist_entry(entry))
}
fn telegram_allowlist_allows(
allowlist: &[String],
user_id: &str,
username: &str,
allow_usernames: bool,
) -> bool {
allowlist.contains(&user_id.to_string())
|| (allow_usernames
&& !username.is_empty()
&& allowlist.iter().any(|entry| {
let entry_lower = entry.trim().to_lowercase();
let user_lower = username.to_lowercase();
entry_lower == user_lower
|| entry_lower == format!("@{user_lower}")
|| format!("@{entry_lower}") == user_lower
}))
}
async fn download_telegram_photo(
bot: &teloxide::Bot,
file_id: teloxide::types::FileId,
http_client: &reqwest::Client,
) -> Option<MediaAttachment> {
use crate::session::media::MAX_IMAGE_SIZE;
use teloxide::prelude::Requester;
let file = match tokio::time::timeout(Duration::from_secs(15), bot.get_file(file_id)).await {
Ok(Ok(f)) => f,
Ok(Err(e)) => {
warn!("Failed to get Telegram file info: {}", e);
return None;
}
Err(_) => {
warn!("Telegram get_file timed out after 15s");
return None;
}
};
if file.path.is_empty() {
warn!("Telegram file path is empty for photo");
return None;
}
let download_url = format!(
"https://api.telegram.org/file/bot{}/{}",
bot.token(),
file.path
);
let resp = match http_client.get(&download_url).send().await {
Ok(r) => r,
Err(e) => {
warn!("Failed to download Telegram photo: {}", e);
return None;
}
};
let mime_type = resp
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok())
.filter(|ct| ct.starts_with("image/"))
.unwrap_or("image/jpeg")
.to_string();
let bytes = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
warn!("Failed to read Telegram photo bytes: {}", e);
return None;
}
};
if bytes.len() > MAX_IMAGE_SIZE {
warn!("Telegram photo too large: {} bytes", bytes.len());
return None;
}
Some(
MediaAttachment::new(MediaType::Image)
.with_data(bytes.to_vec())
.with_mime_type(&mime_type),
)
}
pub struct TelegramChannel {
config: TelegramConfig,
base_config: BaseChannelConfig,
bus: Arc<MessageBus>,
running: Arc<AtomicBool>,
shutdown_tx: Option<mpsc::Sender<()>>,
bot: Option<teloxide::Bot>,
model_overrides: ModelOverrideStore,
persona_overrides: PersonaOverrideStore,
default_model: String,
configured_providers: Vec<String>,
configured_models: Vec<(String, String)>,
longterm_memory: Option<Arc<Mutex<LongTermMemory>>>,
typing_indicators: TypingMap,
http_client: reqwest::Client,
}
impl TelegramChannel {
pub fn new(
config: TelegramConfig,
bus: Arc<MessageBus>,
default_model: String,
configured_providers: Vec<String>,
configured_models: Vec<(String, String)>,
memory_enabled: bool,
) -> Self {
if allowlist_has_username_entries(&config.allow_from) {
if config.allow_usernames {
warn!(
"Telegram allow_from contains username entries. Username matching is a legacy compatibility mode and can drift if usernames are reassigned; migrate to numeric user IDs and set channels.telegram.allow_usernames=false when ready."
);
} else {
warn!(
"Telegram allow_from contains non-numeric entries, but channels.telegram.allow_usernames=false so only numeric user IDs will match."
);
}
}
let base_config = BaseChannelConfig {
name: "telegram".to_string(),
allowlist: config.allow_from.clone(),
deny_by_default: config.deny_by_default,
};
let longterm_memory = if memory_enabled {
let ltm_path = Config::dir().join("memory").join("model_prefs.json");
match LongTermMemory::with_path_and_searcher(ltm_path, Arc::new(BuiltinSearcher)) {
Ok(ltm) => Some(Arc::new(Mutex::new(ltm))),
Err(e) => {
warn!(
"Failed to initialize long-term memory for Telegram model switching: {}",
e
);
None
}
}
} else {
None
};
Self {
config,
base_config,
bus,
running: Arc::new(AtomicBool::new(false)),
shutdown_tx: None,
bot: None,
model_overrides: new_override_store(),
persona_overrides: persona_switch::new_persona_store(),
default_model,
configured_providers,
configured_models,
longterm_memory,
typing_indicators: Arc::new(DashMap::new()),
http_client: reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to build HTTP client"),
}
}
pub fn telegram_config(&self) -> &TelegramConfig {
&self.config
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
fn startup_backoff_delay(attempt: u32) -> Duration {
let delay_secs = BASE_RETRY_DELAY_SECS
.saturating_mul(2u64.saturating_pow(attempt))
.min(MAX_RETRY_DELAY_SECS);
Duration::from_secs(delay_secs)
}
fn build_bot(token: &str) -> Result<teloxide::Bot> {
let client = teloxide::net::default_reqwest_settings()
.no_proxy()
.build()
.map_err(|e| {
ZeptoError::Channel(format!("Failed to build Telegram HTTP client: {}", e))
})?;
Ok(teloxide::Bot::with_client(token.to_string(), client))
}
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
async fn start(&mut self) -> Result<()> {
if self.running.swap(true, Ordering::SeqCst) {
info!("Telegram channel already running");
return Ok(());
}
if !self.config.enabled {
warn!("Telegram channel is disabled in configuration");
self.running.store(false, Ordering::SeqCst);
return Ok(());
}
if self.config.token.is_empty() {
error!("Telegram bot token is empty");
self.running.store(false, Ordering::SeqCst);
return Err(ZeptoError::Config("Telegram bot token is empty".into()));
}
info!("Starting Telegram channel");
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
self.shutdown_tx = Some(shutdown_tx);
let token = self.config.token.clone();
let bus = self.bus.clone();
let allowlist = Allowlist(self.config.allow_from.clone());
let allow_usernames = AllowUsernames(self.config.allow_usernames);
let deny_by_default = self.config.deny_by_default;
let overrides_dep = OverridesDep {
model: self.model_overrides.clone(),
persona: self.persona_overrides.clone(),
typing: self.typing_indicators.clone(),
};
let default_model = DefaultModel(self.default_model.clone());
let configured_providers = ConfiguredProviders {
names: self.configured_providers.clone(),
models: self.configured_models.clone(),
};
let longterm_memory = self.longterm_memory.clone();
let reactions_enabled = ReactionsEnabled(self.config.reactions);
let http_client = self.http_client.clone();
let running_clone = Arc::clone(&self.running);
let bot = match Self::build_bot(&token) {
Ok(bot) => bot,
Err(e) => {
self.running.store(false, Ordering::SeqCst);
return Err(e);
}
};
self.bot = Some(bot.clone());
if let Some(ltm) = self.longterm_memory.as_ref() {
hydrate_overrides(&self.model_overrides, ltm).await;
}
if let Some(ltm) = self.longterm_memory.as_ref() {
persona_switch::hydrate_overrides(&self.persona_overrides, ltm).await;
}
tokio::spawn(async move {
use teloxide::prelude::*;
let task_result = std::panic::AssertUnwindSafe(async move {
let mut attempt: u32 = 0;
loop {
match bot.get_me().await {
Ok(_) => break,
Err(e) => {
use teloxide::RequestError;
let is_transient = matches!(
&e,
RequestError::Network(_)
| RequestError::Io(_)
| RequestError::RetryAfter(_)
);
if !is_transient || attempt >= MAX_STARTUP_RETRIES {
error!(
"Telegram startup check failed after {} attempt(s): {}",
attempt + 1,
e
);
return;
}
let delay = if let RequestError::RetryAfter(d) = &e {
d.duration()
} else {
TelegramChannel::startup_backoff_delay(attempt)
};
warn!(
"Telegram startup check failed (attempt {}/{}), retrying in {}s: {}",
attempt + 1,
MAX_STARTUP_RETRIES,
delay.as_secs(),
e
);
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Telegram channel shutdown during startup retry");
return;
}
_ = tokio::time::sleep(delay) => {}
}
attempt += 1;
}
}
}
let handler =
Update::filter_message().endpoint(
|bot: Bot,
msg: Message,
bus: Arc<MessageBus>,
Allowlist(allowlist): Allowlist,
AllowUsernames(allow_usernames): AllowUsernames,
deny_by_default: bool,
ReactionsEnabled(reactions_enabled): ReactionsEnabled,
overrides_dep: OverridesDep,
DefaultModel(default_model): DefaultModel,
configured_providers_dep: ConfiguredProviders,
longterm_memory: Option<Arc<Mutex<LongTermMemory>>>,
http_client: reqwest::Client| async move {
let model_overrides = overrides_dep.model;
let persona_overrides = overrides_dep.persona;
let typing_indicators = overrides_dep.typing;
let configured_providers = configured_providers_dep.names;
let configured_models = configured_providers_dep.models;
let user = msg.from.as_ref();
let user_id = user
.map(|u| u.id.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
let username = user
.and_then(|u| u.username.clone())
.unwrap_or_default();
let allowed = if allowlist.is_empty() {
!deny_by_default
} else {
telegram_allowlist_allows(
&allowlist,
&user_id,
&username,
allow_usernames,
)
};
if !allowed {
if allowlist.is_empty() {
info!(
"Telegram: User {} blocked — deny_by_default=true and allow_from is empty. \
Add their numeric user ID to channels.telegram.allow_from in config.json",
user_id
);
} else {
info!(
"Telegram: User {} (@{}) not in allow_from list ({} entries configured), ignoring message",
user_id,
if username.is_empty() { "no_username" } else { &username },
allowlist.len()
);
}
return Ok(());
}
{
use teloxide::types::ChatAction;
let typing_key = match msg.thread_id {
Some(tid) => {
format!("{}:{}:{}", msg.chat.id.0, tid.0 .0, msg.id.0)
}
None => format!("{}:{}", msg.chat.id.0, msg.id.0),
};
let cancel_token = CancellationToken::new();
typing_indicators
.insert(typing_key.clone(), cancel_token.clone());
let typing_bot = bot.clone();
let typing_chat_id = msg.chat.id;
let typing_thread_id = msg.thread_id;
let typing_map = typing_indicators.clone();
let typing_map_key = typing_key;
tokio::spawn(async move {
loop {
let mut action = typing_bot.send_chat_action(
typing_chat_id,
ChatAction::Typing,
);
if let Some(tid) = typing_thread_id {
action = action.message_thread_id(tid);
}
if let Err(e) = action.await {
debug!(
"Typing indicator send failed for {}: {}",
typing_map_key, e
);
}
tokio::select! {
_ = cancel_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(4)) => {}
}
}
typing_map.remove(&typing_map_key);
});
}
let has_photo = msg.photo().is_some();
let has_image_doc = msg.document()
.and_then(|d| d.mime_type.as_ref())
.map(|m| m.as_ref().starts_with("image/"))
.unwrap_or(false);
let has_image = has_photo || has_image_doc;
if let Some(text) = msg.text()
.or_else(|| msg.caption())
.or(if has_image { Some(BARE_PHOTO_PLACEHOLDER) } else { None })
{
let chat_id = msg.chat.id.0.to_string();
let chat_id_num = msg.chat.id.0;
let thread_id: Option<String> =
msg.thread_id.map(|t| t.0 .0.to_string());
let override_key = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.clone()
};
info!(
"Telegram: Received message from user {} in chat {}: {}",
user_id,
chat_id,
crate::utils::string::preview(text, 50)
);
fn apply_thread_id(
req: teloxide::requests::JsonRequest<
teloxide::payloads::SendMessage,
>,
thread_id: &Option<String>,
) -> teloxide::requests::JsonRequest<
teloxide::payloads::SendMessage,
> {
if let Some(ref tid) = thread_id {
if let Ok(id) = tid.parse::<i32>() {
return req.message_thread_id(
teloxide::types::ThreadId(
teloxide::types::MessageId(id),
),
);
}
}
req
}
if let Some(cmd) = parse_model_command(text) {
match cmd {
ModelCommand::Show => {
let current = {
let overrides = model_overrides.read().await;
overrides.get(&override_key).cloned()
};
let reply =
format_current_model(current.as_ref(), &default_model);
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
ModelCommand::Set(ov) => {
let reply = format!(
"Switched to {}:{}",
ov.provider.as_deref().unwrap_or("auto"),
ov.model
);
{
let mut overrides = model_overrides.write().await;
overrides.insert(override_key.clone(), ov.clone());
}
if let Some(ref ltm) = longterm_memory {
persist_single(&override_key, &ov, ltm).await;
}
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
ModelCommand::Reset => {
{
let mut overrides = model_overrides.write().await;
overrides.remove(&override_key);
}
if let Some(ref ltm) = longterm_memory {
remove_single(&override_key, ltm).await;
}
let reply = format!("Reset to default: {}", default_model);
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
ModelCommand::List => {
let current = {
let overrides = model_overrides.read().await;
overrides.get(&override_key).cloned()
};
let reply = format_model_list(
&configured_providers,
current.as_ref(),
&configured_models,
);
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
ModelCommand::Fetch => {
let req = bot.send_message(
teloxide::types::ChatId(chat_id_num),
"Use /model list to see available models.\n/model fetch is only available in CLI mode.",
);
let _ = apply_thread_id(req, &thread_id).await;
}
}
return Ok(());
}
if let Some(cmd) = persona_switch::parse_persona_command(text) {
match cmd {
PersonaCommand::Show => {
let current = {
let overrides = persona_overrides.read().await;
overrides.get(&override_key).cloned()
};
let reply = persona_switch::format_current_persona(
current.as_deref(),
);
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
PersonaCommand::Set(value) => {
let resolved =
persona_switch::resolve_soul_content(&value);
let reply = if resolved.is_empty() {
"Switched to default persona".to_string()
} else {
format!("Switched to persona: {}", value)
};
{
let mut overrides =
persona_overrides.write().await;
overrides
.insert(override_key.clone(), value.clone());
}
if let Some(ref ltm) = longterm_memory {
persona_switch::persist_single(
&override_key, &value, ltm,
)
.await;
}
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
PersonaCommand::Reset => {
{
let mut overrides =
persona_overrides.write().await;
overrides.remove(&override_key);
}
if let Some(ref ltm) = longterm_memory {
persona_switch::remove_single(&override_key, ltm)
.await;
}
let reply =
"Persona reset to default".to_string();
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
PersonaCommand::List => {
let current = {
let overrides = persona_overrides.read().await;
overrides.get(&override_key).cloned()
};
let reply = persona_switch::format_persona_list(
current.as_deref(),
);
let req = bot
.send_message(
teloxide::types::ChatId(chat_id_num),
reply,
);
let _ = apply_thread_id(req, &thread_id).await;
}
}
return Ok(());
}
if reactions_enabled {
use teloxide::types::ReactionType;
if let Err(e) = bot
.set_message_reaction(msg.chat.id, msg.id)
.reaction(vec![ReactionType::Emoji {
emoji: "\u{1F440}".to_string(),
}])
.await
{
debug!("Failed to set 👀 reaction: {}", e);
}
}
let mut inbound =
InboundMessage::new("telegram", &user_id, &chat_id, text);
inbound = inbound.with_metadata(
"telegram_message_id",
&msg.id.0.to_string(),
);
if let Some(ref tid) = thread_id {
inbound.session_key =
format!("telegram:{}:{}", chat_id, tid);
inbound =
inbound.with_metadata("telegram_thread_id", tid);
}
let override_entry = {
let overrides = model_overrides.read().await;
overrides.get(&override_key).cloned()
};
if let Some(ov) = override_entry {
inbound = inbound.with_metadata("model_override", &ov.model);
if let Some(provider) = ov.provider {
inbound =
inbound.with_metadata("provider_override", &provider);
}
}
let persona_entry = {
let overrides = persona_overrides.read().await;
overrides.get(&override_key).cloned()
};
if let Some(persona_value) = persona_entry {
inbound = inbound
.with_metadata("persona_override", &persona_value);
}
let mut image_ok = !has_image;
if let Some(photos) = msg.photo() {
if let Some(largest) = photos.last() {
if let Some(media) = download_telegram_photo(&bot, largest.file.id.clone(), &http_client).await {
inbound = inbound.with_media(media);
image_ok = true;
}
}
}
if !image_ok && has_image_doc {
if let Some(doc) = msg.document() {
let mime_str = doc.mime_type.as_ref()
.map(|m| m.to_string())
.unwrap_or_else(|| "image/jpeg".to_string());
if let Some(media) = download_telegram_photo(&bot, doc.file.id.clone(), &http_client).await {
inbound = inbound.with_media(
media.with_mime_type(&mime_str),
);
image_ok = true;
}
}
}
if !image_ok && has_image {
let req = bot.send_message(
teloxide::types::ChatId(chat_id_num),
"⚠️ Failed to download your image. Please try again.",
);
let _ = apply_thread_id(req, &thread_id).await;
}
if !image_ok && text == BARE_PHOTO_PLACEHOLDER {
} else if let Err(e) = bus.publish_inbound(inbound).await {
error!("Failed to publish inbound message to bus: {}", e);
}
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
},
);
let mut dispatcher = Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![
bus,
allowlist,
allow_usernames,
deny_by_default,
reactions_enabled,
overrides_dep,
default_model,
configured_providers,
longterm_memory,
http_client
])
.build();
info!("Telegram bot dispatcher started, waiting for messages...");
tokio::select! {
_ = dispatcher.dispatch() => {
info!("Telegram dispatcher completed");
}
_ = shutdown_rx.recv() => {
info!("Telegram channel shutdown signal received");
}
}
})
.catch_unwind()
.await;
if task_result.is_err() {
error!("Telegram polling task panicked");
}
running_clone.store(false, Ordering::SeqCst);
info!("Telegram polling task stopped");
});
Ok(())
}
async fn stop(&mut self) -> Result<()> {
if !self.running.swap(false, Ordering::SeqCst) {
info!("Telegram channel already stopped");
return Ok(());
}
info!("Stopping Telegram channel");
if let Some(tx) = self.shutdown_tx.take() {
if tx.send(()).await.is_err() {
warn!("Telegram shutdown channel already closed");
}
}
self.bot = None;
for entry in self.typing_indicators.iter() {
entry.value().cancel();
}
self.typing_indicators.clear();
info!("Telegram channel stopped");
Ok(())
}
async fn send(&self, msg: OutboundMessage) -> Result<()> {
use teloxide::prelude::*;
use teloxide::types::{ChatId, MessageId, ParseMode, ReactionType, ReplyParameters};
if !self.running.load(Ordering::SeqCst) {
warn!("Telegram channel not running, cannot send message");
return Err(ZeptoError::Channel(
"Telegram channel not running".to_string(),
));
}
let chat_id: i64 = msg.chat_id.parse().map_err(|_| {
ZeptoError::Channel(format!("Invalid Telegram chat ID: {}", msg.chat_id))
})?;
if let Some(msg_id) = msg.metadata.get("telegram_message_id") {
let typing_key = match msg.metadata.get("telegram_thread_id") {
Some(tid) => format!("{}:{}:{}", chat_id, tid, msg_id),
None => format!("{}:{}", chat_id, msg_id),
};
if let Some((_, token)) = self.typing_indicators.remove(&typing_key) {
token.cancel();
}
}
info!("Telegram: Sending message to chat {}", chat_id);
let bot = self
.bot
.as_ref()
.ok_or_else(|| ZeptoError::Channel("Telegram bot not initialized".to_string()))?;
let rendered = render_telegram_html(&msg.content);
let mut req = bot
.send_message(ChatId(chat_id), rendered)
.parse_mode(ParseMode::Html);
if let Some(thread_id_str) = msg.metadata.get("telegram_thread_id") {
if let Ok(tid) = thread_id_str.parse::<i32>() {
req = req
.message_thread_id(teloxide::types::ThreadId(teloxide::types::MessageId(tid)));
}
}
{
let reply_id = msg
.reply_to
.as_deref()
.or(msg.metadata.get("telegram_message_id").map(|s| s.as_str()));
if let Some(id_str) = reply_id {
if let Ok(id) = id_str.parse::<i32>() {
req = req.reply_parameters(
ReplyParameters::new(MessageId(id)).allow_sending_without_reply(),
);
}
}
}
req.await
.map_err(|e| ZeptoError::Channel(format!("Failed to send Telegram message: {}", e)))?;
if self.config.reactions {
if let Some(mid_str) = msg.metadata.get("telegram_message_id") {
if let Ok(mid) = mid_str.parse::<i32>() {
if let Err(e) = bot
.set_message_reaction(ChatId(chat_id), MessageId(mid))
.reaction(vec![ReactionType::Emoji {
emoji: "\u{2705}".to_string(),
}])
.await
{
debug!("Failed to set ✅ reaction: {}", e);
}
}
}
}
info!("Telegram: Message sent successfully to chat {}", chat_id);
Ok(())
}
fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
fn is_allowed(&self, user_id: &str) -> bool {
self.base_config.is_allowed(user_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_telegram_channel_creation() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec!["user1".to_string()],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert_eq!(channel.name(), "telegram");
assert!(!channel.is_running());
assert!(channel.is_allowed("user1"));
assert!(!channel.is_allowed("user2"));
}
#[test]
fn test_telegram_empty_allowlist() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert!(channel.is_allowed("anyone"));
assert!(channel.is_allowed("user1"));
assert!(channel.is_allowed("random_user_123"));
}
#[test]
fn test_telegram_config_access() {
let config = TelegramConfig {
enabled: true,
token: "my-bot-token".to_string(),
allow_from: vec!["admin".to_string()],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert!(channel.is_enabled());
assert_eq!(channel.telegram_config().token, "my-bot-token");
assert_eq!(channel.telegram_config().allow_from, vec!["admin"]);
}
#[test]
fn test_telegram_disabled_channel() {
let config = TelegramConfig {
enabled: false,
token: "test-token".to_string(),
allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert!(!channel.is_enabled());
}
#[test]
fn test_telegram_multiple_allowed_users() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec![
"user1".to_string(),
"user2".to_string(),
"admin".to_string(),
],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert!(channel.is_allowed("user1"));
assert!(channel.is_allowed("user2"));
assert!(channel.is_allowed("admin"));
assert!(!channel.is_allowed("user3"));
assert!(!channel.is_allowed("hacker"));
}
#[test]
fn test_telegram_allowlist_allows_numeric_user_id_without_usernames() {
let allowlist = vec!["123456".to_string()];
assert!(telegram_allowlist_allows(
&allowlist, "123456", "alice", false
));
assert!(!telegram_allowlist_allows(
&allowlist, "999999", "alice", false
));
}
#[test]
fn test_telegram_allowlist_rejects_username_when_disabled() {
let allowlist = vec!["alice".to_string(), "@bob".to_string()];
assert!(!telegram_allowlist_allows(
&allowlist, "123456", "alice", false
));
assert!(!telegram_allowlist_allows(
&allowlist, "123456", "bob", false
));
}
#[test]
fn test_telegram_allowlist_allows_legacy_username_when_enabled() {
let allowlist = vec!["alice".to_string(), "@bob".to_string()];
assert!(telegram_allowlist_allows(
&allowlist, "123456", "alice", true
));
assert!(telegram_allowlist_allows(&allowlist, "123456", "bob", true));
}
#[test]
fn test_render_telegram_html_escapes_html() {
let rendered = render_telegram_html("5 < 7 & 9 > 2");
assert_eq!(rendered, "5 < 7 & 9 > 2");
}
#[test]
fn test_render_telegram_html_spoiler_pairs() {
let rendered = render_telegram_html("Secret: ||classified|| data");
assert_eq!(rendered, "Secret: <tg-spoiler>classified</tg-spoiler> data");
}
#[test]
fn test_render_telegram_html_unmatched_spoiler() {
let rendered = render_telegram_html("Dangling ||spoiler");
assert_eq!(rendered, "Dangling ||spoiler");
}
#[test]
fn test_render_bold() {
assert_eq!(
render_telegram_html("Hello **world**"),
"Hello <b>world</b>"
);
}
#[test]
fn test_render_italic() {
assert_eq!(render_telegram_html("Hello *world*"), "Hello <i>world</i>");
}
#[test]
fn test_render_italic_underscore() {
assert_eq!(
render_telegram_html("something _impossible_."),
"something <i>impossible</i>."
);
}
#[test]
fn test_render_italic_underscore_ignores_snake_case() {
assert_eq!(
render_telegram_html("use my_var_name here"),
"use my_var_name here"
);
}
#[test]
fn test_render_underline_passthrough() {
assert_eq!(
render_telegram_html("this is <u>underlined</u> text"),
"this is <u>underlined</u> text"
);
}
#[test]
fn test_render_bold_and_italic() {
assert_eq!(
render_telegram_html("**bold** and *italic*"),
"<b>bold</b> and <i>italic</i>"
);
}
#[test]
fn test_render_inline_code() {
assert_eq!(
render_telegram_html("Use `foo()` here"),
"Use <code>foo()</code> here"
);
}
#[test]
fn test_render_inline_code_preserves_html() {
assert_eq!(
render_telegram_html("Try `x < 5 && y > 2`"),
"Try <code>x < 5 && y > 2</code>"
);
}
#[test]
fn test_render_fenced_code_block() {
let input = "Before\n```rust\nfn main() {\n println!(\"<hello>\");\n}\n```\nAfter";
let rendered = render_telegram_html(input);
assert!(rendered.contains("<pre>"));
assert!(rendered.contains("<hello>"));
assert!(rendered.contains("</pre>"));
assert!(rendered.starts_with("Before\n"));
assert!(rendered.ends_with("\nAfter"));
}
#[test]
fn test_code_block_no_markdown_conversion() {
let input = "```\n**not bold** *not italic*\n```";
let rendered = render_telegram_html(input);
assert!(rendered.contains("**not bold** *not italic*"));
assert!(!rendered.contains("<b>"));
assert!(!rendered.contains("<i>"));
}
#[test]
fn test_render_link() {
assert_eq!(
render_telegram_html("See [docs](https://example.com)"),
"See <a href=\"https://example.com\">docs</a>"
);
}
#[test]
fn test_render_header() {
assert_eq!(render_telegram_html("## Summary"), "<b>Summary</b>\n");
assert_eq!(render_telegram_html("### Details"), "<b>Details</b>\n");
}
#[test]
fn test_render_bullets() {
assert_eq!(
render_telegram_html("- item one\n- item two"),
"• item one\n• item two"
);
}
#[test]
fn test_render_star_bullets() {
assert_eq!(
render_telegram_html("* item one\n* item two"),
"• item one\n• item two"
);
}
#[test]
fn test_render_horizontal_rule() {
assert_eq!(render_telegram_html("above\n---\nbelow"), "above\n\nbelow");
}
#[test]
fn test_render_spoiler_with_formatting() {
assert_eq!(
render_telegram_html("||**secret**||"),
"<tg-spoiler><b>secret</b></tg-spoiler>"
);
}
#[test]
fn test_render_empty_input() {
assert_eq!(render_telegram_html(""), "");
}
#[test]
fn test_render_plain_text_unchanged() {
assert_eq!(render_telegram_html("Hello world"), "Hello world");
}
#[test]
fn test_render_unclosed_bold() {
let rendered = render_telegram_html("Hello **world");
assert_eq!(rendered, "Hello **world");
}
#[tokio::test]
async fn test_telegram_start_without_token() {
let config = TelegramConfig {
enabled: true,
token: String::new(), allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let mut channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
let result = channel.start().await;
assert!(result.is_err());
assert!(!channel.is_running());
}
#[tokio::test]
async fn test_telegram_start_disabled() {
let config = TelegramConfig {
enabled: false, token: "test-token".to_string(),
allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let mut channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
let result = channel.start().await;
assert!(result.is_ok());
assert!(!channel.is_running());
}
#[tokio::test]
async fn test_telegram_stop_not_running() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let mut channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
let result = channel.stop().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_telegram_send_not_running() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec![],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
let msg = OutboundMessage::new("telegram", "12345", "Hello");
let result = channel.send(msg).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_telegram_base_config() {
let config = TelegramConfig {
enabled: true,
token: "test-token".to_string(),
allow_from: vec!["allowed_user".to_string()],
..Default::default()
};
let bus = Arc::new(MessageBus::new());
let channel = TelegramChannel::new(
config,
bus,
"default-model".to_string(),
vec![],
vec![],
false,
);
assert_eq!(channel.base_config.name, "telegram");
assert_eq!(channel.base_config.allowlist, vec!["allowed_user"]);
}
#[test]
fn test_startup_backoff_delay_increases() {
let d0 = TelegramChannel::startup_backoff_delay(0);
let d1 = TelegramChannel::startup_backoff_delay(1);
let d2 = TelegramChannel::startup_backoff_delay(2);
assert_eq!(d0, Duration::from_secs(2));
assert_eq!(d1, Duration::from_secs(4));
assert_eq!(d2, Duration::from_secs(8));
assert!(d1 > d0);
assert!(d2 > d1);
}
#[test]
fn test_startup_backoff_delay_caps_at_max() {
let d_high = TelegramChannel::startup_backoff_delay(20);
assert_eq!(d_high, Duration::from_secs(MAX_RETRY_DELAY_SECS));
}
#[test]
fn test_startup_backoff_delay_no_overflow() {
let d = TelegramChannel::startup_backoff_delay(u32::MAX);
assert_eq!(d, Duration::from_secs(MAX_RETRY_DELAY_SECS));
}
#[test]
fn test_thread_id_override_key() {
let chat_id = "12345";
let thread_id: Option<String> = Some("99".to_string());
let override_key = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.to_string()
};
assert_eq!(override_key, "12345:99");
}
#[test]
fn test_thread_id_override_key_no_thread() {
let chat_id = "12345";
let thread_id: Option<String> = None;
let override_key = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.to_string()
};
assert_eq!(override_key, "12345");
}
#[test]
fn test_inbound_message_with_thread_id() {
use crate::bus::InboundMessage;
let mut inbound = InboundMessage::new("telegram", "user1", "chat1", "Hello");
let thread_id = Some("42".to_string());
if let Some(ref tid) = thread_id {
inbound.session_key = format!("telegram:{}:{}", "chat1", tid);
inbound = inbound.with_metadata("telegram_thread_id", tid);
}
assert_eq!(inbound.session_key, "telegram:chat1:42");
assert_eq!(
inbound.metadata.get("telegram_thread_id"),
Some(&"42".to_string())
);
}
#[test]
fn test_outbound_with_thread_metadata() {
use crate::bus::OutboundMessage;
let msg = OutboundMessage::new("telegram", "chat1", "Reply")
.with_metadata("telegram_thread_id", "42");
assert_eq!(
msg.metadata.get("telegram_thread_id"),
Some(&"42".to_string())
);
}
#[test]
fn test_html_tags_valid_well_formed() {
assert!(html_tags_valid("<b>bold</b>"));
assert!(html_tags_valid("<b>bold <i>italic</i></b>"));
assert!(html_tags_valid("plain text"));
}
#[test]
fn test_html_tags_valid_crossing() {
assert!(!html_tags_valid("<b>bold <i>cross</b> bad</i>"));
}
#[test]
fn test_html_tags_valid_unclosed() {
assert!(!html_tags_valid("<b>unclosed"));
}
#[test]
fn test_strip_html_tags() {
assert_eq!(strip_html_tags("<b>bold</b>"), "bold");
assert_eq!(strip_html_tags("a & b < c"), "a & b < c");
}
#[test]
fn test_render_crossing_bold_italic_falls_back() {
let input = "**bold *cross** end*";
let output = render_telegram_html(input);
assert!(!output.contains("</b>") || html_tags_valid(&output));
}
#[test]
fn test_render_strikethrough() {
assert_eq!(render_telegram_html("~~removed~~"), "<s>removed</s>");
}
#[test]
fn test_render_bold_italic_combined() {
assert_eq!(
render_telegram_html("***bold and italic***"),
"<b><i>bold and italic</i></b>"
);
}
#[test]
fn test_render_blockquote() {
assert_eq!(
render_telegram_html("> quoted text"),
"<blockquote>quoted text</blockquote>"
);
}
#[test]
fn test_render_numbered_list() {
let input = "1. First\n2. Second";
let output = render_telegram_html(input);
assert!(output.contains("1. First"));
assert!(output.contains("2. Second"));
assert!(!output.starts_with(' '));
}
#[test]
fn test_render_header_with_newline() {
let output = render_telegram_html("# Title\nBody");
assert!(output.contains("<b>Title</b>"));
assert!(output.contains("Body"));
}
#[test]
fn test_render_mixed_formatting() {
let input = "**bold** and *italic* and ~~struck~~ and `code`";
let output = render_telegram_html(input);
assert_eq!(
output,
"<b>bold</b> and <i>italic</i> and <s>struck</s> and <code>code</code>"
);
}
#[test]
fn test_typing_key_format_consistency() {
let chat_id: i64 = 123456789;
let thread_id: i32 = 42;
let msg_id: i32 = 100;
let handler_key_threaded = format!("{}:{}:{}", chat_id, thread_id, msg_id);
let handler_key_plain = format!("{}:{}", chat_id, msg_id);
let send_key_threaded = format!(
"{}:{}:{}",
chat_id,
thread_id.to_string(),
msg_id.to_string()
);
let send_key_plain = format!("{}:{}", chat_id, msg_id.to_string());
assert_eq!(handler_key_threaded, send_key_threaded);
assert_eq!(handler_key_plain, send_key_plain);
}
#[test]
fn test_typing_key_per_message_isolation() {
let chat_id: i64 = 123456789;
let msg_id_a: i32 = 100;
let msg_id_b: i32 = 101;
let key_a = format!("{}:{}", chat_id, msg_id_a);
let key_b = format!("{}:{}", chat_id, msg_id_b);
assert_ne!(key_a, key_b);
}
}