use std::any::{Any, TypeId};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs;
use std::future::Future;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, RwLock as StdRwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc, oneshot};
use tokio::task::JoinSet;
use tokio::time::sleep;
#[cfg(feature = "axum")]
pub mod axum;
use crate::api::{
AdvancedService, BotService, ChatsService, FilesService, MessagesService, PaymentsService,
StickersService, UpdatesService,
};
use crate::client::{BootstrapOutcome, BootstrapPlan, BootstrapRetryPolicy, WebAppQueryPayload};
use crate::types::bot::User;
use crate::types::chat::{ChatAdministratorCapability, ChatMember, GetChatMemberRequest};
use crate::types::common::{ChatId, UserId};
use crate::types::message::{Chat, Message, MessageKind, SendMessageRequest, WriteAccessAllowed};
use crate::types::telegram::{
CallbackCodec, CallbackPayload, CallbackPayloadCodec, CompactCallbackCodec,
CompactCallbackPayload, WebAppData,
};
use crate::types::update::{GetUpdatesRequest, Update, UpdateKind};
use crate::types::webhook::{DeleteWebhookRequest, SetWebhookRequest};
use crate::{Client, Error, ErrorClass, Result};
type HandlerFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
type GuardFuture<'a> = Pin<Box<dyn Future<Output = HandlerResult> + Send + 'a>>;
type SessionFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
type SourceFuture<'a> = Pin<Box<dyn Future<Output = Result<Vec<Update>>> + Send + 'a>>;
pub type HandlerFn = Arc<dyn Fn(BotContext, Update) -> HandlerFuture + Send + Sync + 'static>;
pub type MiddlewareFn =
Arc<dyn Fn(BotContext, Update, HandlerFn) -> HandlerFuture + Send + Sync + 'static>;
type GuardFn =
Arc<dyn for<'a> Fn(&'a BotContext, &'a Update) -> GuardFuture<'a> + Send + Sync + 'static>;
pub type SourceErrorHook = Arc<dyn Fn(&Error) + Send + Sync + 'static>;
pub type AsyncSourceErrorHook = Arc<
dyn Fn(&Error) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync + 'static,
>;
pub type HandlerErrorHook = Arc<dyn Fn(i64, &Error) + Send + Sync + 'static>;
pub type AsyncHandlerErrorHook = Arc<
dyn Fn(i64, &Error) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
pub type EngineEventHook = Arc<dyn Fn(&EngineEvent) + Send + Sync + 'static>;
pub type AsyncEngineEventHook = Arc<
dyn Fn(&EngineEvent) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
pub type EngineMetricHook = Arc<dyn Fn(&EngineMetric) + Send + Sync + 'static>;
pub type AsyncEngineMetricHook = Arc<
dyn Fn(&EngineMetric) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum EngineEvent {
PollStarted,
PollCompleted {
update_count: usize,
},
PollFailed {
classification: ErrorClass,
retryable: bool,
status: Option<u16>,
error_code: Option<i64>,
request_id: Option<String>,
message: String,
},
DispatchStarted {
update_id: i64,
},
UnknownKindsDetected {
update_id: i64,
update_kind: UpdateKind,
message_kind: Option<MessageKind>,
},
DispatchCompleted {
outcome: DispatchOutcome,
},
DispatchFailed {
update_id: i64,
classification: ErrorClass,
},
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DispatchMetricOutcome {
Handled,
Ignored,
Failed,
}
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum EngineMetric {
PollLatency {
update_count: usize,
latency: Duration,
},
DispatchLatency {
update_id: i64,
outcome: DispatchMetricOutcome,
latency: Duration,
},
SourceError {
classification: ErrorClass,
retryable: bool,
streak: usize,
},
SourceBackoff {
streak: usize,
delay: Duration,
},
}
fn invalid_request(reason: impl Into<String>) -> Error {
Error::InvalidRequest {
reason: reason.into(),
}
}
async fn run_blocking_io<T, F>(task: F) -> Result<T>
where
T: Send + 'static,
F: FnOnce() -> Result<T> + Send + 'static,
{
tokio::task::spawn_blocking(task)
.await
.map_err(|error| invalid_request(format!("blocking I/O task failed: {error}")))?
}
mod app;
mod context;
mod context_app;
mod handler_error;
mod outbox;
mod request_state;
mod routing;
mod runtime;
mod session;
pub mod testing;
pub use app::*;
pub use context::*;
pub use context_app::*;
pub use handler_error::*;
pub use outbox::*;
pub use request_state::*;
pub use routing::*;
pub use runtime::*;
pub use session::*;
fn write_file_atomic(path: &Path, contents: &[u8], subject: &str) -> Result<()> {
let parent = path.parent().unwrap_or_else(|| Path::new("."));
fs::create_dir_all(parent).map_err(|source| {
invalid_request(format!(
"failed to create directory for {subject} `{}`: {source}",
parent.display()
))
})?;
let file_name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("snapshot");
let nonce = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0_u128, |duration| duration.as_nanos());
let process_id = std::process::id();
for attempt in 0..16 {
let temp_path = parent.join(format!(".{file_name}.tmp-{process_id}-{nonce}-{attempt}"));
match fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&temp_path)
{
Ok(mut file) => {
let write_result = (|| -> Result<()> {
file.write_all(contents).map_err(|source| {
invalid_request(format!(
"failed to write temp file for {subject} `{}`: {source}",
temp_path.display()
))
})?;
file.sync_all().map_err(|source| {
invalid_request(format!(
"failed to sync temp file for {subject} `{}`: {source}",
temp_path.display()
))
})?;
Ok(())
})();
if let Err(error) = write_result {
let _ = fs::remove_file(&temp_path);
return Err(error);
}
fs::rename(&temp_path, path).map_err(|source| {
let _ = fs::remove_file(&temp_path);
invalid_request(format!(
"failed to replace {subject} `{}` atomically: {source}",
path.display()
))
})?;
return Ok(());
}
Err(source) if source.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(source) => {
return Err(invalid_request(format!(
"failed to create temp file for {subject} `{}`: {source}",
temp_path.display()
)));
}
}
}
Err(invalid_request(format!(
"failed to allocate unique temp file for {subject} `{}`",
path.display()
)))
}
fn exponential_backoff(base: Duration, max: Duration, attempt: usize) -> Duration {
let exponent = attempt.saturating_sub(1).min(16);
let factor = 2u32.saturating_pow(exponent as u32);
let delay = base.saturating_mul(factor);
delay.min(max)
}
fn jitter_duration(delay: Duration, jitter_ratio: f32) -> Duration {
if delay.is_zero() || jitter_ratio <= 0.0 {
return delay;
}
let ratio = f64::from(jitter_ratio.clamp(0.0, 1.0));
let now_nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0_u128, |duration| duration.as_nanos());
let unit = (now_nanos % 10_000) as f64 / 10_000.0;
let multiplier = (1.0 - ratio) + (2.0 * ratio * unit);
Duration::from_secs_f64(delay.as_secs_f64() * multiplier)
}