use crate::objects;
use crate::error::{ErrorKind, TelegramError};
use crate::file::File;
use std::{str, time::{Duration, Instant}, collections::HashMap, sync::Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::timer::Interval;
use hyper::{Body as Body2, Client, Request, Uri, header::CONTENT_TYPE, client::{HttpConnector, ResponseFuture}, rt::Stream};
use hyper_tls::HttpsConnector;
use hyper_multipart::client::multipart;
use serde_json::{self, value::Value};
use futures::{stream, Future, future::IntoFuture, sync::mpsc::{self, UnboundedSender}};
use failure::{Error, Fail, ResultExt};
use hyper_multipart_rfc7578::client::multipart::Body;
/// Handle used to send requests to the Telegram Bot API.
///
/// Cloning copies the key and shares the underlying HTTP client through the `Arc`.
#[derive(Clone)]
pub struct RequestHandle {
    /// Bot key; interpolated into every request URL built by this handle.
    key: String,
    /// Shared hyper client (HTTPS connector) that actually sends the requests.
    pub inner: Arc<hyper::Client<HttpsConnector<HttpConnector>, Body2>>,
}
impl RequestHandle {
pub fn fetch_json(
&self,
func: &'static str,
msg: &str,
) -> impl Future<Item = String, Error = Error> {
debug!("Send JSON {}: {}", func, msg);
let request = self.build_json(func, String::from(msg)).unwrap();
_fetch(self.inner.request(request))
}
fn build_json(
&self,
func: &'static str,
msg: String,
) -> Result<Request<Body2>, Error> {
let url: Result<Uri, _> =
format!("https://api.telegram.org/bot{}/{}", self.key, func).parse();
let req = Request::post(url.context(ErrorKind::Uri)?)
.header(CONTENT_TYPE, "application/json")
.body(msg.into())
.context(ErrorKind::Hyper)?;
Ok(req)
}
pub fn fetch_formdata(
&self,
func: &'static str,
msg: &Value,
files: Vec<File>,
kind: &str,
) -> impl Future<Item = String, Error = Error> {
debug!("Send formdata {}: {}", func, msg.to_string());
let request = self.build_formdata(func, msg, files, kind).unwrap();
_fetch(self.inner.request(request))
}
fn build_formdata(
&self,
func: &'static str,
msg: &Value,
files: Vec<File>,
_kind: &str,
) -> Result<Request<Body2>,Error> {
let url: Result<Uri, _> =
format!("https://api.telegram.org/bot{}/{}", self.key, func).parse();
let mut req_builder = Request::post(url.context(ErrorKind::Uri)?);
let mut form = multipart::Form::default();
let msg = msg.as_object().ok_or(ErrorKind::JsonNotMap)?;
for (key, val) in msg.iter() {
let val = match val {
&Value::String(ref val) => format!("{}", val),
etc => format!("{}", etc),
};
form.add_text(key, val);
}
for file in files {
match file {
File::Memory { name, source } => {
form.add_reader_file(name.clone(), source, name);
}
File::Disk { path } => {
form.add_file(path.clone().file_name().unwrap().to_str().unwrap(), path).context(ErrorKind::NoFile)?;
},
_ => {}
}
}
let req = form.set_body_convert::<Body2, Body>(&mut req_builder).context(ErrorKind::Hyper)?;
Ok(req)
}
}
pub fn _fetch(fut_res: ResponseFuture) -> impl Future<Item = String, Error = Error> {
fut_res
.and_then(move |res| res.into_body().concat2())
.map_err(|e| Error::from(e.context(ErrorKind::Hyper)))
.and_then(move |response_chunks| {
let s = str::from_utf8(&response_chunks)?;
debug!("Got a result from telegram: {}", s);
let req = serde_json::from_str::<Value>(&s).context(ErrorKind::JsonParse)?;
let ok = req.get("ok")
.and_then(Value::as_bool)
.ok_or(ErrorKind::Json)?;
if ok {
if let Some(result) = req.get("result") {
return Ok(serde_json::to_string(result).context(ErrorKind::JsonSerialize)?);
}
}
let e = match req.get("description").and_then(Value::as_str) {
Some(err) => {
Error::from(TelegramError::new(err.into()).context(ErrorKind::Telegram))
}
None => Error::from(ErrorKind::Telegram),
};
Err(Error::from(e.context(ErrorKind::Telegram)))
})
}
/// A Telegram bot: the request handle, the polling configuration and the
/// channels that updates are dispatched to.
#[derive(Clone)]
pub struct Bot {
    /// Handle handed out together with every dispatched update.
    pub request: RequestHandle,
    /// Bot key as passed to `Bot::new`.
    key: String,
    /// Bot name set by `get_stream`; a command ending with it has the part
    /// after its last `@` removed before handler lookup.
    name: Option<String>,
    /// Polling period in milliseconds (fed to `Duration::from_millis`).
    update_interval: u64,
    /// Value passed to the `timeout` parameter of `get_updates`.
    timeout: u64,
    /// Command handlers keyed by the `/`-prefixed command.
    pub handlers: HashMap<String, UnboundedSender<(RequestHandle, objects::Message)>>,
    /// Receives commands for which no entry exists in `handlers`.
    pub unknown_handler: Option<UnboundedSender<(RequestHandle, objects::Message)>>,
    /// Receives callback queries.
    pub callback_handler: Option<UnboundedSender<(RequestHandle, objects::CallbackQuery)>>,
    /// Receives inline queries.
    pub inline_handler: Option<UnboundedSender<(RequestHandle, objects::InlineQuery)>>,
}
impl Bot {
/// Creates a bot for the given key with no handlers registered.
///
/// Defaults: `update_interval` 2000, `timeout` 100, no name.
///
/// # Panics
///
/// Panics if the HTTPS connector cannot be created.
pub fn new(key: &str) -> Bot {
    let connector = HttpsConnector::new(4).unwrap();
    let client = Client::builder()
        .keep_alive(true)
        .keep_alive_timeout(None)
        .build(connector);
    let request = RequestHandle {
        key: String::from(key),
        inner: Arc::new(client),
    };
    Bot {
        request,
        key: String::from(key),
        name: None,
        update_interval: 2000,
        timeout: 100,
        handlers: HashMap::new(),
        unknown_handler: None,
        callback_handler: None,
        inline_handler: None,
    }
}
pub fn update_interval(mut self, interval: u64) -> Bot {
self.update_interval = interval;
self
}
pub fn timeout(mut self, timeout: u64) -> Bot {
self.timeout = timeout;
self
}
pub fn new_cmd(
&mut self,
cmd: &str,
) -> impl Stream<Item = (RequestHandle, objects::Message), Error = Error> {
let (sender, receiver) = mpsc::unbounded();
let cmd = if cmd.starts_with("/") {
cmd.into()
} else {
format!("/{}", cmd)
};
self.handlers.insert(cmd.into(), sender);
receiver.map_err(|_| Error::from(ErrorKind::Channel))
}
pub fn unknown_cmd(&mut self) -> impl Stream<Item = (RequestHandle, objects::Message), Error = Error> {
let (sender, receiver) = mpsc::unbounded();
self.unknown_handler = Some(sender);
receiver.then(|x| x.map_err(|_| Error::from(ErrorKind::Channel)))
}
pub fn callback(&mut self) -> impl Stream<Item = (RequestHandle, objects::CallbackQuery), Error = Error> {
let (sender, receiver) = mpsc::unbounded();
self.callback_handler = Some(sender);
receiver.then(|x| x.map_err(|_| Error::from(ErrorKind::Channel)))
}
pub fn inline(&mut self) -> impl Stream<Item = (RequestHandle, objects::InlineQuery), Error = Error> {
let (sender, receiver) = mpsc::unbounded();
self.inline_handler = Some(sender);
receiver.then(|x| x.map_err(|_| Error::from(ErrorKind::Channel)))
}
/// Asks Telegram for the bot's own user via `get_me` and resolves to its
/// username prefixed with `@`, or `None` when the user has no username.
pub fn resolve_name(&self) -> impl Future<Item = Option<String>, Error = Error> {
    use crate::functions::FunctionGetMe;
    self.request
        .get_me()
        .send()
        .map(|reply| reply.1.username.map(|name| format!("@{}", name)))
}
pub fn process_updates(self, last_id: Arc<AtomicUsize>) -> impl Stream<Item = (RequestHandle, objects::Update), Error = Error> {
use crate::functions::FunctionGetUpdates;
self.request.get_updates()
.offset(last_id.load(Ordering::Relaxed) as i64)
.timeout(self.timeout as i64)
.send()
.into_stream()
.map(|(_, x)| {
stream::iter_result(
x.0
.into_iter()
.map(|x| Ok(x))
.collect::<Vec<Result<objects::Update, Error>>>(),
)
})
.flatten()
.and_then(move |x| {
if last_id.load(Ordering::Relaxed) < x.update_id as usize + 1 {
last_id.store(x.update_id as usize + 1, Ordering::Relaxed);
}
Ok(x)
})
.filter_map(move |mut val| {
debug!("Got an update from Telegram: {:?}", val);
if let Some(_) = val.callback_query {
if let Some(sender) = self.callback_handler.clone() {
sender
.unbounded_send((self.request.clone(), val.callback_query.unwrap()))
.unwrap_or_else(|e| error!("Error: {}", e));
return None;
}
}
if let Some(_) = val.inline_query {
if let Some(sender) = self.inline_handler.clone() {
sender
.unbounded_send((self.request.clone(), val.inline_query.unwrap()))
.unwrap_or_else(|e| error!("Error: {}", e));
return None;
}
}
let mut sndr: Option<UnboundedSender<(RequestHandle, objects::Message)>> = None;
if let Some(ref mut message) = val.message {
if let Some(text) = message.text.clone() {
let mut content = text.split_whitespace();
if let Some(mut cmd) = content.next() {
if cmd.starts_with("/") {
if let Some(name) = self.name.as_ref() {
if cmd.ends_with(name.as_str()) {
cmd = cmd.rsplitn(2, '@').skip(1).next().unwrap();
}
}
if let Some(sender) = self.handlers.get(cmd)
{
sndr = Some(sender.clone());
message.text = Some(content.collect::<Vec<&str>>().join(" "));
} else if let Some(ref sender) =
self.unknown_handler
{
sndr = Some(sender.clone());
}
}
}
}
}
if let Some(sender) = sndr {
sender
.unbounded_send((self.request.clone(), val.message.unwrap()))
.unwrap_or_else(|e| error!("Error: {}", e));
return None;
} else {
return Some((self.request.clone(), val));
}
})
}
/// Turns the bot into an endless stream of updates that no handler consumed.
///
/// `name` is stored as the bot name used when matching commands. On every
/// tick of a timer with a period of `update_interval` milliseconds a clone of
/// the bot runs `process_updates`; all polls share one update offset, which
/// starts at 0.
///
/// An `update_interval` of 0 is treated as 1 ms, because the timer does not
/// accept a zero period.
pub fn get_stream(
    mut self,
    name: Option<String>
) -> impl Stream<Item = (RequestHandle, objects::Update), Error = Error> {
    self.name = name;
    let last_id = Arc::new(AtomicUsize::new(0));
    // `Interval::new` panics on a zero duration, so clamp the period.
    let duration = Duration::from_millis(self.update_interval.max(1));
    Interval::new(Instant::now(), duration)
        .map_err(|x| Error::from(x.context(ErrorKind::IntervalTimer)))
        .map(move |_| self.clone().process_updates(last_id.clone()))
        .flatten()
}
/// Builds the future that runs the bot: it resolves the bot name first and
/// then drains the update stream of a clone of this bot, discarding every
/// update that no handler consumed.
pub fn into_future(&self) -> impl Future<Item = (), Error = Error> {
    let bot = self.clone();
    self.resolve_name()
        .and_then(move |name| bot.get_stream(name).for_each(|_| Ok(())))
}
/// Runs the bot together with `other` through `tokio::run`.
///
/// The bot future and `other` are joined. When either fails, the error and
/// its chain of causes are written to standard error.
pub fn run_with<I>(self, other: I)
where
    I: IntoFuture<Error = Error>,
    <I as IntoFuture>::Future: Send + 'static,
    <I as IntoFuture>::Item: Send
{
    tokio::run(
        self.into_future().join(other)
            .map(|_| ())
            .map_err(|e| {
                // The failure may come from name resolution, from polling or
                // from `other`, so report the error itself rather than
                // assuming a cause; `iter_causes` skips the top-level error.
                eprintln!("Error: the bot stopped: {}", e);
                for (i, cause) in e.iter_causes().enumerate() {
                    eprintln!(" => {}: {}", i, cause);
                }
            })
    );
}
pub fn run(self) {
self.run_with(Ok(()));
}
}