use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use log::{error, info, warn};
use crate::Attachment;
use crate::Message;
use crate::SendMessageParamsBuilder;
use crate::api::messages::EditMessageParams;
use crate::attachments::InlineKeyboard;
use crate::client::MaxClient;
use crate::error::Result;
use crate::types::{NewMessageBody, Update};
/// Handle given to update handlers: the shared API client together with the
/// update that is currently being processed.
#[derive(Clone)]
pub struct Context {
// Shared client; handlers reach it through `Context::bot`.
pub(crate) bot: Arc<MaxClient>,
/// The update this context was created for.
pub update: Update,
}
impl Context {
pub fn new(bot: Arc<MaxClient>, update: Update) -> Self {
Self { bot, update }
}
pub fn bot(&self) -> &MaxClient {
&self.bot
}
pub fn message(&self) -> Option<&Message> {
match &self.update {
Update::MessageCreated { message, .. } => Some(message),
Update::MessageEdited { message, .. } => Some(message),
_ => None,
}
}
pub fn chat_id(&self) -> Option<i64> {
match &self.update {
Update::MessageCreated { message, .. } => message.recipient.chat_id,
Update::MessageEdited { message, .. } => message.recipient.chat_id,
Update::MessageCallback { message, .. } => message.as_ref().and_then(|m| m.recipient.chat_id),
Update::BotStarted { chat_id, .. } => Some(*chat_id),
Update::BotAdded { chat_id, .. } => Some(*chat_id),
Update::BotRemoved { chat_id, .. } => Some(*chat_id),
Update::UserAdded { chat_id, .. } => Some(*chat_id),
Update::UserRemoved { chat_id, .. } => Some(*chat_id),
Update::ChatTitleChanged { chat_id, .. } => Some(*chat_id),
_ => None,
}
}
pub fn user_id(&self) -> Option<i64> {
match &self.update {
Update::MessageCreated { message, .. } => message.sender.as_ref().map(|u| u.user_id),
Update::MessageEdited { message, .. } => message.sender.as_ref().map(|u| u.user_id),
Update::MessageCallback { callback, .. } => Some(callback.user.user_id),
Update::BotStarted { user, .. } => Some(user.user_id),
Update::BotAdded { user, .. } => Some(user.user_id),
Update::BotRemoved { user, .. } => Some(user.user_id),
Update::UserAdded { user, .. } => Some(user.user_id),
Update::UserRemoved { user, .. } => Some(user.user_id),
Update::ChatTitleChanged { user, .. } => Some(user.user_id),
_ => None,
}
}
pub fn text(&self) -> Option<&str> {
match &self.update {
Update::MessageCreated { message, .. } => message.body.as_ref().and_then(|b| b.text.as_deref()),
Update::MessageEdited { message, .. } => message.body.as_ref().and_then(|b| b.text.as_deref()),
_ => None,
}
}
pub async fn reply_text(&self, text: &str) -> Result<Vec<String>> {
let chat_id = self.chat_id().ok_or_else(|| {
crate::error::Error::InvalidInput("Cannot reply: update has no chat_id".into())
})?;
let params = SendMessageParamsBuilder::new()
.text(text)
.chat_id(chat_id)
.build();
self.bot.send_message(params).await
}
pub async fn reply_markdown(&self, text: &str) -> Result<Vec<String>> {
let chat_id = self.chat_id().ok_or_else(|| {
crate::error::Error::InvalidInput("Cannot reply: update has no chat_id".into())
})?;
let params = SendMessageParamsBuilder::new()
.text(text)
.chat_id(chat_id)
.format_markdown()
.build();
self.bot.send_message(params).await
}
pub async fn answer_callback(
&self,
notification: Option<&str>,
new_message: Option<crate::api::messages::SendMessageParams>,
) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
let message_json = if let Some(params) = new_message {
let mut body = serde_json::Map::new();
if !params.text.is_empty() {
body.insert("text".into(), params.text.into());
}
if let Some(fmt) = params.format {
body.insert("format".into(), fmt.into());
}
if let Some(disable) = params.disable_link_preview {
body.insert("disable_link_preview".into(), disable.into());
}
if !params.attachments.is_empty() {
let attachments_json = crate::api::messages::prepare_attachments_json(&self.bot, ¶ms.attachments).await?;
body.insert("attachments".into(), attachments_json);
}
Some(serde_json::Value::Object(body))
} else {
None
};
self.bot
.answer_callback_query(&callback.callback_id, message_json, notification)
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback only works on MessageCallback updates".into(),
)),
}
}
pub async fn answer_callback_raw(
&self,
notification: Option<&str>,
message_json: Option<serde_json::Value>,
) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
self.bot
.answer_callback_query(&callback.callback_id, message_json, notification)
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback_raw only works on MessageCallback updates".into(),
)),
}
}
pub async fn edit_text(&self, new_text: &str) -> Result<()> {
let message = self
.message()
.ok_or_else(|| crate::error::Error::InvalidInput(
"edit_text can only be used on MessageCreated or MessageEdited updates".into()
))?;
let message_id = message.message_id().ok_or_else(|| {
crate::error::Error::InvalidInput("Message has no body or mid".into())
})?;
let params = EditMessageParams {
text: Some(new_text.to_string()),
..Default::default()
};
self.bot.edit_message(message_id, params).await
}
pub async fn edit_keyboard(&self, keyboard: InlineKeyboard) -> Result<()> {
let message = self
.message()
.ok_or_else(|| crate::error::Error::InvalidInput(
"edit_keyboard can only be used on MessageCreated or MessageEdited updates".into()
))?;
let message_id = message.message_id().ok_or_else(|| {
crate::error::Error::InvalidInput("Message has no body or mid".into())
})?;
let params = EditMessageParams {
inline_keyboard: Some(keyboard),
..Default::default()
};
self.bot.edit_message(message_id, params).await
}
pub async fn edit_message(&self, params: EditMessageParams) -> Result<()> {
let message = self
.message()
.ok_or_else(|| crate::error::Error::InvalidInput(
"edit_message can only be used on MessageCreated or MessageEdited updates".into()
))?;
let message_id = message.message_id().ok_or_else(|| {
crate::error::Error::InvalidInput("Message has no body or mid".into())
})?;
self.bot.edit_message(message_id, params).await
}
pub async fn answer_callback_notification(&self, notification: &str) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
self.bot
.answer_callback_query(&callback.callback_id, None, Some(notification))
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback_* only works on MessageCallback updates".into(),
)),
}
}
pub async fn answer_callback_replace(&self, message: NewMessageBody) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
let value = serde_json::to_value(message).map_err(crate::error::Error::Json)?;
self.bot
.answer_callback_query(&callback.callback_id, Some(value), None)
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback_* only works on MessageCallback updates".into(),
)),
}
}
pub async fn answer_callback_edit_text(&self, new_text: &str) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
let new_message = serde_json::json!({ "text": new_text });
self.bot
.answer_callback_query(&callback.callback_id, Some(new_message), None)
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback_* only works on MessageCallback updates".into(),
)),
}
}
pub async fn answer_callback_edit_keyboard(&self, keyboard: InlineKeyboard) -> Result<bool> {
match &self.update {
Update::MessageCallback { callback, .. } => {
let new_keyboard = Attachment::inline_keyboard(keyboard);
let attachments = vec![new_keyboard];
let new_message = serde_json::json!({ "attachments": attachments });
self.bot
.answer_callback_query(&callback.callback_id, Some(new_message), None)
.await
}
_ => Err(crate::error::Error::InvalidInput(
"answer_callback_* only works on MessageCallback updates".into(),
)),
}
}
}
/// Context handed to start handlers (`StartHandlerFn`); it carries only the
/// shared API client because no update is involved.
#[derive(Clone)]
pub struct StartContext {
// Shared client; exposed through `StartContext::bot`.
pub(crate) bot: Arc<MaxClient>,
}
impl StartContext {
pub fn new(bot: Arc<MaxClient>) -> Self {
Self { bot }
}
pub fn bot(&self) -> &MaxClient {
&self.bot
}
}
/// Context handed to scheduled tasks (`ScheduledTaskFn`); it carries only the
/// shared API client.
#[derive(Clone)]
pub struct ScheduledTaskContext {
// Shared client; exposed through `ScheduledTaskContext::bot`.
pub(crate) bot: Arc<MaxClient>,
}
impl ScheduledTaskContext {
pub fn new(bot: Arc<MaxClient>) -> Self {
Self { bot }
}
pub fn bot(&self) -> &MaxClient {
&self.bot
}
}
/// Type-erased update handler: a shareable (`Send + Sync`) callable that takes
/// a [`Context`] and returns a boxed, pinned, `Send` future resolving to `Result<()>`.
pub type HandlerFn = Arc<
dyn Fn(Context) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync,
>;
/// Type-erased start handler: same shape as [`HandlerFn`] but invoked with a
/// [`StartContext`].
pub type StartHandlerFn = Arc<
dyn Fn(StartContext) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync,
>;
/// Type-erased scheduled task: same shape as [`HandlerFn`] but invoked with a
/// [`ScheduledTaskContext`].
pub type ScheduledTaskFn = Arc<
dyn Fn(ScheduledTaskContext) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
/// Erases the concrete type of an update handler.
///
/// The returned [`HandlerFn`] calls `handler` and boxes/pins the future it
/// produces so handlers of different types can be stored side by side.
fn make_handler<H, F>(handler: H) -> HandlerFn
where
    H: Fn(Context) -> F + Send + Sync + 'static,
    F: Future<Output = Result<()>> + Send + 'static,
{
    let erased = move |ctx: Context| -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        Box::pin(handler(ctx))
    };
    Arc::new(erased)
}
/// Erases the concrete type of a start handler.
///
/// The returned [`StartHandlerFn`] calls `handler` and boxes/pins the future it
/// produces.
fn make_start_handler<H, F>(handler: H) -> StartHandlerFn
where
    H: Fn(StartContext) -> F + Send + Sync + 'static,
    F: Future<Output = Result<()>> + Send + 'static,
{
    let erased =
        move |ctx: StartContext| -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>> {
            Box::pin(handler(ctx))
        };
    Arc::new(erased)
}
/// Erases the concrete type of a scheduled task.
///
/// The returned [`ScheduledTaskFn`] calls `handler` and boxes/pins the future it
/// produces.
fn make_scheduled_task<H, F>(handler: H) -> ScheduledTaskFn
where
    H: Fn(ScheduledTaskContext) -> F + Send + Sync + 'static,
    F: Future<Output = Result<()>> + Send + 'static,
{
    let erased = move |ctx: ScheduledTaskContext| -> std::pin::Pin<
        Box<dyn Future<Output = Result<()>> + Send>,
    > { Box::pin(handler(ctx)) };
    Arc::new(erased)
}
/// Predicate deciding whether a handler should receive a given update.
#[derive(Clone)]
pub enum Filter {
/// Matches every update.
Any,
/// Matches `Update::MessageCreated`.
Message,
/// Matches `Update::MessageEdited`.
EditedMessage,
/// Matches `Update::MessageCallback`.
Callback,
/// Matches `Update::BotStarted`.
BotStarted,
/// Matches `Update::BotAdded`.
BotAdded,
/// Matches a `MessageCreated` update whose message text begins with the
/// given command (see `Filter::matches` for the exact rule).
Command(String),
/// Matches a `MessageCallback` update whose callback payload equals the given string.
CallbackPayload(String),
/// Matches whenever the supplied predicate returns `true` for the update.
Custom(Arc<dyn Fn(&Update) -> bool + Send + Sync>),
}
impl Filter {
    /// Returns `true` when `update` satisfies this filter.
    ///
    /// `Command` only considers `MessageCreated` updates that have a text body;
    /// `CallbackPayload` only considers `MessageCallback` updates.
    fn matches(&self, update: &Update) -> bool {
        match self {
            Filter::Any => true,
            Filter::Message => matches!(update, Update::MessageCreated { .. }),
            Filter::EditedMessage => matches!(update, Update::MessageEdited { .. }),
            Filter::Callback => matches!(update, Update::MessageCallback { .. }),
            Filter::BotStarted => matches!(update, Update::BotStarted { .. }),
            Filter::BotAdded => matches!(update, Update::BotAdded { .. }),
            Filter::Command(cmd) => match update {
                Update::MessageCreated { message, .. } => message
                    .body
                    .as_ref()
                    .and_then(|b| b.text.as_deref())
                    .map_or(false, |text| Self::text_is_command(text, cmd)),
                _ => false,
            },
            Filter::CallbackPayload(payload) => match update {
                Update::MessageCallback { callback, .. } => callback.payload == *payload,
                _ => false,
            },
            Filter::Custom(pred) => pred(update),
        }
    }

    /// Checks that `text` starts with `cmd` *as a whole command*.
    ///
    /// A bare prefix test would let `/help` also capture `/helpdesk`, and since
    /// dispatch stops at the first matching handler the wrong handler could run.
    /// The command must therefore be followed by the end of the text, by
    /// whitespace (arguments), or by `@`.
    ///
    /// Commands that are empty or already end in whitespace carry their own
    /// boundary and keep plain prefix semantics.
    fn text_is_command(text: &str, cmd: &str) -> bool {
        let rest = match text.strip_prefix(cmd) {
            Some(rest) => rest,
            None => return false,
        };
        let cmd_has_boundary = cmd.chars().next_back().map_or(true, char::is_whitespace);
        // NOTE(review): `@` is accepted on the assumption that clients may send
        // `/cmd@botname`; it keeps such input matching as before — confirm.
        cmd_has_boundary
            || rest
                .chars()
                .next()
                .map_or(true, |c| c.is_whitespace() || c == '@')
    }
}
/// Routes incoming updates to registered handlers and drives the long-polling loop.
pub struct Dispatcher {
// Shared API client, cloned into every context handed to handlers and tasks.
bot: Arc<MaxClient>,
// Handlers in registration order; `dispatch` runs only the first whose filter matches.
handlers: Vec<(Filter, HandlerFn)>,
// Optional callback receiving handler/task errors; when absent, errors are logged.
error_handler: Option<Arc<dyn Fn(crate::error::Error) + Send + Sync>>,
// Sent as `timeout` with each `get_updates` call (presumably seconds — the setter's parameter is named `secs`).
poll_timeout: u32,
// Sent as `limit` with each `get_updates` call.
poll_limit: u32,
// Handlers run once, in order, by `start_polling` before the polling loop begins.
start_handlers: Vec<StartHandlerFn>,
// Periodic tasks as (interval, task) pairs, spawned by `start_polling`.
scheduled_tasks: Vec<(Duration, ScheduledTaskFn)>,
}
impl Dispatcher {
pub fn new(bot: MaxClient) -> Self {
Self {
bot: Arc::new(bot),
handlers: Vec::new(),
error_handler: None,
poll_timeout: 30,
poll_limit: 100,
start_handlers: Vec::new(),
scheduled_tasks: Vec::new(),
}
}
pub fn on_error<F>(mut self, f: F) -> Self
where
F: Fn(crate::error::Error) + Send + Sync + 'static,
{
self.error_handler = Some(Arc::new(f));
self
}
pub fn poll_timeout(mut self, secs: u32) -> Self {
self.poll_timeout = secs;
self
}
pub fn poll_limit(mut self, limit: u32) -> Self {
self.poll_limit = limit;
self
}
pub fn on<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((Filter::Any, make_handler(handler)));
self
}
pub fn on_message<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((Filter::Message, make_handler(handler)));
self
}
pub fn on_edited_message<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((Filter::EditedMessage, make_handler(handler)));
self
}
pub fn on_callback<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((Filter::Callback, make_handler(handler)));
self
}
pub fn on_bot_started<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((Filter::BotStarted, make_handler(handler)));
self
}
pub fn on_command<H, F>(&mut self, command: impl Into<String>, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers
.push((Filter::Command(command.into()), make_handler(handler)));
self
}
pub fn on_callback_payload<H, F>(&mut self, payload: impl Into<String>, handler: H) -> &mut Self
where
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers.push((
Filter::CallbackPayload(payload.into()),
make_handler(handler),
));
self
}
pub fn on_filter<P, H, F>(&mut self, predicate: P, handler: H) -> &mut Self
where
P: Fn(&Update) -> bool + Send + Sync + 'static,
H: Fn(Context) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.handlers
.push((Filter::Custom(Arc::new(predicate)), make_handler(handler)));
self
}
pub fn on_start<H, F>(&mut self, handler: H) -> &mut Self
where
H: Fn(StartContext) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.start_handlers.push(make_start_handler(handler));
self
}
pub fn task<H, F>(&mut self, interval: Duration, handler: H) -> &mut Self
where
H: Fn(ScheduledTaskContext) -> F + Send + Sync + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
self.scheduled_tasks
.push((interval, make_scheduled_task(handler)));
self
}
pub async fn dispatch(&self, update: Update) {
for (filter, handler) in &self.handlers {
if filter.matches(&update) {
let ctx = Context::new(self.bot.clone(), update);
if let Err(e) = handler(ctx).await {
self.handle_error(e);
}
break;
}
}
}
fn handle_error(&self, error: crate::error::Error) {
if let Some(eh) = &self.error_handler {
eh(error);
} else {
error!("Handler error: {}", error);
}
}
async fn run_start_handlers(&self) {
for handler in &self.start_handlers {
let ctx = StartContext::new(self.bot.clone());
if let Err(e) = handler(ctx).await {
self.handle_error(e);
}
}
}
fn spawn_scheduled_tasks(&self) {
for (interval, handler) in &self.scheduled_tasks {
let interval = *interval;
let handler = handler.clone();
let bot = self.bot.clone();
let error_handler = self.error_handler.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
let ctx = ScheduledTaskContext::new(bot.clone());
if let Err(e) = handler(ctx).await {
if let Some(eh) = &error_handler {
eh(e);
} else {
error!("Scheduled task error: {}", e);
}
}
}
});
}
}
pub async fn start_polling(self) {
let me = match self.bot.get_me().await {
Ok(me) => me,
Err(e) => {
error!("Failed to fetch bot info: {}", e);
return;
}
};
info!(
"Bot @{} started (long polling)",
me.user.username.as_deref().unwrap_or("unknown")
);
self.run_start_handlers().await;
self.spawn_scheduled_tasks();
let mut marker = None;
loop {
let params = crate::api::updates::GetUpdatesParams {
marker,
limit: Some(self.poll_limit),
timeout: Some(self.poll_timeout),
types: vec![], };
match self.bot.get_updates(params).await {
Ok((updates, new_marker)) => {
marker = new_marker;
for update in updates {
self.dispatch(update).await;
}
}
Err(e) => {
warn!("Polling error: {} — retrying in 5s", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
pub async fn dispatch_raw(&self, raw: serde_json::Value) {
match serde_json::from_value::<Update>(raw) {
Ok(update) => self.dispatch(update).await,
Err(e) => error!("Failed to parse update JSON: {}", e),
}
}
}