use super::{
APIConnector,
ClientBuilder,
Context,
EventHandlerFunc,
RawEventHandlerFunc,
UpdatesStream,
Webhook,
WebhookOptions,
};
use crate::{
api::{
types::{SetWebhook, UpdateType},
APIClient,
},
framework::Framework,
model::Update,
Result,
};
use futures::StreamExt;
use parking_lot::RwLock;
use std::sync::Arc;
use typemap_rev::TypeMap;
/// Entry point for receiving updates and dispatching them to handlers.
///
/// A `Client` owns the API connector, the registered event handlers, an
/// optional command framework and the settings that decide whether updates
/// are received through long polling or through a webhook.
#[derive(Clone)]
pub struct Client {
/// Connector used for the API calls made by the client (`set_my_commands`,
/// `set_webhook`); a clone of it is handed to the updates stream and to
/// every `Context` passed to a handler.
pub api_client: Arc<Box<APIConnector>>,
/// Shared type map behind a read/write lock; a clone of this handle is put
/// into every `Context` passed to a handler.
pub data: Arc<RwLock<TypeMap>>,
// Handlers spawned with a clone of each incoming update.
pub(super) event_handlers: Vec<EventHandlerFunc>,
// Handlers spawned with each incoming update converted through `Into`.
pub(super) raw_event_handlers: Vec<RawEventHandlerFunc>,
// When set, its commands are registered on start and `fire_commands` is
// invoked for every update.
pub(super) framework: Option<Arc<Framework>>,
// When set, `start` listens on a webhook instead of long polling.
pub(super) webhook_opts: Option<WebhookOptions>,
/// Update types forwarded to the updates stream (long polling) or to the
/// `set_webhook` call (webhook mode).
pub allowed_updates: Vec<UpdateType>,
}
impl Client {
pub fn new(token: impl ToString) -> Self {
Self {
api_client: Arc::new(Box::new(APIClient::new(None, token))),
event_handlers: Vec::new(),
raw_event_handlers: Vec::new(),
data: Arc::new(RwLock::new(TypeMap::custom())),
framework: None,
webhook_opts: None,
allowed_updates: Vec::new(),
}
}
pub fn with_framework(fr: Arc<Framework>, token: impl ToString) -> Self {
Self {
api_client: Arc::new(Box::new(APIClient::new(None, token))),
event_handlers: Vec::new(),
raw_event_handlers: Vec::new(),
data: Arc::new(RwLock::new(TypeMap::custom())),
webhook_opts: None,
framework: Some(fr),
allowed_updates: Vec::new(),
}
}
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub async fn start(&self) -> Result<()> {
if let Some(opts) = &self.webhook_opts {
self.start_with_webhook(opts).await
} else {
let mut stream = UpdatesStream::new(self.api_client.clone());
stream.set_allowed_updates(self.allowed_updates.clone());
self.start_with_stream(&mut stream).await
}
}
pub async fn start_with_stream(&self, stream: &mut UpdatesStream) -> Result<()> {
if let Some(fr) = self.framework.clone() {
self.api_client
.set_my_commands(fr.get_commands().into())
.await?;
}
log::info!("starting long polling to listen for updates from telegram api");
while let Some(poll) = stream.next().await {
match poll {
Ok(update) => {
self.fire_handlers(update);
},
Err(err) => return Err(err),
}
}
Ok(())
}
pub async fn start_with_webhook(&self, opts: &WebhookOptions) -> Result<()> {
if let Some(fr) = self.framework.clone() {
self.api_client
.set_my_commands(fr.get_commands().into())
.await?;
}
if let Some(webhook_url) = &opts.url {
self.api_client
.set_webhook(SetWebhook {
url: webhook_url.to_string(),
certificate: None,
max_connections: None,
allowed_updates: Some(self.allowed_updates.clone()),
drop_pending_updates: None,
ip_address: None, secret_token: opts.secret_token.clone(),
})
.await?;
}
log::info!("starting to listen on the webhook");
let mut receiver = Webhook::new(opts).start();
while let Some(u) = receiver.recv().await {
match u {
Ok(update) => {
self.fire_handlers(update);
},
Err(err) => return Err(err),
}
}
Ok(())
}
pub fn subscribe_handler_func(&mut self, handler: EventHandlerFunc) {
self.event_handlers.push(handler);
}
pub fn subscribe_raw_handler(&mut self, handler: RawEventHandlerFunc) {
self.raw_event_handlers.push(handler);
}
#[doc(hidden)]
pub fn fire_handlers(&self, update: Update) {
for h in self.raw_event_handlers.clone() {
let ctx = Context::new(self.api_client.clone(), self.data.clone());
let u = update.clone();
tokio::spawn(h(ctx, u.into()));
}
for h in self.event_handlers.clone() {
let ctx = Context::new(self.api_client.clone(), self.data.clone());
let u = update.clone();
tokio::spawn(h(ctx, u));
}
if self.framework.is_some() {
let ctx = Context::new(self.api_client.clone(), self.data.clone());
let fr = self.framework.clone();
fr.as_ref()
.expect("Framework needs to be set before trying to fire commands")
.fire_commands(ctx, update);
}
}
}
impl From<Box<APIConnector>> for Client {
    /// Wraps an already constructed API connector in a client that has no
    /// handlers, no framework, no webhook options and an empty
    /// `allowed_updates` list.
    fn from(api: Box<APIConnector>) -> Self {
        let api_client = Arc::new(api);
        let data = Arc::new(RwLock::new(TypeMap::custom()));

        Self {
            api_client,
            data,
            event_handlers: vec![],
            raw_event_handlers: vec![],
            framework: None,
            webhook_opts: None,
            allowed_updates: vec![],
        }
    }
}