use super::traits::{Channel, ChannelMessage, SendMessage};
use crate::config::{Config, StreamMode};
use crate::security::pairing::PairingGuard;
use anyhow::Context;
use async_trait::async_trait;
use directories::UserDirs;
use parking_lot::Mutex;
use reqwest::multipart::{Form, Part};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs;
const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096;
const TELEGRAM_CONTINUATION_OVERHEAD: usize = 30;
const TELEGRAM_ACK_REACTIONS: &[&str] = &["⚡️", "👌", "👀", "🔥", "👍"];
#[derive(Debug, Clone, PartialEq, Eq)]
struct IncomingAttachment {
file_id: String,
file_name: Option<String>,
file_size: Option<u64>,
caption: Option<String>,
kind: IncomingAttachmentKind,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IncomingAttachmentKind {
Document,
Photo,
}
const TELEGRAM_BIND_COMMAND: &str = "/bind";
fn split_message_for_telegram(message: &str) -> Vec<String> {
if message.chars().count() <= TELEGRAM_MAX_MESSAGE_LENGTH {
return vec![message.to_string()];
}
let mut chunks = Vec::new();
let mut remaining = message;
let chunk_limit = TELEGRAM_MAX_MESSAGE_LENGTH - TELEGRAM_CONTINUATION_OVERHEAD;
while !remaining.is_empty() {
if remaining.chars().count() <= TELEGRAM_MAX_MESSAGE_LENGTH {
chunks.push(remaining.to_string());
break;
}
let hard_split = remaining
.char_indices()
.nth(chunk_limit)
.map_or(remaining.len(), |(idx, _)| idx);
let chunk_end = if hard_split == remaining.len() {
hard_split
} else {
let search_area = &remaining[..hard_split];
if let Some(pos) = search_area.rfind('\n') {
if search_area[..pos].chars().count() >= chunk_limit / 2 {
pos + 1
} else {
search_area.rfind(' ').unwrap_or(hard_split) + 1
}
} else if let Some(pos) = search_area.rfind(' ') {
pos + 1
} else {
hard_split
}
};
chunks.push(remaining[..chunk_end].to_string());
remaining = &remaining[chunk_end..];
}
chunks
}
fn pick_uniform_index(len: usize) -> usize {
debug_assert!(len > 0);
let upper = len as u64;
let reject_threshold = (u64::MAX / upper) * upper;
loop {
let value = rand::random::<u64>();
if value < reject_threshold {
return (value % upper) as usize;
}
}
}
fn random_telegram_ack_reaction() -> &'static str {
TELEGRAM_ACK_REACTIONS[pick_uniform_index(TELEGRAM_ACK_REACTIONS.len())]
}
fn build_telegram_ack_reaction_request(
chat_id: &str,
message_id: i64,
emoji: &str,
) -> serde_json::Value {
serde_json::json!({
"chat_id": chat_id,
"message_id": message_id,
"reaction": [{
"type": "emoji",
"emoji": emoji
}]
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelegramAttachmentKind {
Image,
Document,
Video,
Audio,
Voice,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TelegramAttachment {
kind: TelegramAttachmentKind,
target: String,
}
impl TelegramAttachmentKind {
fn from_marker(marker: &str) -> Option<Self> {
match marker.trim().to_ascii_uppercase().as_str() {
"IMAGE" | "PHOTO" => Some(Self::Image),
"DOCUMENT" | "FILE" => Some(Self::Document),
"VIDEO" => Some(Self::Video),
"AUDIO" => Some(Self::Audio),
"VOICE" => Some(Self::Voice),
_ => None,
}
}
}
fn is_image_extension(path: &Path) -> bool {
path.extension()
.and_then(|ext| ext.to_str())
.map(|ext| {
matches!(
ext.to_ascii_lowercase().as_str(),
"png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp"
)
})
.unwrap_or(false)
}
fn format_attachment_content(
kind: IncomingAttachmentKind,
local_filename: &str,
local_path: &Path,
) -> String {
match kind {
IncomingAttachmentKind::Photo if is_image_extension(local_path) => {
format!("[IMAGE:{}]", local_path.display())
}
_ => {
format!("[Document: {}] {}", local_filename, local_path.display())
}
}
}
fn is_http_url(target: &str) -> bool {
target.starts_with("http://") || target.starts_with("https://")
}
fn infer_attachment_kind_from_target(target: &str) -> Option<TelegramAttachmentKind> {
let normalized = target
.split('?')
.next()
.unwrap_or(target)
.split('#')
.next()
.unwrap_or(target);
let extension = Path::new(normalized)
.extension()
.and_then(|ext| ext.to_str())?
.to_ascii_lowercase();
match extension.as_str() {
"png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" => Some(TelegramAttachmentKind::Image),
"mp4" | "mov" | "mkv" | "avi" | "webm" => Some(TelegramAttachmentKind::Video),
"mp3" | "m4a" | "wav" | "flac" => Some(TelegramAttachmentKind::Audio),
"ogg" | "oga" | "opus" => Some(TelegramAttachmentKind::Voice),
"pdf" | "txt" | "md" | "csv" | "json" | "zip" | "tar" | "gz" | "doc" | "docx" | "xls"
| "xlsx" | "ppt" | "pptx" => Some(TelegramAttachmentKind::Document),
_ => None,
}
}
fn parse_path_only_attachment(message: &str) -> Option<TelegramAttachment> {
let trimmed = message.trim();
if trimmed.is_empty() || trimmed.contains('\n') {
return None;
}
let candidate = trimmed.trim_matches(|c| matches!(c, '`' | '"' | '\''));
if candidate.chars().any(char::is_whitespace) {
return None;
}
let candidate = candidate.strip_prefix("file://").unwrap_or(candidate);
let kind = infer_attachment_kind_from_target(candidate)?;
if !is_http_url(candidate) && !Path::new(candidate).exists() {
return None;
}
Some(TelegramAttachment {
kind,
target: candidate.to_string(),
})
}
fn strip_tool_call_tags(message: &str) -> String {
super::strip_tool_call_tags(message)
}
fn parse_attachment_markers(message: &str) -> (String, Vec<TelegramAttachment>) {
let mut cleaned = String::with_capacity(message.len());
let mut attachments = Vec::new();
let mut cursor = 0;
while cursor < message.len() {
let Some(open_rel) = message[cursor..].find('[') else {
cleaned.push_str(&message[cursor..]);
break;
};
let open = cursor + open_rel;
cleaned.push_str(&message[cursor..open]);
let Some(close_rel) = message[open..].find(']') else {
cleaned.push_str(&message[open..]);
break;
};
let close = open + close_rel;
let marker = &message[open + 1..close];
let parsed = marker.split_once(':').and_then(|(kind, target)| {
let kind = TelegramAttachmentKind::from_marker(kind)?;
let target = target.trim();
if target.is_empty() {
return None;
}
Some(TelegramAttachment {
kind,
target: target.to_string(),
})
});
if let Some(attachment) = parsed {
attachments.push(attachment);
} else {
cleaned.push_str(&message[open..=close]);
}
cursor = close + 1;
}
(cleaned.trim().to_string(), attachments)
}
const TELEGRAM_MAX_FILE_DOWNLOAD_BYTES: u64 = 20 * 1024 * 1024;
pub struct TelegramChannel {
bot_token: String,
allowed_users: Arc<RwLock<Vec<String>>>,
pairing: Option<PairingGuard>,
client: reqwest::Client,
typing_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
stream_mode: StreamMode,
draft_update_interval_ms: u64,
last_draft_edit: Mutex<std::collections::HashMap<String, std::time::Instant>>,
mention_only: bool,
bot_username: Mutex<Option<String>>,
api_base: String,
transcription: Option<crate::config::TranscriptionConfig>,
voice_transcriptions: Mutex<std::collections::HashMap<String, String>>,
workspace_dir: Option<std::path::PathBuf>,
}
impl TelegramChannel {
pub fn new(bot_token: String, allowed_users: Vec<String>, mention_only: bool) -> Self {
let normalized_allowed = Self::normalize_allowed_users(allowed_users);
let pairing = if normalized_allowed.is_empty() {
let guard = PairingGuard::new(true, &[]);
if let Some(code) = guard.pairing_code() {
println!(" 🔐 Telegram pairing required. One-time bind code: {code}");
println!(" Send `{TELEGRAM_BIND_COMMAND} <code>` from your Telegram account.");
}
Some(guard)
} else {
None
};
Self {
bot_token,
allowed_users: Arc::new(RwLock::new(normalized_allowed)),
pairing,
client: reqwest::Client::new(),
stream_mode: StreamMode::Off,
draft_update_interval_ms: 1000,
last_draft_edit: Mutex::new(std::collections::HashMap::new()),
typing_handle: Mutex::new(None),
mention_only,
bot_username: Mutex::new(None),
api_base: "https://api.telegram.org".to_string(),
transcription: None,
voice_transcriptions: Mutex::new(std::collections::HashMap::new()),
workspace_dir: None,
}
}
pub fn with_workspace_dir(mut self, dir: std::path::PathBuf) -> Self {
self.workspace_dir = Some(dir);
self
}
pub fn with_streaming(
mut self,
stream_mode: StreamMode,
draft_update_interval_ms: u64,
) -> Self {
self.stream_mode = stream_mode;
self.draft_update_interval_ms = draft_update_interval_ms;
self
}
pub fn with_api_base(mut self, api_base: String) -> Self {
self.api_base = api_base;
self
}
pub fn with_transcription(mut self, config: crate::config::TranscriptionConfig) -> Self {
if config.enabled {
self.transcription = Some(config);
}
self
}
fn parse_reply_target(reply_target: &str) -> (String, Option<String>) {
if let Some((chat_id, thread_id)) = reply_target.split_once(':') {
(chat_id.to_string(), Some(thread_id.to_string()))
} else {
(reply_target.to_string(), None)
}
}
fn extract_update_message_target(update: &serde_json::Value) -> Option<(String, i64)> {
let message = update.get("message")?;
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)?
.to_string();
let message_id = message
.get("message_id")
.and_then(serde_json::Value::as_i64)?;
Some((chat_id, message_id))
}
fn try_add_ack_reaction_nonblocking(&self, chat_id: String, message_id: i64) {
let client = self.http_client();
let url = self.api_url("setMessageReaction");
let emoji = random_telegram_ack_reaction().to_string();
let body = build_telegram_ack_reaction_request(&chat_id, message_id, &emoji);
tokio::spawn(async move {
let response = match client.post(&url).json(&body).send().await {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(
"Telegram: failed to add ACK reaction to chat_id={chat_id}, message_id={message_id}: {err}"
);
return;
}
};
if !response.status().is_success() {
let status = response.status();
let err_body = response.text().await.unwrap_or_default();
tracing::warn!(
"Telegram: add ACK reaction failed for chat_id={chat_id}, message_id={message_id}: status={status}, body={err_body}"
);
}
});
}
fn http_client(&self) -> reqwest::Client {
crate::config::build_runtime_proxy_client("channel.telegram")
}
fn normalize_identity(value: &str) -> String {
value.trim().trim_start_matches('@').to_string()
}
fn normalize_allowed_users(allowed_users: Vec<String>) -> Vec<String> {
allowed_users
.into_iter()
.map(|entry| Self::normalize_identity(&entry))
.filter(|entry| !entry.is_empty())
.collect()
}
async fn load_config_without_env() -> anyhow::Result<Config> {
let home = UserDirs::new()
.map(|u| u.home_dir().to_path_buf())
.context("Could not find home directory")?;
let zeroclaw_dir = home.join(".zeroclaw");
let config_path = zeroclaw_dir.join("config.toml");
let contents = fs::read_to_string(&config_path)
.await
.with_context(|| format!("Failed to read config file: {}", config_path.display()))?;
let mut config: Config = toml::from_str(&contents).context(
"Failed to parse config.toml — check [channels.telegram] section for syntax errors",
)?;
config.config_path = config_path;
config.workspace_dir = zeroclaw_dir.join("workspace");
Ok(config)
}
async fn persist_allowed_identity(&self, identity: &str) -> anyhow::Result<()> {
let mut config = Self::load_config_without_env().await?;
let Some(telegram) = config.channels_config.telegram.as_mut() else {
anyhow::bail!(
"Missing [channels.telegram] section in config.toml. \
Add bot_token and allowed_users under [channels.telegram], \
or run `zeroclaw onboard --channels-only` to configure interactively"
);
};
let normalized = Self::normalize_identity(identity);
if normalized.is_empty() {
anyhow::bail!("Cannot persist empty Telegram identity");
}
if !telegram.allowed_users.iter().any(|u| u == &normalized) {
telegram.allowed_users.push(normalized);
config
.save()
.await
.context("Failed to persist Telegram allowlist to config.toml")?;
}
Ok(())
}
fn add_allowed_identity_runtime(&self, identity: &str) {
let normalized = Self::normalize_identity(identity);
if normalized.is_empty() {
return;
}
if let Ok(mut users) = self.allowed_users.write() {
if !users.iter().any(|u| u == &normalized) {
users.push(normalized);
}
}
}
fn extract_bind_code(text: &str) -> Option<&str> {
let mut parts = text.split_whitespace();
let command = parts.next()?;
let base_command = command.split('@').next().unwrap_or(command);
if base_command != TELEGRAM_BIND_COMMAND {
return None;
}
parts.next().map(str::trim).filter(|code| !code.is_empty())
}
fn pairing_code_active(&self) -> bool {
self.pairing
.as_ref()
.and_then(PairingGuard::pairing_code)
.is_some()
}
fn api_url(&self, method: &str) -> String {
format!("{}/bot{}/{method}", self.api_base, self.bot_token)
}
async fn fetch_bot_username(&self) -> anyhow::Result<String> {
let resp = self.http_client().get(self.api_url("getMe")).send().await?;
if !resp.status().is_success() {
anyhow::bail!("Failed to fetch bot info: {}", resp.status());
}
let data: serde_json::Value = resp.json().await?;
let username = data
.get("result")
.and_then(|r| r.get("username"))
.and_then(|u| u.as_str())
.context("Bot username not found in response")?;
Ok(username.to_string())
}
async fn get_bot_username(&self) -> Option<String> {
{
let cache = self.bot_username.lock();
if let Some(ref username) = *cache {
return Some(username.clone());
}
}
match self.fetch_bot_username().await {
Ok(username) => {
let mut cache = self.bot_username.lock();
*cache = Some(username.clone());
Some(username)
}
Err(e) => {
tracing::warn!("Failed to fetch bot username: {e}");
None
}
}
}
fn is_telegram_username_char(ch: char) -> bool {
ch.is_ascii_alphanumeric() || ch == '_'
}
fn find_bot_mention_spans(text: &str, bot_username: &str) -> Vec<(usize, usize)> {
let bot_username = bot_username.trim_start_matches('@');
if bot_username.is_empty() {
return Vec::new();
}
let mut spans = Vec::new();
for (at_idx, ch) in text.char_indices() {
if ch != '@' {
continue;
}
if at_idx > 0 {
let prev = text[..at_idx].chars().next_back().unwrap_or(' ');
if Self::is_telegram_username_char(prev) {
continue;
}
}
let username_start = at_idx + 1;
let mut username_end = username_start;
for (rel_idx, candidate_ch) in text[username_start..].char_indices() {
if Self::is_telegram_username_char(candidate_ch) {
username_end = username_start + rel_idx + candidate_ch.len_utf8();
} else {
break;
}
}
if username_end == username_start {
continue;
}
let mention_username = &text[username_start..username_end];
if mention_username.eq_ignore_ascii_case(bot_username) {
spans.push((at_idx, username_end));
}
}
spans
}
fn contains_bot_mention(text: &str, bot_username: &str) -> bool {
!Self::find_bot_mention_spans(text, bot_username).is_empty()
}
fn normalize_incoming_content(text: &str, bot_username: &str) -> Option<String> {
let spans = Self::find_bot_mention_spans(text, bot_username);
if spans.is_empty() {
let normalized = text.split_whitespace().collect::<Vec<_>>().join(" ");
return (!normalized.is_empty()).then_some(normalized);
}
let mut normalized = String::with_capacity(text.len());
let mut cursor = 0;
for (start, end) in spans {
normalized.push_str(&text[cursor..start]);
cursor = end;
}
normalized.push_str(&text[cursor..]);
let normalized = normalized.split_whitespace().collect::<Vec<_>>().join(" ");
(!normalized.is_empty()).then_some(normalized)
}
fn is_group_message(message: &serde_json::Value) -> bool {
message
.get("chat")
.and_then(|c| c.get("type"))
.and_then(|t| t.as_str())
.map(|t| t == "group" || t == "supergroup")
.unwrap_or(false)
}
fn is_user_allowed(&self, username: &str) -> bool {
let identity = Self::normalize_identity(username);
self.allowed_users
.read()
.map(|users| users.iter().any(|u| u == "*" || u == &identity))
.unwrap_or(false)
}
fn is_any_user_allowed<'a, I>(&self, identities: I) -> bool
where
I: IntoIterator<Item = &'a str>,
{
identities.into_iter().any(|id| self.is_user_allowed(id))
}
async fn handle_unauthorized_message(&self, update: &serde_json::Value) {
let Some(message) = update.get("message") else {
return;
};
let Some(text) = message.get("text").and_then(serde_json::Value::as_str) else {
return;
};
let username_opt = message
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str);
let username = username_opt.unwrap_or("unknown");
let normalized_username = Self::normalize_identity(username);
let sender_id = message
.get("from")
.and_then(|from| from.get("id"))
.and_then(serde_json::Value::as_i64);
let sender_id_str = sender_id.map(|id| id.to_string());
let normalized_sender_id = sender_id_str.as_deref().map(Self::normalize_identity);
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let Some(chat_id) = chat_id else {
tracing::warn!("Telegram: missing chat_id in message, skipping");
return;
};
let mut identities = vec![normalized_username.as_str()];
if let Some(ref id) = normalized_sender_id {
identities.push(id.as_str());
}
if self.is_any_user_allowed(identities.iter().copied()) {
return;
}
if let Some(code) = Self::extract_bind_code(text) {
if let Some(pairing) = self.pairing.as_ref() {
match pairing.try_pair(code, &chat_id).await {
Ok(Some(_token)) => {
let bind_identity = normalized_sender_id.clone().or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
});
if let Some(identity) = bind_identity {
self.add_allowed_identity_runtime(&identity);
match self.persist_allowed_identity(&identity).await {
Ok(()) => {
let _ = self
.send(&SendMessage::new(
"✅ Telegram account bound successfully. You can talk to ZeroClaw now.",
&chat_id,
))
.await;
tracing::info!(
"Telegram: paired and allowlisted identity={identity}"
);
}
Err(e) => {
tracing::error!(
"Telegram: failed to persist allowlist after bind: {e}"
);
let _ = self
.send(&SendMessage::new(
"⚠️ Bound for this runtime, but failed to persist config. Access may be lost after restart; check config file permissions.",
&chat_id,
))
.await;
}
}
} else {
let _ = self
.send(&SendMessage::new(
"❌ Could not identify your Telegram account. Ensure your account has a username or stable user ID, then retry.",
&chat_id,
))
.await;
}
}
Ok(None) => {
let _ = self
.send(&SendMessage::new(
"❌ Invalid binding code. Ask operator for the latest code and retry.",
&chat_id,
))
.await;
}
Err(lockout_secs) => {
let _ = self
.send(&SendMessage::new(
format!("⏳ Too many invalid attempts. Retry in {lockout_secs}s."),
&chat_id,
))
.await;
}
}
} else {
let _ = self
.send(&SendMessage::new(
"ℹ️ Telegram pairing is not active. Ask operator to add your user ID to channels.telegram.allowed_users in config.toml.",
&chat_id,
))
.await;
}
return;
}
tracing::warn!(
"Telegram: ignoring message from unauthorized user: username={username}, sender_id={}. \
Allowlist Telegram username (without '@') or numeric user ID.",
sender_id_str.as_deref().unwrap_or("unknown")
);
let suggested_identity = normalized_sender_id
.clone()
.or_else(|| {
if normalized_username.is_empty() || normalized_username == "unknown" {
None
} else {
Some(normalized_username.clone())
}
})
.unwrap_or_else(|| "YOUR_TELEGRAM_ID".to_string());
let _ = self
.send(&SendMessage::new(
format!(
"🔐 This bot requires operator approval.\n\nCopy this command to operator terminal:\n`zeroclaw channel bind-telegram {suggested_identity}`\n\nAfter operator runs it, send your message again."
),
&chat_id,
))
.await;
if self.pairing_code_active() {
let _ = self
.send(&SendMessage::new(
"ℹ️ If operator provides a one-time pairing code, you can also run `/bind <code>`.",
&chat_id,
))
.await;
}
}
async fn get_file_path(&self, file_id: &str) -> anyhow::Result<String> {
let url = self.api_url("getFile");
let resp = self
.http_client()
.get(&url)
.query(&[("file_id", file_id)])
.send()
.await
.context("Failed to call Telegram getFile")?;
let data: serde_json::Value = resp.json().await?;
data.get("result")
.and_then(|r| r.get("file_path"))
.and_then(serde_json::Value::as_str)
.map(String::from)
.context("Telegram getFile: missing file_path in response")
}
async fn download_file(&self, file_path: &str) -> anyhow::Result<Vec<u8>> {
let url = format!(
"https://api.telegram.org/file/bot{}/{file_path}",
self.bot_token
);
let resp = self
.http_client()
.get(&url)
.send()
.await
.context("Failed to download Telegram file")?;
if !resp.status().is_success() {
anyhow::bail!("Telegram file download failed: {}", resp.status());
}
Ok(resp.bytes().await?.to_vec())
}
fn parse_voice_metadata(message: &serde_json::Value) -> Option<(String, u64)> {
let voice = message.get("voice").or_else(|| message.get("audio"))?;
let file_id = voice.get("file_id")?.as_str()?.to_string();
let duration = voice
.get("duration")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
Some((file_id, duration))
}
fn parse_attachment_metadata(message: &serde_json::Value) -> Option<IncomingAttachment> {
if let Some(doc) = message.get("document") {
let file_id = doc.get("file_id")?.as_str()?.to_string();
let file_name = doc
.get("file_name")
.and_then(serde_json::Value::as_str)
.map(String::from);
let file_size = doc.get("file_size").and_then(serde_json::Value::as_u64);
let caption = message
.get("caption")
.and_then(serde_json::Value::as_str)
.map(String::from);
return Some(IncomingAttachment {
file_id,
file_name,
file_size,
caption,
kind: IncomingAttachmentKind::Document,
});
}
if let Some(photos) = message.get("photo").and_then(serde_json::Value::as_array) {
let best = photos.last()?;
let file_id = best.get("file_id")?.as_str()?.to_string();
let file_size = best.get("file_size").and_then(serde_json::Value::as_u64);
let caption = message
.get("caption")
.and_then(serde_json::Value::as_str)
.map(String::from);
return Some(IncomingAttachment {
file_id,
file_name: None,
file_size,
caption,
kind: IncomingAttachmentKind::Photo,
});
}
None
}
async fn try_parse_attachment_message(
&self,
update: &serde_json::Value,
) -> Option<ChannelMessage> {
let message = update.get("message")?;
let attachment = Self::parse_attachment_metadata(message)?;
if let Some(size) = attachment.file_size {
if size > TELEGRAM_MAX_FILE_DOWNLOAD_BYTES {
tracing::info!(
"Skipping attachment: file size {size} bytes exceeds {} MB limit",
TELEGRAM_MAX_FILE_DOWNLOAD_BYTES / (1024 * 1024)
);
return None;
}
}
let (username, sender_id, sender_identity) = Self::extract_sender_info(message);
let mut identities = vec![username.as_str()];
if let Some(id) = sender_id.as_deref() {
identities.push(id);
}
if !self.is_any_user_allowed(identities.iter().copied()) {
return None;
}
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string())?;
let message_id = message
.get("message_id")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let thread_id = message
.get("message_thread_id")
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let reply_target = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.clone()
};
let workspace = self.workspace_dir.as_ref().or_else(|| {
tracing::warn!("Cannot save attachment: workspace_dir not configured");
None
})?;
let save_dir = workspace.join("telegram_files");
if let Err(e) = tokio::fs::create_dir_all(&save_dir).await {
tracing::warn!("Failed to create telegram_files directory: {e}");
return None;
}
let tg_file_path = match self.get_file_path(&attachment.file_id).await {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to get attachment file path: {e}");
return None;
}
};
let file_data = match self.download_file(&tg_file_path).await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Failed to download attachment: {e}");
return None;
}
};
let local_filename = match &attachment.file_name {
Some(name) => name.clone(),
None => {
let ext = tg_file_path.rsplit('.').next().unwrap_or("jpg");
format!("photo_{chat_id}_{message_id}.{ext}")
}
};
let local_path = save_dir.join(&local_filename);
if let Err(e) = tokio::fs::write(&local_path, &file_data).await {
tracing::warn!("Failed to save attachment to {}: {e}", local_path.display());
return None;
}
let mut content = format_attachment_content(attachment.kind, &local_filename, &local_path);
if let Some(caption) = &attachment.caption {
if !caption.is_empty() {
use std::fmt::Write;
let _ = write!(content, "\n\n{caption}");
}
}
if let Some(quote) = self.extract_reply_context(message) {
content = format!("{quote}\n\n{content}");
}
Some(ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"),
sender: sender_identity,
reply_target,
content,
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
thread_ts: thread_id,
})
}
async fn try_parse_voice_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> {
let config = self.transcription.as_ref()?;
let message = update.get("message")?;
let (file_id, duration) = Self::parse_voice_metadata(message)?;
if duration > config.max_duration_secs {
tracing::info!(
"Skipping voice message: duration {duration}s exceeds limit {}s",
config.max_duration_secs
);
return None;
}
let (username, sender_id, sender_identity) = Self::extract_sender_info(message);
let mut identities = vec![username.as_str()];
if let Some(id) = sender_id.as_deref() {
identities.push(id);
}
if !self.is_any_user_allowed(identities.iter().copied()) {
return None;
}
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string())?;
let message_id = message
.get("message_id")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let thread_id = message
.get("message_thread_id")
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let reply_target = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.clone()
};
let file_path = match self.get_file_path(&file_id).await {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to get voice file path: {e}");
return None;
}
};
let file_name = file_path
.rsplit('/')
.next()
.unwrap_or("voice.ogg")
.to_string();
let audio_data = match self.download_file(&file_path).await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Failed to download voice file: {e}");
return None;
}
};
let text =
match super::transcription::transcribe_audio(audio_data, &file_name, config).await {
Ok(t) => t,
Err(e) => {
tracing::warn!("Voice transcription failed: {e}");
return None;
}
};
if text.trim().is_empty() {
tracing::info!("Voice transcription returned empty text, skipping");
return None;
}
{
let mut cache = self.voice_transcriptions.lock();
if cache.len() >= 100 {
cache.clear();
}
cache.insert(format!("{chat_id}:{message_id}"), text.clone());
}
let content = if let Some(quote) = self.extract_reply_context(message) {
format!("{quote}\n\n[Voice] {text}")
} else {
format!("[Voice] {text}")
};
Some(ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"),
sender: sender_identity,
reply_target,
content,
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
thread_ts: thread_id,
})
}
fn extract_sender_info(message: &serde_json::Value) -> (String, Option<String>, String) {
let username = message
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown")
.to_string();
let sender_id = message
.get("from")
.and_then(|from| from.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let sender_identity = if username == "unknown" {
sender_id.clone().unwrap_or_else(|| "unknown".to_string())
} else {
username.clone()
};
(username, sender_id, sender_identity)
}
fn extract_reply_context(&self, message: &serde_json::Value) -> Option<String> {
let reply = message.get("reply_to_message")?;
let reply_sender = reply
.get("from")
.and_then(|from| from.get("username"))
.and_then(serde_json::Value::as_str)
.or_else(|| {
reply
.get("from")
.and_then(|from| from.get("first_name"))
.and_then(serde_json::Value::as_str)
})
.unwrap_or("unknown");
let reply_text = if let Some(text) = reply.get("text").and_then(serde_json::Value::as_str) {
text.to_string()
} else if reply.get("voice").is_some() || reply.get("audio").is_some() {
let reply_mid = reply.get("message_id").and_then(serde_json::Value::as_i64);
let chat_id = message
.get("chat")
.and_then(|c| c.get("id"))
.and_then(serde_json::Value::as_i64);
if let (Some(mid), Some(cid)) = (reply_mid, chat_id) {
self.voice_transcriptions
.lock()
.get(&format!("{cid}:{mid}"))
.map(|t| format!("[Voice] {t}"))
.unwrap_or_else(|| "[Voice message]".to_string())
} else {
"[Voice message]".to_string()
}
} else if reply.get("photo").is_some() {
"[Photo]".to_string()
} else if reply.get("document").is_some() {
"[Document]".to_string()
} else if reply.get("video").is_some() {
"[Video]".to_string()
} else if reply.get("sticker").is_some() {
"[Sticker]".to_string()
} else {
"[Message]".to_string()
};
let quoted_lines: String = reply_text
.lines()
.map(|line| format!("> {line}"))
.collect::<Vec<_>>()
.join("\n");
Some(format!("> @{reply_sender}:\n{quoted_lines}"))
}
fn parse_update_message(&self, update: &serde_json::Value) -> Option<ChannelMessage> {
let message = update.get("message")?;
let text = message.get("text").and_then(serde_json::Value::as_str)?;
let (username, sender_id, sender_identity) = Self::extract_sender_info(message);
let mut identities = vec![username.as_str()];
if let Some(id) = sender_id.as_deref() {
identities.push(id);
}
if !self.is_any_user_allowed(identities.iter().copied()) {
return None;
}
let is_group = Self::is_group_message(message);
if self.mention_only && is_group {
let bot_username = self.bot_username.lock();
if let Some(ref bot_username) = *bot_username {
if !Self::contains_bot_mention(&text, bot_username) {
return None;
}
} else {
return None;
}
}
let chat_id = message
.get("chat")
.and_then(|chat| chat.get("id"))
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string())?;
let message_id = message
.get("message_id")
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let thread_id = message
.get("message_thread_id")
.and_then(serde_json::Value::as_i64)
.map(|id| id.to_string());
let reply_target = if let Some(ref tid) = thread_id {
format!("{}:{}", chat_id, tid)
} else {
chat_id.clone()
};
let content = if self.mention_only && is_group {
let bot_username = self.bot_username.lock();
let bot_username = bot_username.as_ref()?;
Self::normalize_incoming_content(&text, bot_username)?
} else {
text.to_string()
};
let content = if let Some(quote) = self.extract_reply_context(message) {
format!("{quote}\n\n{content}")
} else {
content
};
Some(ChannelMessage {
id: format!("telegram_{chat_id}_{message_id}"),
sender: sender_identity,
reply_target,
content,
channel: "telegram".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
thread_ts: thread_id,
})
}
async fn resolve_photo_data_uri(&self, file_id: &str) -> anyhow::Result<String> {
use base64::Engine as _;
let get_file_url = self.api_url(&format!("getFile?file_id={}", file_id));
let resp = self.http_client().get(&get_file_url).send().await?;
let json: serde_json::Value = resp.json().await?;
let file_path = json
.get("result")
.and_then(|r| r.get("file_path"))
.and_then(|p| p.as_str())
.ok_or_else(|| anyhow::anyhow!("getFile: no file_path in response"))?
.to_string();
let download_url = format!(
"https://api.telegram.org/file/bot{}/{}",
self.bot_token, file_path
);
let img_resp = self.http_client().get(&download_url).send().await?;
let bytes = img_resp.bytes().await?;
let resized_bytes = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
let img = image::load_from_memory(&bytes)?;
let (w, h) = (img.width(), img.height());
let max_dim = 512u32;
let resized = if w > max_dim || h > max_dim {
img.thumbnail(max_dim, max_dim)
} else {
img
};
let mut buf = Vec::new();
resized.write_to(
&mut std::io::Cursor::new(&mut buf),
image::ImageFormat::Jpeg,
)?;
Ok(buf)
})
.await??;
let b64 = base64::engine::general_purpose::STANDARD.encode(&resized_bytes);
Ok(format!("data:image/jpeg;base64,{}", b64))
}
fn markdown_to_telegram_html(text: &str) -> String {
let lines: Vec<&str> = text.split('\n').collect();
let mut result_lines: Vec<String> = Vec::new();
for line in &lines {
let trimmed_line = line.trim_start();
if trimmed_line.starts_with("```") {
result_lines.push(trimmed_line.to_string());
continue;
}
let mut line_out = String::new();
let stripped = line.trim_start_matches('#');
let header_level = line.len() - stripped.len();
if header_level > 0 && line.starts_with('#') && stripped.starts_with(' ') {
let title = Self::escape_html(stripped.trim());
result_lines.push(format!("<b>{title}</b>"));
continue;
}
let mut i = 0;
let bytes = line.as_bytes();
let len = bytes.len();
while i < len {
if i + 1 < len && bytes[i] == b'*' && bytes[i + 1] == b'*' {
if let Some(end) = line[i + 2..].find("**") {
let inner = Self::escape_html(&line[i + 2..i + 2 + end]);
line_out.push_str(&format!("<b>{inner}</b>"));
i += 4 + end;
continue;
}
}
if i + 1 < len && bytes[i] == b'_' && bytes[i + 1] == b'_' {
if let Some(end) = line[i + 2..].find("__") {
let inner = Self::escape_html(&line[i + 2..i + 2 + end]);
line_out.push_str(&format!("<b>{inner}</b>"));
i += 4 + end;
continue;
}
}
if bytes[i] == b'*' && (i == 0 || bytes[i - 1] != b'*') {
if let Some(end) = line[i + 1..].find('*') {
if end > 0 {
let inner = Self::escape_html(&line[i + 1..i + 1 + end]);
line_out.push_str(&format!("<i>{inner}</i>"));
i += 2 + end;
continue;
}
}
}
if bytes[i] == b'`' && (i == 0 || bytes[i - 1] != b'`') {
if let Some(end) = line[i + 1..].find('`') {
let inner = Self::escape_html(&line[i + 1..i + 1 + end]);
line_out.push_str(&format!("<code>{inner}</code>"));
i += 2 + end;
continue;
}
}
if bytes[i] == b'[' {
if let Some(bracket_end) = line[i + 1..].find(']') {
let text_part = &line[i + 1..i + 1 + bracket_end];
let after_bracket = i + 1 + bracket_end + 1; if after_bracket < len && bytes[after_bracket] == b'(' {
if let Some(paren_end) = line[after_bracket + 1..].find(')') {
let url = &line[after_bracket + 1..after_bracket + 1 + paren_end];
if url.starts_with("http://") || url.starts_with("https://") {
let text_html = Self::escape_html(text_part);
let url_html = Self::escape_html(url);
line_out.push_str(&format!(
"<a href=\"{url_html}\">{text_html}</a>"
));
i = after_bracket + 1 + paren_end + 1;
continue;
}
}
}
}
}
if i + 1 < len && bytes[i] == b'~' && bytes[i + 1] == b'~' {
if let Some(end) = line[i + 2..].find("~~") {
let inner = Self::escape_html(&line[i + 2..i + 2 + end]);
line_out.push_str(&format!("<s>{inner}</s>"));
i += 4 + end;
continue;
}
}
let ch = line[i..].chars().next().unwrap();
match ch {
'<' => line_out.push_str("<"),
'>' => line_out.push_str(">"),
'&' => line_out.push_str("&"),
'"' => line_out.push_str("""),
'\'' => line_out.push_str("'"),
_ => line_out.push(ch),
}
i += ch.len_utf8();
}
result_lines.push(line_out);
}
let joined = result_lines.join("\n");
let mut final_out = String::with_capacity(joined.len());
let mut in_code_block = false;
let mut code_buf = String::new();
for line in joined.split('\n') {
let trimmed = line.trim();
if trimmed.starts_with("```") {
if !in_code_block {
in_code_block = true;
code_buf.clear();
} else {
in_code_block = false;
let escaped = code_buf.trim_end_matches('\n');
final_out.push_str(&format!("<pre><code>{escaped}</code></pre>\n"));
code_buf.clear();
}
} else if in_code_block {
code_buf.push_str(line);
code_buf.push('\n');
} else {
final_out.push_str(line);
final_out.push('\n');
}
}
if in_code_block && !code_buf.is_empty() {
final_out.push_str(&format!(
"<pre><code>{}</code></pre>\n",
code_buf.trim_end()
));
}
final_out.trim_end_matches('\n').to_string()
}
fn escape_html(s: &str) -> String {
s.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
.replace('\'', "'")
}
async fn send_text_chunks(
&self,
message: &str,
chat_id: &str,
thread_id: Option<&str>,
) -> anyhow::Result<()> {
let chunks = split_message_for_telegram(message);
for (index, chunk) in chunks.iter().enumerate() {
let text = if chunks.len() > 1 {
if index == 0 {
format!("{chunk}\n\n(continues...)")
} else if index == chunks.len() - 1 {
format!("(continued)\n\n{chunk}")
} else {
format!("(continued)\n\n{chunk}\n\n(continues...)")
}
} else {
chunk.to_string()
};
let mut markdown_body = serde_json::json!({
"chat_id": chat_id,
"text": Self::markdown_to_telegram_html(&text),
"parse_mode": "HTML"
});
if let Some(tid) = thread_id {
markdown_body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let markdown_resp = self
.http_client()
.post(self.api_url("sendMessage"))
.json(&markdown_body)
.send()
.await?;
if markdown_resp.status().is_success() {
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
continue;
}
let markdown_status = markdown_resp.status();
let markdown_err = markdown_resp.text().await.unwrap_or_default();
tracing::warn!(
status = ?markdown_status,
"Telegram sendMessage with Markdown failed; retrying without parse_mode"
);
let mut plain_body = serde_json::json!({
"chat_id": chat_id,
"text": text,
});
if let Some(tid) = thread_id {
plain_body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let plain_resp = self
.http_client()
.post(self.api_url("sendMessage"))
.json(&plain_body)
.send()
.await?;
if !plain_resp.status().is_success() {
let plain_status = plain_resp.status();
let plain_err = plain_resp.text().await.unwrap_or_default();
anyhow::bail!(
"Telegram sendMessage failed (markdown {}: {}; plain {}: {})",
markdown_status,
markdown_err,
plain_status,
plain_err
);
}
if index < chunks.len() - 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Ok(())
}
async fn send_media_by_url(
&self,
method: &str,
media_field: &str,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let mut body = serde_json::json!({
"chat_id": chat_id,
});
body[media_field] = serde_json::Value::String(url.to_string());
if let Some(tid) = thread_id {
body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
if let Some(cap) = caption {
body["caption"] = serde_json::Value::String(cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url(method))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram {method} by URL failed: {err}");
}
tracing::info!("Telegram {method} sent to {chat_id}: {url}");
Ok(())
}
async fn send_attachment(
&self,
chat_id: &str,
thread_id: Option<&str>,
attachment: &TelegramAttachment,
) -> anyhow::Result<()> {
let target = attachment.target.trim();
if is_http_url(target) {
let result = match attachment.kind {
TelegramAttachmentKind::Image => {
self.send_photo_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Document => {
self.send_document_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Video => {
self.send_video_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Audio => {
self.send_audio_by_url(chat_id, thread_id, target, None)
.await
}
TelegramAttachmentKind::Voice => {
self.send_voice_by_url(chat_id, thread_id, target, None)
.await
}
};
if let Err(e) = result {
tracing::warn!(
url = target,
error = %e,
"Telegram send media by URL failed; falling back to text link"
);
let kind_label = match attachment.kind {
TelegramAttachmentKind::Image => "Image",
TelegramAttachmentKind::Document => "Document",
TelegramAttachmentKind::Video => "Video",
TelegramAttachmentKind::Audio => "Audio",
TelegramAttachmentKind::Voice => "Voice",
};
let fallback_text = format!("{kind_label}: {target}");
self.send_text_chunks(&fallback_text, chat_id, thread_id)
.await?;
}
return Ok(());
}
let remapped;
let target = if let Some(rel) = target.strip_prefix("/workspace/") {
if let Some(ws) = &self.workspace_dir {
remapped = ws.join(rel);
remapped.to_str().unwrap_or(target)
} else {
target
}
} else {
target
};
let path = Path::new(target);
if !path.exists() {
anyhow::bail!("Telegram attachment path not found: {target}");
}
match attachment.kind {
TelegramAttachmentKind::Image => self.send_photo(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Document => {
self.send_document(chat_id, thread_id, path, None).await
}
TelegramAttachmentKind::Video => self.send_video(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Audio => self.send_audio(chat_id, thread_id, path, None).await,
TelegramAttachmentKind::Voice => self.send_voice(chat_id, thread_id, path, None).await,
}
}
pub async fn send_document(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("file");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("document", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendDocument"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendDocument failed: {err}");
}
tracing::info!("Telegram document sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_document_bytes(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_bytes: Vec<u8>,
file_name: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("document", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendDocument"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendDocument failed: {err}");
}
tracing::info!("Telegram document sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_photo(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("photo.jpg");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("photo", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendPhoto"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendPhoto failed: {err}");
}
tracing::info!("Telegram photo sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_photo_bytes(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_bytes: Vec<u8>,
file_name: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("photo", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendPhoto"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendPhoto failed: {err}");
}
tracing::info!("Telegram photo sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_video(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("video.mp4");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("video", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendVideo"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendVideo failed: {err}");
}
tracing::info!("Telegram video sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_audio(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("audio.mp3");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("audio", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendAudio"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendAudio failed: {err}");
}
tracing::info!("Telegram audio sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_voice(
&self,
chat_id: &str,
thread_id: Option<&str>,
file_path: &Path,
caption: Option<&str>,
) -> anyhow::Result<()> {
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("voice.ogg");
let file_bytes = tokio::fs::read(file_path).await?;
let part = Part::bytes(file_bytes).file_name(file_name.to_string());
let mut form = Form::new()
.text("chat_id", chat_id.to_string())
.part("voice", part);
if let Some(tid) = thread_id {
form = form.text("message_thread_id", tid.to_string());
}
if let Some(cap) = caption {
form = form.text("caption", cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendVoice"))
.multipart(form)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendVoice failed: {err}");
}
tracing::info!("Telegram voice sent to {chat_id}: {file_name}");
Ok(())
}
pub async fn send_document_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let mut body = serde_json::json!({
"chat_id": chat_id,
"document": url
});
if let Some(tid) = thread_id {
body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
if let Some(cap) = caption {
body["caption"] = serde_json::Value::String(cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendDocument"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendDocument by URL failed: {err}");
}
tracing::info!("Telegram document (URL) sent to {chat_id}: {url}");
Ok(())
}
pub async fn send_photo_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
let mut body = serde_json::json!({
"chat_id": chat_id,
"photo": url
});
if let Some(tid) = thread_id {
body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
if let Some(cap) = caption {
body["caption"] = serde_json::Value::String(cap.to_string());
}
let resp = self
.http_client()
.post(self.api_url("sendPhoto"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await?;
anyhow::bail!("Telegram sendPhoto by URL failed: {err}");
}
tracing::info!("Telegram photo (URL) sent to {chat_id}: {url}");
Ok(())
}
pub async fn send_video_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVideo", "video", chat_id, thread_id, url, caption)
.await
}
pub async fn send_audio_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendAudio", "audio", chat_id, thread_id, url, caption)
.await
}
pub async fn send_voice_by_url(
&self,
chat_id: &str,
thread_id: Option<&str>,
url: &str,
caption: Option<&str>,
) -> anyhow::Result<()> {
self.send_media_by_url("sendVoice", "voice", chat_id, thread_id, url, caption)
.await
}
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
fn supports_draft_updates(&self) -> bool {
self.stream_mode != StreamMode::Off
}
async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
if self.stream_mode == StreamMode::Off {
return Ok(None);
}
let (chat_id, thread_id) = Self::parse_reply_target(&message.recipient);
let initial_text = if message.content.is_empty() {
"...".to_string()
} else {
message.content.clone()
};
let mut body = serde_json::json!({
"chat_id": chat_id,
"text": initial_text,
});
if let Some(tid) = thread_id {
body["message_thread_id"] = serde_json::Value::String(tid.to_string());
}
let resp = self
.client
.post(self.api_url("sendMessage"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let err = resp.text().await.unwrap_or_default();
anyhow::bail!("Telegram sendMessage (draft) failed: {err}");
}
let resp_json: serde_json::Value = resp.json().await?;
let message_id = resp_json
.get("result")
.and_then(|r| r.get("message_id"))
.and_then(|id| id.as_i64())
.map(|id| id.to_string());
self.last_draft_edit
.lock()
.insert(chat_id.to_string(), std::time::Instant::now());
Ok(message_id)
}
async fn update_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
let (chat_id, _) = Self::parse_reply_target(recipient);
{
let last_edits = self.last_draft_edit.lock();
if let Some(last_time) = last_edits.get(&chat_id) {
let elapsed = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX);
if elapsed < self.draft_update_interval_ms {
return Ok(());
}
}
}
let display_text = if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
let mut end = 0;
for (idx, ch) in text.char_indices() {
let next = idx + ch.len_utf8();
if next > TELEGRAM_MAX_MESSAGE_LENGTH {
break;
}
end = next;
}
&text[..end]
} else {
text
};
let message_id_parsed = match message_id.parse::<i64>() {
Ok(id) => id,
Err(e) => {
tracing::warn!("Invalid Telegram message_id '{message_id}': {e}");
return Ok(());
}
};
let body = serde_json::json!({
"chat_id": chat_id,
"message_id": message_id_parsed,
"text": display_text,
});
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&body)
.send()
.await?;
if resp.status().is_success() {
self.last_draft_edit
.lock()
.insert(chat_id.clone(), std::time::Instant::now());
} else {
let status = resp.status();
let err = resp.text().await.unwrap_or_default();
tracing::debug!("Telegram editMessageText failed ({status}): {err}");
}
Ok(())
}
async fn finalize_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
let text = &strip_tool_call_tags(text);
let (chat_id, thread_id) = Self::parse_reply_target(recipient);
self.last_draft_edit.lock().remove(&chat_id);
let (text_without_markers, attachments) = parse_attachment_markers(text);
let msg_id = match message_id.parse::<i64>() {
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Invalid Telegram message_id '{message_id}': {e}");
None
}
};
if !attachments.is_empty() {
if let Some(id) = msg_id {
let _ = self
.client
.post(self.api_url("deleteMessage"))
.json(&serde_json::json!({
"chat_id": chat_id,
"message_id": id,
}))
.send()
.await;
}
if !text_without_markers.is_empty() {
self.send_text_chunks(&text_without_markers, &chat_id, thread_id.as_deref())
.await?;
}
for attachment in &attachments {
self.send_attachment(&chat_id, thread_id.as_deref(), attachment)
.await?;
}
return Ok(());
}
if text.len() > TELEGRAM_MAX_MESSAGE_LENGTH {
if let Some(id) = msg_id {
let _ = self
.client
.post(self.api_url("deleteMessage"))
.json(&serde_json::json!({
"chat_id": chat_id,
"message_id": id,
}))
.send()
.await;
}
return self
.send_text_chunks(text, &chat_id, thread_id.as_deref())
.await;
}
let Some(id) = msg_id else {
return self
.send_text_chunks(text, &chat_id, thread_id.as_deref())
.await;
};
let body = serde_json::json!({
"chat_id": chat_id,
"message_id": id,
"text": Self::markdown_to_telegram_html(text),
"parse_mode": "HTML",
});
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&body)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
let plain_body = serde_json::json!({
"chat_id": chat_id,
"message_id": id,
"text": text,
});
let resp = self
.client
.post(self.api_url("editMessageText"))
.json(&plain_body)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
tracing::warn!("Telegram finalize_draft edit failed; falling back to sendMessage");
self.send_text_chunks(text, &chat_id, thread_id.as_deref())
.await
}
async fn cancel_draft(&self, recipient: &str, message_id: &str) -> anyhow::Result<()> {
let (chat_id, _) = Self::parse_reply_target(recipient);
self.last_draft_edit.lock().remove(&chat_id);
let message_id = match message_id.parse::<i64>() {
Ok(id) => id,
Err(e) => {
tracing::debug!("Invalid Telegram draft message_id '{message_id}': {e}");
return Ok(());
}
};
let response = self
.client
.post(self.api_url("deleteMessage"))
.json(&serde_json::json!({
"chat_id": chat_id,
"message_id": message_id,
}))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
tracing::debug!("Telegram deleteMessage failed ({status}): {body}");
}
Ok(())
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
let content = strip_tool_call_tags(&message.content);
let (chat_id, thread_id) = match message.recipient.split_once(':') {
Some((chat, thread)) => (chat, Some(thread)),
None => (message.recipient.as_str(), None),
};
let (text_without_markers, attachments) = parse_attachment_markers(&content);
if !attachments.is_empty() {
if !text_without_markers.is_empty() {
self.send_text_chunks(&text_without_markers, chat_id, thread_id)
.await?;
}
for attachment in &attachments {
self.send_attachment(chat_id, thread_id, attachment).await?;
}
return Ok(());
}
if let Some(attachment) = parse_path_only_attachment(&content) {
self.send_attachment(chat_id, thread_id, &attachment)
.await?;
return Ok(());
}
self.send_text_chunks(&content, chat_id, thread_id).await
}
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let mut offset: i64 = 0;
if self.mention_only {
let _ = self.get_bot_username().await;
}
tracing::info!("Telegram channel listening for messages...");
loop {
let url = self.api_url("getUpdates");
let probe = serde_json::json!({
"offset": offset,
"timeout": 0,
"allowed_updates": ["message"]
});
match self.http_client().post(&url).json(&probe).send().await {
Err(e) => {
tracing::warn!("Telegram startup probe error: {e}; retrying in 5s");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Ok(resp) => {
match resp.json::<serde_json::Value>().await {
Err(e) => {
tracing::warn!(
"Telegram startup probe parse error: {e}; retrying in 5s"
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Ok(data) => {
let ok = data
.get("ok")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
if ok {
if let Some(results) =
data.get("result").and_then(serde_json::Value::as_array)
{
for update in results {
if let Some(uid) = update
.get("update_id")
.and_then(serde_json::Value::as_i64)
{
offset = uid + 1;
}
}
}
break; }
let error_code = data
.get("error_code")
.and_then(serde_json::Value::as_i64)
.unwrap_or_default();
if error_code == 409 {
tracing::debug!("Startup probe: slot busy (409), retrying in 5s");
} else {
let desc = data
.get("description")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown");
tracing::warn!(
"Startup probe: API error {error_code}: {desc}; retrying in 5s"
);
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
}
tracing::debug!("Startup probe succeeded; entering main long-poll loop.");
loop {
if self.mention_only {
let missing_username = self.bot_username.lock().is_none();
if missing_username {
let _ = self.get_bot_username().await;
}
}
let url = self.api_url("getUpdates");
let body = serde_json::json!({
"offset": offset,
"timeout": 30,
"allowed_updates": ["message"]
});
let resp = match self.http_client().post(&url).json(&body).send().await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Telegram poll error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let data: serde_json::Value = match resp.json().await {
Ok(d) => d,
Err(e) => {
tracing::warn!("Telegram parse error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let ok = data
.get("ok")
.and_then(serde_json::Value::as_bool)
.unwrap_or(true);
if !ok {
let error_code = data
.get("error_code")
.and_then(serde_json::Value::as_i64)
.unwrap_or_default();
let description = data
.get("description")
.and_then(serde_json::Value::as_str)
.unwrap_or("unknown Telegram API error");
if error_code == 409 {
tracing::warn!(
"Telegram polling conflict (409): {description}. \
Ensure only one `zeroclaw` process is using this bot token."
);
tokio::time::sleep(std::time::Duration::from_secs(35)).await;
} else {
tracing::warn!(
"Telegram getUpdates API error (code={}): {description}",
error_code
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
continue;
}
if let Some(results) = data.get("result").and_then(serde_json::Value::as_array) {
for update in results {
if let Some(uid) = update.get("update_id").and_then(serde_json::Value::as_i64) {
offset = uid + 1;
}
let msg = if let Some(m) = self.parse_update_message(update) {
m
} else if let Some(m) = self.try_parse_voice_message(update).await {
m
} else if let Some(m) = self.try_parse_attachment_message(update).await {
m
} else {
self.handle_unauthorized_message(update).await;
continue;
};
if let Some((reaction_chat_id, reaction_message_id)) =
Self::extract_update_message_target(update)
{
self.try_add_ack_reaction_nonblocking(
reaction_chat_id,
reaction_message_id,
);
}
let typing_body = serde_json::json!({
"chat_id": &msg.reply_target,
"action": "typing"
});
let _ = self
.http_client()
.post(self.api_url("sendChatAction"))
.json(&typing_body)
.send()
.await;
if tx.send(msg).await.is_err() {
return Ok(());
}
}
}
}
}
async fn health_check(&self) -> bool {
let timeout_duration = Duration::from_secs(5);
match tokio::time::timeout(
timeout_duration,
self.http_client().get(self.api_url("getMe")).send(),
)
.await
{
Ok(Ok(resp)) => resp.status().is_success(),
Ok(Err(e)) => {
tracing::debug!("Telegram health check failed: {e}");
false
}
Err(_) => {
tracing::debug!("Telegram health check timed out after 5s");
false
}
}
}
async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
self.stop_typing(recipient).await?;
let client = self.http_client();
let url = self.api_url("sendChatAction");
let chat_id = recipient.to_string();
let handle = tokio::spawn(async move {
loop {
let body = serde_json::json!({
"chat_id": &chat_id,
"action": "typing"
});
let _ = client.post(&url).json(&body).send().await;
tokio::time::sleep(Duration::from_secs(4)).await;
}
});
let mut guard = self.typing_handle.lock();
*guard = Some(handle);
Ok(())
}
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
let mut guard = self.typing_handle.lock();
if let Some(handle) = guard.take() {
handle.abort();
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn telegram_channel_name() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
assert_eq!(ch.name(), "telegram");
}
#[test]
fn random_telegram_ack_reaction_is_from_pool() {
for _ in 0..128 {
let emoji = random_telegram_ack_reaction();
assert!(TELEGRAM_ACK_REACTIONS.contains(&emoji));
}
}
#[test]
fn telegram_ack_reaction_request_shape() {
let body = build_telegram_ack_reaction_request("-100200300", 42, "⚡️");
assert_eq!(body["chat_id"], "-100200300");
assert_eq!(body["message_id"], 42);
assert_eq!(body["reaction"][0]["type"], "emoji");
assert_eq!(body["reaction"][0]["emoji"], "⚡️");
}
#[test]
fn telegram_extract_update_message_target_parses_ids() {
let update = serde_json::json!({
"update_id": 1,
"message": {
"message_id": 99,
"chat": { "id": -100123456 }
}
});
let target = TelegramChannel::extract_update_message_target(&update);
assert_eq!(target, Some(("-100123456".to_string(), 99)));
}
#[test]
fn typing_handle_starts_as_none() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let guard = ch.typing_handle.lock();
assert!(guard.is_none());
}
#[tokio::test]
async fn stop_typing_clears_handle() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
{
let mut guard = ch.typing_handle.lock();
*guard = Some(tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
}));
}
ch.stop_typing("123").await.unwrap();
let guard = ch.typing_handle.lock();
assert!(guard.is_none());
}
#[tokio::test]
async fn start_typing_replaces_previous_handle() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
{
let mut guard = ch.typing_handle.lock();
*guard = Some(tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
}));
}
let _ = ch.start_typing("123").await;
let guard = ch.typing_handle.lock();
assert!(guard.is_some());
}
#[test]
fn supports_draft_updates_respects_stream_mode() {
let off = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
assert!(!off.supports_draft_updates());
let partial = TelegramChannel::new("fake-token".into(), vec!["*".into()], false)
.with_streaming(StreamMode::Partial, 750);
assert!(partial.supports_draft_updates());
assert_eq!(partial.draft_update_interval_ms, 750);
}
#[tokio::test]
async fn send_draft_returns_none_when_stream_mode_off() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let id = ch
.send_draft(&SendMessage::new("draft", "123"))
.await
.unwrap();
assert!(id.is_none());
}
#[tokio::test]
async fn update_draft_rate_limit_short_circuits_network() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false)
.with_streaming(StreamMode::Partial, 60_000);
ch.last_draft_edit
.lock()
.insert("123".to_string(), std::time::Instant::now());
let result = ch.update_draft("123", "42", "delta text").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn update_draft_utf8_truncation_is_safe_for_multibyte_text() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false)
.with_streaming(StreamMode::Partial, 0);
let long_emoji_text = "😀".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 20);
let result = ch
.update_draft("123", "not-a-number", &long_emoji_text)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn finalize_draft_invalid_message_id_falls_back_to_chunk_send() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false)
.with_streaming(StreamMode::Partial, 0);
let long_text = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 64);
let result = ch.finalize_draft("123", "not-a-number", &long_text).await;
assert!(result.is_err());
}
#[test]
fn telegram_api_url() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("getMe"),
"https://api.telegram.org/bot123:ABC/getMe"
);
}
#[test]
fn telegram_markdown_to_html_escapes_quotes_in_link_href() {
let rendered = TelegramChannel::markdown_to_telegram_html(
"[click](https://example.com?q=\"x\"&a='b')",
);
assert_eq!(
rendered,
"<a href=\"https://example.com?q="x"&a='b'\">click</a>"
);
}
#[test]
fn telegram_markdown_to_html_escapes_quotes_in_plain_text() {
let rendered = TelegramChannel::markdown_to_telegram_html("say \"hi\" & <tag> 'ok'");
assert_eq!(
rendered,
"say "hi" & <tag> 'ok'"
);
}
#[test]
fn telegram_markdown_to_html_code_block_drops_language_attribute() {
let rendered = TelegramChannel::markdown_to_telegram_html(
"```rust\" onclick=\"alert(1)\nlet x = 1;\n```",
);
assert_eq!(rendered, "<pre><code>let x = 1;</code></pre>");
assert!(!rendered.contains("language-"));
assert!(!rendered.contains("onclick"));
}
#[test]
fn telegram_user_allowed_wildcard() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
assert!(ch.is_user_allowed("anyone"));
}
#[test]
fn telegram_user_allowed_specific() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "bob".into()], false);
assert!(ch.is_user_allowed("alice"));
assert!(!ch.is_user_allowed("eve"));
}
#[test]
fn telegram_user_allowed_with_at_prefix_in_config() {
let ch = TelegramChannel::new("t".into(), vec!["@alice".into()], false);
assert!(ch.is_user_allowed("alice"));
}
#[test]
fn telegram_user_denied_empty() {
let ch = TelegramChannel::new("t".into(), vec![], false);
assert!(!ch.is_user_allowed("anyone"));
}
#[test]
fn telegram_user_exact_match_not_substring() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false);
assert!(!ch.is_user_allowed("alice_bot"));
assert!(!ch.is_user_allowed("alic"));
assert!(!ch.is_user_allowed("malice"));
}
#[test]
fn telegram_user_empty_string_denied() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false);
assert!(!ch.is_user_allowed(""));
}
#[test]
fn telegram_user_case_sensitive() {
let ch = TelegramChannel::new("t".into(), vec!["Alice".into()], false);
assert!(ch.is_user_allowed("Alice"));
assert!(!ch.is_user_allowed("alice"));
assert!(!ch.is_user_allowed("ALICE"));
}
#[test]
fn telegram_wildcard_with_specific_users() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "*".into()], false);
assert!(ch.is_user_allowed("alice"));
assert!(ch.is_user_allowed("bob"));
assert!(ch.is_user_allowed("anyone"));
}
#[test]
fn telegram_user_allowed_by_numeric_id_identity() {
let ch = TelegramChannel::new("t".into(), vec!["123456789".into()], false);
assert!(ch.is_any_user_allowed(["unknown", "123456789"]));
}
#[test]
fn telegram_user_denied_when_none_of_identities_match() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into(), "987654321".into()], false);
assert!(!ch.is_any_user_allowed(["unknown", "123456789"]));
}
#[test]
fn telegram_pairing_enabled_with_empty_allowlist() {
let ch = TelegramChannel::new("t".into(), vec![], false);
assert!(ch.pairing_code_active());
}
#[test]
fn telegram_pairing_disabled_with_nonempty_allowlist() {
let ch = TelegramChannel::new("t".into(), vec!["alice".into()], false);
assert!(!ch.pairing_code_active());
}
#[test]
fn telegram_extract_bind_code_plain_command() {
assert_eq!(
TelegramChannel::extract_bind_code("/bind 123456"),
Some("123456")
);
}
#[test]
fn telegram_extract_bind_code_supports_bot_mention() {
assert_eq!(
TelegramChannel::extract_bind_code("/bind@zeroclaw_bot 654321"),
Some("654321")
);
}
#[test]
fn telegram_extract_bind_code_rejects_invalid_forms() {
assert_eq!(TelegramChannel::extract_bind_code("/bind"), None);
assert_eq!(TelegramChannel::extract_bind_code("/start"), None);
}
#[test]
fn parse_attachment_markers_extracts_multiple_types() {
let message = "Here are files [IMAGE:/tmp/a.png] and [DOCUMENT:https://example.com/a.pdf]";
let (cleaned, attachments) = parse_attachment_markers(message);
assert_eq!(cleaned, "Here are files and");
assert_eq!(attachments.len(), 2);
assert_eq!(attachments[0].kind, TelegramAttachmentKind::Image);
assert_eq!(attachments[0].target, "/tmp/a.png");
assert_eq!(attachments[1].kind, TelegramAttachmentKind::Document);
assert_eq!(attachments[1].target, "https://example.com/a.pdf");
}
#[test]
fn parse_attachment_markers_keeps_invalid_markers_in_text() {
let message = "Report [UNKNOWN:/tmp/a.bin]";
let (cleaned, attachments) = parse_attachment_markers(message);
assert_eq!(cleaned, "Report [UNKNOWN:/tmp/a.bin]");
assert!(attachments.is_empty());
}
#[test]
fn parse_path_only_attachment_detects_existing_file() {
let dir = tempfile::tempdir().unwrap();
let image_path = dir.path().join("snap.png");
std::fs::write(&image_path, b"fake-png").unwrap();
let parsed = parse_path_only_attachment(image_path.to_string_lossy().as_ref())
.expect("expected attachment");
assert_eq!(parsed.kind, TelegramAttachmentKind::Image);
assert_eq!(parsed.target, image_path.to_string_lossy());
}
#[test]
fn parse_path_only_attachment_rejects_sentence_text() {
assert!(parse_path_only_attachment("Screenshot saved to /tmp/snap.png").is_none());
}
#[test]
fn infer_attachment_kind_from_target_detects_document_extension() {
assert_eq!(
infer_attachment_kind_from_target("https://example.com/files/specs.pdf?download=1"),
Some(TelegramAttachmentKind::Document)
);
}
#[test]
fn parse_update_message_uses_chat_id_as_reply_target() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], false);
let update = serde_json::json!({
"update_id": 1,
"message": {
"message_id": 33,
"text": "hello",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100_200_300
}
}
});
let msg = ch
.parse_update_message(&update)
.expect("message should parse");
assert_eq!(msg.sender, "alice");
assert_eq!(msg.reply_target, "-100200300");
assert_eq!(msg.content, "hello");
assert_eq!(msg.id, "telegram_-100200300_33");
}
#[test]
fn parse_update_message_allows_numeric_id_without_username() {
let ch = TelegramChannel::new("token".into(), vec!["555".into()], false);
let update = serde_json::json!({
"update_id": 2,
"message": {
"message_id": 9,
"text": "ping",
"from": {
"id": 555
},
"chat": {
"id": 12345
}
}
});
let msg = ch
.parse_update_message(&update)
.expect("numeric allowlist should pass");
assert_eq!(msg.sender, "555");
assert_eq!(msg.reply_target, "12345");
}
#[test]
fn parse_update_message_extracts_thread_id_for_forum_topic() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], false);
let update = serde_json::json!({
"update_id": 3,
"message": {
"message_id": 42,
"text": "hello from topic",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100_200_300
},
"message_thread_id": 789
}
});
let msg = ch
.parse_update_message(&update)
.expect("message with thread_id should parse");
assert_eq!(msg.sender, "alice");
assert_eq!(msg.reply_target, "-100200300:789");
assert_eq!(msg.content, "hello from topic");
assert_eq!(msg.id, "telegram_-100200300_42");
}
#[test]
fn telegram_api_url_send_document() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("sendDocument"),
"https://api.telegram.org/bot123:ABC/sendDocument"
);
}
#[test]
fn telegram_api_url_send_photo() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("sendPhoto"),
"https://api.telegram.org/bot123:ABC/sendPhoto"
);
}
#[test]
fn telegram_api_url_send_video() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("sendVideo"),
"https://api.telegram.org/bot123:ABC/sendVideo"
);
}
#[test]
fn telegram_api_url_send_audio() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("sendAudio"),
"https://api.telegram.org/bot123:ABC/sendAudio"
);
}
#[test]
fn telegram_api_url_send_voice() {
let ch = TelegramChannel::new("123:ABC".into(), vec![], false);
assert_eq!(
ch.api_url("sendVoice"),
"https://api.telegram.org/bot123:ABC/sendVoice"
);
}
#[tokio::test]
async fn telegram_send_document_bytes_builds_correct_form() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = b"Hello, this is a test file content".to_vec();
let result = ch
.send_document_bytes("123456", None, file_bytes, "test.txt", Some("Test caption"))
.await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("error") || err.contains("failed") || err.contains("connect"),
"Expected network error, got: {err}"
);
}
#[tokio::test]
async fn telegram_send_photo_bytes_builds_correct_form() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
let result = ch
.send_photo_bytes("123456", None, file_bytes, "test.png", None)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_document_by_url_builds_correct_json() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let result = ch
.send_document_by_url(
"123456",
None,
"https://example.com/file.pdf",
Some("PDF doc"),
)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_photo_by_url_builds_correct_json() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let result = ch
.send_photo_by_url("123456", None, "https://example.com/image.jpg", None)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_document_nonexistent_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let path = Path::new("/nonexistent/path/to/file.txt");
let result = ch.send_document("123456", None, path, None).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("No such file") || err.contains("not found") || err.contains("os error"),
"Expected file not found error, got: {err}"
);
}
#[tokio::test]
async fn telegram_send_photo_nonexistent_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let path = Path::new("/nonexistent/path/to/photo.jpg");
let result = ch.send_photo("123456", None, path, None).await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_video_nonexistent_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let path = Path::new("/nonexistent/path/to/video.mp4");
let result = ch.send_video("123456", None, path, None).await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_audio_nonexistent_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let path = Path::new("/nonexistent/path/to/audio.mp3");
let result = ch.send_audio("123456", None, path, None).await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_voice_nonexistent_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let path = Path::new("/nonexistent/path/to/voice.ogg");
let result = ch.send_voice("123456", None, path, None).await;
assert!(result.is_err());
}
#[test]
fn telegram_split_short_message() {
let msg = "Hello, world!";
let chunks = split_message_for_telegram(msg);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], msg);
}
#[test]
fn telegram_split_exact_limit() {
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH);
let chunks = split_message_for_telegram(&msg);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].len(), TELEGRAM_MAX_MESSAGE_LENGTH);
}
#[test]
fn telegram_split_over_limit() {
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 100);
let chunks = split_message_for_telegram(&msg);
assert_eq!(chunks.len(), 2);
assert!(chunks[0].len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
assert!(chunks[1].len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
}
#[test]
fn telegram_split_at_word_boundary() {
let msg = format!(
"{} more text here",
"word ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5)
);
let chunks = split_message_for_telegram(&msg);
assert!(chunks.len() >= 2);
for chunk in &chunks[..chunks.len() - 1] {
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
}
}
#[test]
fn telegram_split_at_newline() {
let text_block = "Line of text\n".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 13 + 1);
let chunks = split_message_for_telegram(&text_block);
assert!(chunks.len() >= 2);
for chunk in chunks {
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
}
}
#[test]
fn telegram_split_preserves_content() {
let msg = "test ".repeat(TELEGRAM_MAX_MESSAGE_LENGTH / 5 + 100);
let chunks = split_message_for_telegram(&msg);
let rejoined = chunks.join("");
assert_eq!(rejoined, msg);
}
#[test]
fn telegram_split_empty_message() {
let chunks = split_message_for_telegram("");
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0], "");
}
#[test]
fn telegram_split_very_long_message() {
let msg = "x".repeat(TELEGRAM_MAX_MESSAGE_LENGTH * 3);
let chunks = split_message_for_telegram(&msg);
assert!(chunks.len() >= 3);
for chunk in chunks {
assert!(chunk.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
}
}
#[tokio::test]
async fn telegram_send_document_bytes_with_caption() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = b"test content".to_vec();
let result = ch
.send_document_bytes(
"123456",
None,
file_bytes.clone(),
"test.txt",
Some("My caption"),
)
.await;
assert!(result.is_err());
let result = ch
.send_document_bytes("123456", None, file_bytes, "test.txt", None)
.await;
assert!(result.is_err()); }
#[tokio::test]
async fn telegram_send_photo_bytes_with_caption() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = vec![0x89, 0x50, 0x4E, 0x47];
let result = ch
.send_photo_bytes(
"123456",
None,
file_bytes.clone(),
"test.png",
Some("Photo caption"),
)
.await;
assert!(result.is_err());
let result = ch
.send_photo_bytes("123456", None, file_bytes, "test.png", None)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_document_bytes_empty_file() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes: Vec<u8> = vec![];
let result = ch
.send_document_bytes("123456", None, file_bytes, "empty.txt", None)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_document_bytes_empty_filename() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = b"content".to_vec();
let result = ch
.send_document_bytes("123456", None, file_bytes, "", None)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn telegram_send_document_bytes_empty_chat_id() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false);
let file_bytes = b"content".to_vec();
let result = ch
.send_document_bytes("", None, file_bytes, "test.txt", None)
.await;
assert!(result.is_err());
}
#[test]
fn telegram_message_id_format_includes_chat_and_message_id() {
let chat_id = "123456";
let message_id = 789;
let expected_id = format!("telegram_{chat_id}_{message_id}");
assert_eq!(expected_id, "telegram_123456_789");
}
#[test]
fn telegram_message_id_is_deterministic() {
let chat_id = "123456";
let message_id = 789;
let id1 = format!("telegram_{chat_id}_{message_id}");
let id2 = format!("telegram_{chat_id}_{message_id}");
assert_eq!(id1, id2);
}
#[test]
fn telegram_message_id_different_message_different_id() {
let chat_id = "123456";
let id1 = format!("telegram_{chat_id}_789");
let id2 = format!("telegram_{chat_id}_790");
assert_ne!(id1, id2);
}
#[test]
fn telegram_message_id_different_chat_different_id() {
let message_id = 789;
let id1 = format!("telegram_123456_{message_id}");
let id2 = format!("telegram_789012_{message_id}");
assert_ne!(id1, id2);
}
#[test]
fn telegram_message_id_no_uuid_randomness() {
let chat_id = "123456";
let message_id = 789;
let id = format!("telegram_{chat_id}_{message_id}");
assert!(!id.contains('-')); assert!(id.starts_with("telegram_"));
}
#[test]
fn telegram_message_id_handles_zero_message_id() {
let chat_id = "123456";
let message_id = 0;
let id = format!("telegram_{chat_id}_{message_id}");
assert_eq!(id, "telegram_123456_0");
}
#[test]
fn strip_tool_call_tags_removes_standard_tags() {
let input =
"Hello <tool>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</tool> world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world");
}
#[test]
fn strip_tool_call_tags_removes_alias_tags() {
let input = "Hello <toolcall>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</toolcall> world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world");
}
#[test]
fn strip_tool_call_tags_removes_dash_tags() {
let input = "Hello <tool-call>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</tool-call> world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world");
}
#[test]
fn strip_tool_call_tags_removes_tool_call_tags() {
let input = "Hello <tool_call>{\"name\":\"shell\",\"arguments\":{\"command\":\"ls\"}}</tool_call> world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world");
}
#[test]
fn strip_tool_call_tags_removes_invoke_tags() {
let input = "Hello <invoke>{\"name\":\"shell\",\"arguments\":{\"command\":\"date\"}}</invoke> world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world");
}
#[test]
fn strip_tool_call_tags_handles_multiple_tags() {
let input = "Start <tool>a</tool> middle <tool>b</tool> end";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Start middle end");
}
#[test]
fn strip_tool_call_tags_handles_mixed_tags() {
let input = "A <tool>a</tool> B <toolcall>b</toolcall> C <tool-call>c</tool-call> D";
let result = strip_tool_call_tags(input);
assert_eq!(result, "A B C D");
}
#[test]
fn strip_tool_call_tags_preserves_normal_text() {
let input = "Hello world! This is a test.";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello world! This is a test.");
}
#[test]
fn strip_tool_call_tags_handles_unclosed_tags() {
let input = "Hello <tool>world";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello <tool>world");
}
#[test]
fn strip_tool_call_tags_handles_unclosed_tool_call_with_json() {
let input =
"Status:\n<tool_call>\n{\"name\":\"shell\",\"arguments\":{\"command\":\"uptime\"}}";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Status:");
}
#[test]
fn strip_tool_call_tags_handles_mismatched_close_tag() {
let input =
"<tool_call>{\"name\":\"shell\",\"arguments\":{\"command\":\"uptime\"}}</arg_value>";
let result = strip_tool_call_tags(input);
assert_eq!(result, "");
}
#[test]
fn strip_tool_call_tags_cleans_extra_newlines() {
let input = "Hello\n\n<tool>\ntest\n</tool>\n\n\nworld";
let result = strip_tool_call_tags(input);
assert_eq!(result, "Hello\n\nworld");
}
#[test]
fn strip_tool_call_tags_handles_empty_input() {
let input = "";
let result = strip_tool_call_tags(input);
assert_eq!(result, "");
}
#[test]
fn strip_tool_call_tags_handles_only_tags() {
let input = "<tool>{\"name\":\"test\"}</tool>";
let result = strip_tool_call_tags(input);
assert_eq!(result, "");
}
#[test]
fn telegram_contains_bot_mention_finds_mention() {
assert!(TelegramChannel::contains_bot_mention(
"Hello @mybot",
"mybot"
));
assert!(TelegramChannel::contains_bot_mention(
"@mybot help",
"mybot"
));
assert!(TelegramChannel::contains_bot_mention(
"Hey @mybot how are you?",
"mybot"
));
assert!(TelegramChannel::contains_bot_mention(
"Hello @MyBot, can you help?",
"mybot"
));
}
#[test]
fn telegram_contains_bot_mention_no_false_positives() {
assert!(!TelegramChannel::contains_bot_mention(
"Hello @otherbot",
"mybot"
));
assert!(!TelegramChannel::contains_bot_mention(
"Hello mybot",
"mybot"
));
assert!(!TelegramChannel::contains_bot_mention(
"Hello @mybot2",
"mybot"
));
assert!(!TelegramChannel::contains_bot_mention("", "mybot"));
}
#[test]
fn telegram_normalize_incoming_content_strips_mention() {
let result = TelegramChannel::normalize_incoming_content("@mybot hello", "mybot");
assert_eq!(result, Some("hello".to_string()));
}
#[test]
fn telegram_normalize_incoming_content_handles_multiple_mentions() {
let result = TelegramChannel::normalize_incoming_content("@mybot @mybot test", "mybot");
assert_eq!(result, Some("test".to_string()));
}
#[test]
fn telegram_normalize_incoming_content_returns_none_for_empty() {
let result = TelegramChannel::normalize_incoming_content("@mybot", "mybot");
assert_eq!(result, None);
}
#[test]
fn parse_update_message_mention_only_group_requires_exact_mention() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], true);
{
let mut cache = ch.bot_username.lock();
*cache = Some("mybot".to_string());
}
let update = serde_json::json!({
"update_id": 10,
"message": {
"message_id": 44,
"text": "hello @mybot2",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100_200_300,
"type": "group"
}
}
});
assert!(ch.parse_update_message(&update).is_none());
}
#[test]
fn parse_update_message_mention_only_group_strips_mention_and_drops_empty() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], true);
{
let mut cache = ch.bot_username.lock();
*cache = Some("mybot".to_string());
}
let update = serde_json::json!({
"update_id": 11,
"message": {
"message_id": 45,
"text": "Hi @MyBot status please",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100_200_300,
"type": "group"
}
}
});
let parsed = ch
.parse_update_message(&update)
.expect("mention should parse");
assert_eq!(parsed.content, "Hi status please");
let empty_update = serde_json::json!({
"update_id": 12,
"message": {
"message_id": 46,
"text": "@mybot",
"from": {
"id": 555,
"username": "alice"
},
"chat": {
"id": -100_200_300,
"type": "group"
}
}
});
assert!(ch.parse_update_message(&empty_update).is_none());
}
#[test]
fn telegram_is_group_message_detects_groups() {
let group_msg = serde_json::json!({
"chat": { "type": "group" }
});
assert!(TelegramChannel::is_group_message(&group_msg));
let supergroup_msg = serde_json::json!({
"chat": { "type": "supergroup" }
});
assert!(TelegramChannel::is_group_message(&supergroup_msg));
let private_msg = serde_json::json!({
"chat": { "type": "private" }
});
assert!(!TelegramChannel::is_group_message(&private_msg));
}
#[test]
fn telegram_mention_only_enabled_by_config() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], true);
assert!(ch.mention_only);
let ch_disabled = TelegramChannel::new("token".into(), vec!["*".into()], false);
assert!(!ch_disabled.mention_only);
}
#[test]
fn telegram_split_code_block_at_boundary() {
let mut msg = String::new();
msg.push_str("```python\n");
msg.push_str(&"x".repeat(4085));
msg.push_str("\n```\nMore text after code block");
let parts = split_message_for_telegram(&msg);
assert!(
parts.len() >= 2,
"code block spanning boundary should split"
);
for part in &parts {
assert!(
part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH,
"each part must be <= {TELEGRAM_MAX_MESSAGE_LENGTH}, got {}",
part.len()
);
}
}
#[test]
fn telegram_split_single_long_word() {
let long_word = "a".repeat(5000);
let parts = split_message_for_telegram(&long_word);
assert!(parts.len() >= 2, "word exceeding limit must be split");
for part in &parts {
assert!(
part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH,
"hard-split part must be <= {TELEGRAM_MAX_MESSAGE_LENGTH}, got {}",
part.len()
);
}
let reassembled: String = parts.join("");
assert_eq!(reassembled, long_word);
}
#[test]
fn telegram_split_exactly_at_limit_no_split() {
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH);
let parts = split_message_for_telegram(&msg);
assert_eq!(parts.len(), 1, "message exactly at limit should not split");
}
#[test]
fn telegram_split_one_over_limit() {
let msg = "a".repeat(TELEGRAM_MAX_MESSAGE_LENGTH + 1);
let parts = split_message_for_telegram(&msg);
assert!(parts.len() >= 2, "message 1 char over limit must split");
}
#[test]
fn telegram_split_many_short_lines() {
let msg: String = (0..1000).map(|i| format!("line {i}\n")).collect();
let parts = split_message_for_telegram(&msg);
for part in &parts {
assert!(
part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH,
"short-line batch must be <= limit"
);
}
}
#[test]
fn telegram_split_only_whitespace() {
let msg = " \n\n\t ";
let parts = split_message_for_telegram(msg);
assert!(parts.len() <= 1);
}
#[test]
fn telegram_split_emoji_at_boundary() {
let mut msg = "a".repeat(4094);
msg.push_str("🎉🎊"); let parts = split_message_for_telegram(&msg);
for part in &parts {
assert!(
part.chars().count() <= TELEGRAM_MAX_MESSAGE_LENGTH,
"emoji boundary split must respect limit"
);
}
}
#[test]
fn telegram_split_consecutive_newlines() {
let mut msg = "a".repeat(4090);
msg.push_str("\n\n\n\n\n\n");
msg.push_str(&"b".repeat(100));
let parts = split_message_for_telegram(&msg);
for part in &parts {
assert!(part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH);
}
}
#[test]
fn parse_voice_metadata_extracts_voice() {
let msg = serde_json::json!({
"voice": {
"file_id": "abc123",
"duration": 5
}
});
let (file_id, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap();
assert_eq!(file_id, "abc123");
assert_eq!(dur, 5);
}
#[test]
fn parse_voice_metadata_extracts_audio() {
let msg = serde_json::json!({
"audio": {
"file_id": "audio456",
"duration": 30
}
});
let (file_id, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap();
assert_eq!(file_id, "audio456");
assert_eq!(dur, 30);
}
#[test]
fn parse_voice_metadata_returns_none_for_text() {
let msg = serde_json::json!({
"text": "hello"
});
assert!(TelegramChannel::parse_voice_metadata(&msg).is_none());
}
#[test]
fn parse_voice_metadata_defaults_duration_to_zero() {
let msg = serde_json::json!({
"voice": {
"file_id": "no_dur"
}
});
let (_, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap();
assert_eq!(dur, 0);
}
#[test]
fn extract_sender_info_with_username() {
let msg = serde_json::json!({
"from": { "id": 123, "username": "alice" }
});
let (username, sender_id, identity) = TelegramChannel::extract_sender_info(&msg);
assert_eq!(username, "alice");
assert_eq!(sender_id, Some("123".to_string()));
assert_eq!(identity, "alice");
}
#[test]
fn extract_sender_info_without_username() {
let msg = serde_json::json!({
"from": { "id": 42 }
});
let (username, sender_id, identity) = TelegramChannel::extract_sender_info(&msg);
assert_eq!(username, "unknown");
assert_eq!(sender_id, Some("42".to_string()));
assert_eq!(identity, "42");
}
#[test]
fn extract_reply_context_text_message() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
let msg = serde_json::json!({
"reply_to_message": {
"from": { "username": "alice" },
"text": "Hello world"
}
});
let ctx = ch.extract_reply_context(&msg).unwrap();
assert_eq!(ctx, "> @alice:\n> Hello world");
}
#[test]
fn extract_reply_context_voice_message() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
let msg = serde_json::json!({
"reply_to_message": {
"from": { "username": "bob" },
"voice": { "file_id": "abc", "duration": 5 }
}
});
let ctx = ch.extract_reply_context(&msg).unwrap();
assert_eq!(ctx, "> @bob:\n> [Voice message]");
}
#[test]
fn extract_reply_context_no_reply() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
let msg = serde_json::json!({
"text": "just a regular message"
});
assert!(ch.extract_reply_context(&msg).is_none());
}
#[test]
fn extract_reply_context_no_username_uses_first_name() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
let msg = serde_json::json!({
"reply_to_message": {
"from": { "id": 999, "first_name": "Charlie" },
"text": "Hi there"
}
});
let ctx = ch.extract_reply_context(&msg).unwrap();
assert_eq!(ctx, "> @Charlie:\n> Hi there");
}
#[test]
fn extract_reply_context_voice_with_cached_transcription() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
ch.voice_transcriptions
.lock()
.insert("100:42".to_string(), "Hello from voice".to_string());
let msg = serde_json::json!({
"chat": { "id": 100 },
"reply_to_message": {
"message_id": 42,
"from": { "username": "bob" },
"voice": { "file_id": "abc", "duration": 5 }
}
});
let ctx = ch.extract_reply_context(&msg).unwrap();
assert_eq!(ctx, "> @bob:\n> [Voice] Hello from voice");
}
#[test]
fn parse_update_message_includes_reply_context() {
let ch = TelegramChannel::new("t".into(), vec!["*".into()], false);
let update = serde_json::json!({
"message": {
"message_id": 10,
"text": "translate this",
"from": { "id": 1, "username": "alice" },
"chat": { "id": 100, "type": "private" },
"reply_to_message": {
"from": { "username": "bot" },
"text": "Bonjour le monde"
}
}
});
let parsed = ch.parse_update_message(&update).unwrap();
assert!(
parsed.content.starts_with("> @bot:"),
"content should start with quote: {}",
parsed.content
);
assert!(
parsed.content.contains("translate this"),
"content should contain user text"
);
assert!(
parsed.content.contains("Bonjour le monde"),
"content should contain quoted text"
);
}
#[test]
fn with_transcription_sets_config_when_enabled() {
let mut tc = crate::config::TranscriptionConfig::default();
tc.enabled = true;
let ch =
TelegramChannel::new("token".into(), vec!["*".into()], false).with_transcription(tc);
assert!(ch.transcription.is_some());
}
#[test]
fn with_transcription_skips_when_disabled() {
let tc = crate::config::TranscriptionConfig::default(); let ch =
TelegramChannel::new("token".into(), vec!["*".into()], false).with_transcription(tc);
assert!(ch.transcription.is_none());
}
#[tokio::test]
async fn try_parse_voice_message_returns_none_when_transcription_disabled() {
let ch = TelegramChannel::new("token".into(), vec!["*".into()], false);
let update = serde_json::json!({
"message": {
"message_id": 1,
"voice": { "file_id": "voice_file", "duration": 4 },
"from": { "id": 123, "username": "alice" },
"chat": { "id": 456, "type": "private" }
}
});
let parsed = ch.try_parse_voice_message(&update).await;
assert!(parsed.is_none());
}
#[tokio::test]
async fn try_parse_voice_message_skips_when_duration_exceeds_limit() {
let mut tc = crate::config::TranscriptionConfig::default();
tc.enabled = true;
tc.max_duration_secs = 5;
let ch =
TelegramChannel::new("token".into(), vec!["*".into()], false).with_transcription(tc);
let update = serde_json::json!({
"message": {
"message_id": 2,
"voice": { "file_id": "voice_file", "duration": 30 },
"from": { "id": 123, "username": "alice" },
"chat": { "id": 456, "type": "private" }
}
});
let parsed = ch.try_parse_voice_message(&update).await;
assert!(parsed.is_none());
}
#[tokio::test]
async fn try_parse_voice_message_rejects_unauthorized_sender_before_download() {
let mut tc = crate::config::TranscriptionConfig::default();
tc.enabled = true;
tc.max_duration_secs = 120;
let ch = TelegramChannel::new("token".into(), vec!["alice".into()], false)
.with_transcription(tc);
let update = serde_json::json!({
"message": {
"message_id": 3,
"voice": { "file_id": "voice_file", "duration": 4 },
"from": { "id": 999, "username": "bob" },
"chat": { "id": 456, "type": "private" }
}
});
let parsed = ch.try_parse_voice_message(&update).await;
assert!(parsed.is_none());
assert!(ch.voice_transcriptions.lock().is_empty());
}
#[tokio::test]
#[ignore]
async fn e2e_live_voice_transcription_and_reply_cache() {
if std::env::var("GROQ_API_KEY").is_err() {
eprintln!("GROQ_API_KEY not set — skipping live voice transcription test");
return;
}
let fixture_path =
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/hello.mp3");
let audio_data = std::fs::read(&fixture_path)
.unwrap_or_else(|e| panic!("Failed to read fixture {}: {e}", fixture_path.display()));
assert!(
audio_data.len() > 1000,
"fixture too small ({} bytes), likely corrupt",
audio_data.len()
);
let config = crate::config::TranscriptionConfig {
enabled: true,
..Default::default()
};
let transcript: String =
crate::channels::transcription::transcribe_audio(audio_data, "hello.mp3", &config)
.await
.expect("transcribe_audio should succeed with valid GROQ_API_KEY");
assert!(
transcript.to_lowercase().contains("hello"),
"expected transcription to contain 'hello', got: '{transcript}'"
);
let ch = TelegramChannel::new("test_token".into(), vec!["*".into()], false);
let chat_id: i64 = 12345;
let message_id: i64 = 67;
let cache_key = format!("{chat_id}:{message_id}");
ch.voice_transcriptions
.lock()
.insert(cache_key, transcript.clone());
let msg = serde_json::json!({
"chat": { "id": chat_id },
"reply_to_message": {
"message_id": message_id,
"from": { "username": "zeroclaw_user" },
"voice": { "file_id": "test_file", "duration": 1 }
}
});
let ctx = ch
.extract_reply_context(&msg)
.expect("extract_reply_context should return Some for voice reply");
assert!(
ctx.contains(&format!("[Voice] {transcript}")),
"expected cached transcription in reply context, got: {ctx}"
);
assert!(
!ctx.contains("[Voice message]"),
"context should use cached transcription, not fallback placeholder, got: {ctx}"
);
}
#[test]
fn parse_attachment_metadata_detects_document() {
let message = serde_json::json!({
"document": {
"file_id": "BQACAgIAAxk",
"file_name": "report.pdf",
"file_size": 12345
}
});
let att = TelegramChannel::parse_attachment_metadata(&message).unwrap();
assert_eq!(att.kind, IncomingAttachmentKind::Document);
assert_eq!(att.file_id, "BQACAgIAAxk");
assert_eq!(att.file_name.as_deref(), Some("report.pdf"));
assert_eq!(att.file_size, Some(12345));
assert!(att.caption.is_none());
}
#[test]
fn parse_attachment_metadata_detects_photo() {
let message = serde_json::json!({
"photo": [
{"file_id": "small_id", "file_size": 100, "width": 90, "height": 90},
{"file_id": "medium_id", "file_size": 500, "width": 320, "height": 320},
{"file_id": "large_id", "file_size": 2000, "width": 800, "height": 800}
]
});
let att = TelegramChannel::parse_attachment_metadata(&message).unwrap();
assert_eq!(att.kind, IncomingAttachmentKind::Photo);
assert_eq!(att.file_id, "large_id");
assert_eq!(att.file_size, Some(2000));
assert!(att.file_name.is_none());
}
#[test]
fn parse_attachment_metadata_extracts_caption() {
let doc_msg = serde_json::json!({
"document": {
"file_id": "doc_id",
"file_name": "data.csv"
},
"caption": "Monthly report"
});
let att = TelegramChannel::parse_attachment_metadata(&doc_msg).unwrap();
assert_eq!(att.caption.as_deref(), Some("Monthly report"));
let photo_msg = serde_json::json!({
"photo": [
{"file_id": "photo_id", "file_size": 1000}
],
"caption": "Look at this"
});
let att = TelegramChannel::parse_attachment_metadata(&photo_msg).unwrap();
assert_eq!(att.caption.as_deref(), Some("Look at this"));
}
#[test]
fn parse_attachment_metadata_document_without_optional_fields() {
let message = serde_json::json!({
"document": {
"file_id": "doc_no_name"
}
});
let att = TelegramChannel::parse_attachment_metadata(&message).unwrap();
assert_eq!(att.kind, IncomingAttachmentKind::Document);
assert_eq!(att.file_id, "doc_no_name");
assert!(att.file_name.is_none());
assert!(att.file_size.is_none());
assert!(att.caption.is_none());
}
#[test]
fn parse_attachment_metadata_returns_none_for_text() {
let message = serde_json::json!({
"text": "Hello world"
});
assert!(TelegramChannel::parse_attachment_metadata(&message).is_none());
}
#[test]
fn parse_attachment_metadata_returns_none_for_voice() {
let message = serde_json::json!({
"voice": {
"file_id": "voice_id",
"duration": 5
}
});
assert!(TelegramChannel::parse_attachment_metadata(&message).is_none());
}
#[test]
fn parse_attachment_metadata_empty_photo_array() {
let message = serde_json::json!({
"photo": []
});
assert!(TelegramChannel::parse_attachment_metadata(&message).is_none());
}
#[test]
fn with_workspace_dir_sets_field() {
let ch = TelegramChannel::new("fake-token".into(), vec!["*".into()], false)
.with_workspace_dir(std::path::PathBuf::from("/tmp/test_workspace"));
assert_eq!(
ch.workspace_dir.as_deref(),
Some(std::path::Path::new("/tmp/test_workspace"))
);
}
#[test]
fn telegram_max_file_download_bytes_is_20mb() {
assert_eq!(TELEGRAM_MAX_FILE_DOWNLOAD_BYTES, 20 * 1024 * 1024);
}
#[test]
fn attachment_photo_content_uses_image_marker() {
let local_path = std::path::Path::new("/tmp/workspace/photo_123_45.jpg");
let local_filename = "photo_123_45.jpg";
let content =
format_attachment_content(IncomingAttachmentKind::Photo, local_filename, local_path);
assert_eq!(content, "[IMAGE:/tmp/workspace/photo_123_45.jpg]");
assert!(content.starts_with("[IMAGE:"));
assert!(content.ends_with(']'));
}
#[test]
fn attachment_document_content_uses_document_label() {
let local_path = std::path::Path::new("/tmp/workspace/report.pdf");
let local_filename = "report.pdf";
let content =
format_attachment_content(IncomingAttachmentKind::Document, local_filename, local_path);
assert_eq!(content, "[Document: report.pdf] /tmp/workspace/report.pdf");
assert!(!content.contains("[IMAGE:"));
}
#[test]
fn markdown_file_never_produces_image_marker() {
let local_path = std::path::Path::new("/tmp/workspace/telegram_files/notes.md");
let local_filename = "notes.md";
let content =
format_attachment_content(IncomingAttachmentKind::Photo, local_filename, local_path);
assert!(
!content.contains("[IMAGE:"),
"markdown must not get [IMAGE:] marker: {content}"
);
assert!(content.starts_with("[Document:"));
let content_doc =
format_attachment_content(IncomingAttachmentKind::Document, local_filename, local_path);
assert!(
!content_doc.contains("[IMAGE:"),
"markdown document must not get [IMAGE:] marker: {content_doc}"
);
}
#[test]
fn non_image_photo_falls_back_to_document_format() {
for (filename, ext_path) in [
("file.md", "/tmp/ws/file.md"),
("file.txt", "/tmp/ws/file.txt"),
("file.pdf", "/tmp/ws/file.pdf"),
("file.csv", "/tmp/ws/file.csv"),
("file.json", "/tmp/ws/file.json"),
("file.zip", "/tmp/ws/file.zip"),
("file", "/tmp/ws/file"),
] {
let path = std::path::Path::new(ext_path);
let content = format_attachment_content(IncomingAttachmentKind::Photo, filename, path);
assert!(
!content.contains("[IMAGE:"),
"{filename}: non-image file should not get [IMAGE:] marker, got: {content}"
);
assert!(
content.starts_with("[Document:"),
"{filename}: should use [Document:] format, got: {content}"
);
}
}
#[test]
fn image_extensions_produce_image_marker() {
for ext in ["png", "jpg", "jpeg", "gif", "webp", "bmp"] {
let filename = format!("photo_1_2.{ext}");
let path_str = format!("/tmp/ws/{filename}");
let path = std::path::Path::new(&path_str);
let content = format_attachment_content(IncomingAttachmentKind::Photo, &filename, path);
assert!(
content.starts_with("[IMAGE:"),
"{ext}: image should get [IMAGE:] marker, got: {content}"
);
}
}
#[test]
fn markdown_attachment_not_detected_by_multimodal_image_markers() {
let content = format_attachment_content(
IncomingAttachmentKind::Photo,
"notes.md",
std::path::Path::new("/tmp/ws/notes.md"),
);
let messages = vec![crate::providers::ChatMessage::user(content)];
assert_eq!(
crate::multimodal::count_image_markers(&messages),
0,
"markdown file must not trigger image marker detection"
);
}
#[test]
fn is_image_extension_recognizes_images() {
assert!(is_image_extension(std::path::Path::new("photo.png")));
assert!(is_image_extension(std::path::Path::new("photo.jpg")));
assert!(is_image_extension(std::path::Path::new("photo.jpeg")));
assert!(is_image_extension(std::path::Path::new("photo.gif")));
assert!(is_image_extension(std::path::Path::new("photo.webp")));
assert!(is_image_extension(std::path::Path::new("photo.bmp")));
assert!(is_image_extension(std::path::Path::new("PHOTO.PNG")));
assert!(!is_image_extension(std::path::Path::new("file.md")));
assert!(!is_image_extension(std::path::Path::new("file.txt")));
assert!(!is_image_extension(std::path::Path::new("file.pdf")));
assert!(!is_image_extension(std::path::Path::new("file.csv")));
assert!(!is_image_extension(std::path::Path::new("file")));
}
#[test]
fn photo_image_marker_detected_by_multimodal() {
let photo_content = "[IMAGE:/tmp/workspace/photo_1_2.jpg]";
let messages = vec![crate::providers::ChatMessage::user(
photo_content.to_string(),
)];
let count = crate::multimodal::count_image_markers(&messages);
assert_eq!(
count, 1,
"multimodal should detect exactly one image marker"
);
}
#[test]
fn photo_image_marker_with_caption() {
let local_path = std::path::Path::new("/tmp/workspace/photo_1_2.jpg");
let mut content = format!("[IMAGE:{}]", local_path.display());
let caption = "Look at this screenshot";
use std::fmt::Write;
let _ = write!(content, "\n\n{caption}");
assert_eq!(
content,
"[IMAGE:/tmp/workspace/photo_1_2.jpg]\n\nLook at this screenshot"
);
let messages = vec![crate::providers::ChatMessage::user(content)];
assert_eq!(crate::multimodal::count_image_markers(&messages), 1);
}
#[test]
fn e2e_attachment_saves_file_and_formats_content() {
let workspace = tempfile::tempdir().expect("create temp workspace");
let doc_filename = "report.pdf";
let doc_path = workspace.path().join(doc_filename);
std::fs::write(&doc_path, b"%PDF-1.4 fake").expect("write doc fixture");
assert!(doc_path.exists(), "document file must exist on disk");
let doc_content =
format_attachment_content(IncomingAttachmentKind::Document, doc_filename, &doc_path);
assert!(
doc_content.starts_with("[Document: report.pdf]"),
"document label format mismatch: {doc_content}"
);
let doc_msgs = vec![crate::providers::ChatMessage::user(doc_content)];
assert_eq!(
crate::multimodal::count_image_markers(&doc_msgs),
0,
"document content must not contain image markers"
);
let photo_filename = "photo_99_1.jpg";
let photo_path = workspace.path().join(photo_filename);
let fixture =
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/test_photo.jpg");
std::fs::copy(&fixture, &photo_path).expect("copy photo fixture");
assert!(photo_path.exists(), "photo file must exist on disk");
let photo_content =
format_attachment_content(IncomingAttachmentKind::Photo, photo_filename, &photo_path);
assert!(
photo_content.starts_with("[IMAGE:"),
"photo must use [IMAGE:] marker: {photo_content}"
);
assert!(
photo_content.ends_with(']'),
"photo marker must close with ]: {photo_content}"
);
let photo_msgs = vec![crate::providers::ChatMessage::user(photo_content.clone())];
assert_eq!(
crate::multimodal::count_image_markers(&photo_msgs),
1,
"multimodal must detect exactly one image marker in photo content"
);
let mut captioned = photo_content;
use std::fmt::Write;
let _ = write!(captioned, "\n\nCheck this out");
let cap_msgs = vec![crate::providers::ChatMessage::user(captioned.clone())];
assert_eq!(
crate::multimodal::count_image_markers(&cap_msgs),
1,
"caption must not break image marker detection"
);
assert!(
captioned.contains("Check this out"),
"caption text must be present in content"
);
let md_filename = "notes.md";
let md_path = workspace.path().join(md_filename);
std::fs::write(&md_path, b"# Hello\nSome markdown").expect("write md fixture");
let md_content =
format_attachment_content(IncomingAttachmentKind::Photo, md_filename, &md_path);
assert!(
!md_content.contains("[IMAGE:"),
"markdown must not get [IMAGE:] marker: {md_content}"
);
let md_msgs = vec![crate::providers::ChatMessage::user(md_content)];
assert_eq!(
crate::multimodal::count_image_markers(&md_msgs),
0,
"markdown file must not trigger image marker detection"
);
}
#[test]
fn groq_provider_rejects_photo_with_vision_error() {
use crate::providers::compatible::{AuthStyle, OpenAiCompatibleProvider};
use crate::providers::Provider;
let groq = OpenAiCompatibleProvider::new(
"Groq",
"https://api.groq.com/openai",
Some("fake_key"),
AuthStyle::Bearer,
);
assert!(
!groq.supports_vision(),
"Groq provider must not support vision"
);
let messages = vec![crate::providers::ChatMessage::user(
"[IMAGE:/tmp/photo.jpg]\n\nDescribe this image".to_string(),
)];
let marker_count = crate::multimodal::count_image_markers(&messages);
assert_eq!(marker_count, 1, "must detect image marker in photo content");
}
}