mod agent;
pub(crate) mod follow_up_question;
pub(crate) mod handler;
pub(crate) mod session_resolve;
pub use agent::TelegramAgent;
use std::collections::HashMap;
use teloxide::prelude::Bot;
use tokio::sync::{Mutex, oneshot};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
type PendingQuestion = (oneshot::Sender<String>, Vec<String>);
type PhotoEntry = (String, Option<String>);
type PhotoBufferKey = (i64, i64, String);
pub struct TelegramState {
bot: Mutex<Option<Bot>>,
owner_chat_id: Mutex<Option<i64>>,
bot_username: Mutex<Option<String>>,
session_chats: Mutex<HashMap<Uuid, i64>>,
chat_sessions: Mutex<HashMap<i64, Uuid>>,
pending_approvals: Mutex<HashMap<String, oneshot::Sender<(bool, bool)>>>,
pending_questions: Mutex<HashMap<String, PendingQuestion>>,
cancel_tokens: Mutex<HashMap<Uuid, CancellationToken>>,
photo_buffer: Mutex<HashMap<PhotoBufferKey, Vec<PhotoEntry>>>,
photo_debounce: Mutex<HashMap<PhotoBufferKey, CancellationToken>>,
}
impl Default for TelegramState {
fn default() -> Self {
Self::new()
}
}
impl TelegramState {
pub fn new() -> Self {
Self {
bot: Mutex::new(None),
owner_chat_id: Mutex::new(None),
bot_username: Mutex::new(None),
session_chats: Mutex::new(HashMap::new()),
chat_sessions: Mutex::new(HashMap::new()),
pending_approvals: Mutex::new(HashMap::new()),
pending_questions: Mutex::new(HashMap::new()),
cancel_tokens: Mutex::new(HashMap::new()),
photo_buffer: Mutex::new(HashMap::new()),
photo_debounce: Mutex::new(HashMap::new()),
}
}
pub async fn set_bot(&self, bot: Bot) {
*self.bot.lock().await = Some(bot);
}
pub async fn set_owner_chat_id(&self, chat_id: i64) {
*self.owner_chat_id.lock().await = Some(chat_id);
}
pub async fn bot(&self) -> Option<Bot> {
self.bot.lock().await.clone()
}
pub async fn owner_chat_id(&self) -> Option<i64> {
*self.owner_chat_id.lock().await
}
pub async fn set_bot_username(&self, username: String) {
*self.bot_username.lock().await = Some(username);
}
pub async fn bot_username(&self) -> Option<String> {
self.bot_username.lock().await.clone()
}
pub async fn is_connected(&self) -> bool {
self.bot.lock().await.is_some()
}
pub async fn register_session_chat(&self, session_id: Uuid, chat_id: i64) {
self.session_chats.lock().await.insert(session_id, chat_id);
self.chat_sessions.lock().await.insert(chat_id, session_id);
}
pub async fn session_chat(&self, session_id: Uuid) -> Option<i64> {
self.session_chats.lock().await.get(&session_id).copied()
}
pub async fn chat_session(&self, chat_id: i64) -> Option<Uuid> {
self.chat_sessions.lock().await.get(&chat_id).copied()
}
pub async fn register_pending_approval(&self, id: String, tx: oneshot::Sender<(bool, bool)>) {
self.pending_approvals.lock().await.insert(id, tx);
}
pub async fn resolve_pending_approval(&self, id: &str, approved: bool, always: bool) -> bool {
if let Some(tx) = self.pending_approvals.lock().await.remove(id) {
let _ = tx.send((approved, always));
true
} else {
false
}
}
pub async fn register_pending_question(
&self,
id: String,
tx: oneshot::Sender<String>,
options: Vec<String>,
) {
self.pending_questions
.lock()
.await
.insert(id, (tx, options));
}
pub async fn resolve_pending_question(&self, id: &str, idx: usize) -> Option<String> {
let entry = self.pending_questions.lock().await.remove(id);
let (tx, options) = entry?;
let answer = options.get(idx)?.clone();
let _ = tx.send(answer.clone());
Some(answer)
}
pub async fn store_cancel_token(&self, session_id: Uuid, token: CancellationToken) {
let mut tokens = self.cancel_tokens.lock().await;
if let Some(old) = tokens.remove(&session_id) {
tracing::warn!(
"Telegram: cancelling previous in-flight agent call for session {}",
session_id
);
old.cancel();
}
tokens.insert(session_id, token);
}
pub async fn cancel_session(&self, session_id: Uuid) -> bool {
if let Some(token) = self.cancel_tokens.lock().await.remove(&session_id) {
token.cancel();
true
} else {
false
}
}
pub async fn remove_cancel_token(&self, session_id: Uuid) {
let mut tokens = self.cancel_tokens.lock().await;
if let Some(token) = tokens.get(&session_id)
&& token.is_cancelled()
{
tokens.remove(&session_id);
}
}
pub async fn buffer_photo(
&self,
chat_id: i64,
user_id: i64,
media_group_id: &str,
img_marker: String,
caption: Option<String>,
) -> usize {
let key = (chat_id, user_id, media_group_id.to_string());
let mut buffer = self.photo_buffer.lock().await;
buffer
.entry(key.clone())
.or_default()
.push((img_marker, caption));
buffer.get(&key).map(|v| v.len()).unwrap_or(0)
}
pub async fn reset_photo_debounce(
&self,
chat_id: i64,
user_id: i64,
media_group_id: &str,
) -> CancellationToken {
let key = (chat_id, user_id, media_group_id.to_string());
let token = CancellationToken::new();
let mut debounce = self.photo_debounce.lock().await;
if let Some(old) = debounce.remove(&key) {
old.cancel();
}
debounce.insert(key, token.clone());
token
}
pub async fn wait_photo_debounce(&self, token: CancellationToken) -> bool {
tokio::select! {
_ = token.cancelled() => false,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => true,
}
}
pub async fn drain_photo_buffer(
&self,
chat_id: i64,
user_id: i64,
media_group_id: &str,
) -> Vec<(String, Option<String>)> {
let key = (chat_id, user_id, media_group_id.to_string());
let mut buffer = self.photo_buffer.lock().await;
buffer.remove(&key).unwrap_or_default()
}
pub async fn cleanup_photo_debounce(&self, chat_id: i64, user_id: i64, media_group_id: &str) {
let key = (chat_id, user_id, media_group_id.to_string());
self.photo_debounce.lock().await.remove(&key);
}
}