#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate log;
pub use slack_api::sync as api;
pub mod error;
pub use crate::error::Error;
pub use crate::api::{Channel, Group, Im, Message, Team, User};
mod events;
pub use crate::events::Event;
use crate::events::{MessageError, MessageSent};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
/// Callbacks that `RtmClient::run` invokes while it drives the websocket
/// connection.
///
/// Every callback receives the running `RtmClient`, so an implementation can
/// reach the `Sender` (via `RtmClient::sender`) to queue outgoing messages.
pub trait EventHandler {
/// Called for each text frame read from the websocket that deserializes
/// into an `Event`. Frames that fail to deserialize are logged by `run`
/// and never reach this callback.
fn on_event(&mut self, cli: &RtmClient, event: Event);
/// Called by `run` right before it returns because a close request was
/// dequeued or the outgoing channel was found disconnected. Other early
/// error returns from `run` (e.g. a failed websocket write) do not invoke
/// this callback.
fn on_close(&mut self, cli: &RtmClient);
/// Called once by `run` after the websocket is connected and its socket
/// timeouts are configured, before the first message is processed.
fn on_connect(&mut self, cli: &RtmClient);
}
/// Internal command passed from a `Sender` to the `RtmClient::run` loop over
/// the mpsc channel.
#[derive(Debug)]
enum WsMessage {
/// Ask the run loop to close the websocket and return.
Close,
/// Raw text that the run loop writes to the websocket as a text frame.
Text(String),
}
/// A Slack RTM client: holds the `rtm.start` response plus both ends of the
/// channel used to feed outgoing messages into the websocket loop.
pub struct RtmClient {
// Response returned by `api::rtm::start` during `login`; `run` reads the
// websocket URL from it.
start_response: api::rtm::StartResponse,
// Producer handle for the outgoing channel, exposed through `sender()`.
sender: Sender,
// Consumer end of the outgoing channel, drained by `run`.
rx: mpsc::Receiver<WsMessage>,
}
/// Cloneable handle for queuing outgoing messages to the `RtmClient::run`
/// loop.
///
/// Clones share the same channel and the same message-id counter, because
/// both fields are reference-counted / cloneable handles.
#[derive(Clone)]
pub struct Sender {
// Producer end of the channel drained by `RtmClient::run`.
tx: mpsc::Sender<WsMessage>,
// Counter from which `get_msg_uid` draws ids; shared between clones.
msg_num: Arc<AtomicUsize>,
}
impl Sender {
pub fn get_msg_uid(&self) -> usize {
self.msg_num.fetch_add(1, Ordering::SeqCst)
}
pub fn send(&self, raw: &str) -> Result<(), Error> {
self.tx
.send(WsMessage::Text(raw.to_string()))
.map_err(|err| Error::Internal(format!("{}", err)))?;
Ok(())
}
pub fn send_message(&self, channel_id: &str, msg: &str) -> Result<usize, Error> {
let n = self.get_msg_uid();
let msg_json = serde_json::to_string(&msg)?;
let mstr = format!(
r#"{{"id": {},"type": "message", "channel": "{}","text": "{}"}}"#,
n,
channel_id,
&msg_json[1..msg_json.len() - 1]
);
self.send(&mstr[..])
.map_err(|err| Error::Internal(format!("{}", err)))?;
Ok(n)
}
pub fn send_typing(&self, channel_id: &str) -> Result<usize, Error> {
let n = self.get_msg_uid();
let mstr = format!(
r#"{{"id": {}, "type": "typing", "channel": "{}"}}"#,
n, channel_id
);
self.send(&mstr)
.map_err(|err| Error::Internal(format!("{:?}", err)))?;
Ok(n)
}
pub fn subscribe_presence(&self, user_list: &[&str]) -> Result<usize, Error> {
let n = self.get_msg_uid();
let mstr = format!(r#"{{"type": "presence_sub", "ids": {:?}}}"#, user_list);
self.send(&mstr)
.map_err(|err| Error::Internal(format!("{:?}", err)))?;
Ok(n)
}
pub fn shutdown(&self) -> Result<(), Error> {
self.tx
.send(WsMessage::Close)
.map_err(|_| Error::Internal("Error sending shutdown message".into()))
}
}
impl RtmClient {
pub fn login(token: &str) -> Result<RtmClient, Error> {
let client = api::default_client()?;
let start_response = api::rtm::start(&client, token, &Default::default())?;
let (tx, rx) = mpsc::channel::<WsMessage>();
let sender = Sender {
tx,
msg_num: Arc::new(AtomicUsize::new(0)),
};
Ok(RtmClient {
start_response,
sender,
rx,
})
}
pub fn run<T: EventHandler>(&self, handler: &mut T) -> Result<(), Error> {
let start_url = self
.start_response
.url
.as_ref()
.ok_or_else(|| Error::Api("Slack did not provide a URL".into()))?;
let wss_url = url::Url::parse_with_params(&start_url, &[("batch_presence_aware", "1")])?;
let (mut websocket, _resp) = tungstenite::client::connect(wss_url)?;
{
let socket = match *websocket.get_mut() {
tungstenite::stream::Stream::Plain(ref s) => s,
tungstenite::stream::Stream::Tls(ref mut t) => t.get_mut(),
};
socket.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
socket.set_write_timeout(Some(std::time::Duration::from_secs(25)))?;
}
handler.on_connect(self);
let mut prev_ = ::std::time::Instant::now();
loop {
loop {
match self.rx.try_recv() {
Ok(msg) => match msg {
WsMessage::Text(text) => {
websocket.write_message(tungstenite::Message::Text(text))?
}
WsMessage::Close => {
handler.on_close(self);
return websocket.close(None).map_err(|e| e.into());
}
},
Err(mpsc::TryRecvError::Disconnected) => {
handler.on_close(self);
return Err(Error::Internal("rx disconnected".into()));
}
Err(mpsc::TryRecvError::Empty) => break,
}
}
let message = match websocket.read_message() {
Err(e) => {
debug!("{:?}", e);
websocket.write_message(tungstenite::Message::Ping(vec![]))?;
continue;
}
Ok(m) => m,
};
let received = ::std::time::Instant::now();
{
let print_recieved = |var: &str| {
debug!(
"RTM WS {} recieved {:?} since last msg",
var,
received - prev_
);
};
match message {
tungstenite::Message::Text(text) => match Event::from_json(&text[..]) {
Ok(event) => handler.on_event(self, event),
Err(err) => {
info!(
"Unable to deserialize slack message, error: {}: json: {}",
err, text
);
}
},
tungstenite::Message::Binary(_) => print_recieved("Binary"),
tungstenite::Message::Ping(_) => print_recieved("Ping"),
tungstenite::Message::Pong(_) => print_recieved("Pong"),
tungstenite::Message::Close(_) => print_recieved("Close"),
}
}
prev_ = received;
}
}
pub fn login_and_run<T: EventHandler>(token: &str, handler: &mut T) -> Result<(), Error> {
let client = RtmClient::login(token)?;
client.run(handler)
}
pub fn sender(&self) -> &Sender {
&self.sender
}
pub fn start_response(&self) -> &api::rtm::StartResponse {
&self.start_response
}
}
impl Event {
    /// Deserializes a raw RTM JSON payload into an `Event`.
    ///
    /// The payload is first parsed as an `Event`. If that fails, it is
    /// retried as a `MessageSent` and then as a `MessageError`, each wrapped
    /// in the matching `Event` variant.
    ///
    /// # Errors
    ///
    /// When all three attempts fail, the error from the first (`Event`)
    /// attempt is returned.
    fn from_json(s: &str) -> Result<Event, Error> {
        let first_failure = match serde_json::from_str::<Event>(s) {
            Ok(event) => return Ok(event),
            Err(e) => e,
        };

        if let Ok(sent) = serde_json::from_str::<MessageSent>(s) {
            return Ok(Event::MessageSent(sent));
        }
        if let Ok(failed) = serde_json::from_str::<MessageError>(s) {
            return Ok(Event::MessageError(failed));
        }

        Err(first_failure.into())
    }
}