#![deny(missing_docs)]
#![deny(clippy::all, clippy::pedantic, clippy::nursery)]
#![allow(clippy::use_self)]
#![allow(clippy::future_not_send)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::option_if_let_else)]
#![cfg_attr(test, deny(warnings))]
#![doc(html_root_url = "https://docs.rs/webex/latest/webex/")]
extern crate lazy_static;
pub mod adaptive_card;
#[allow(missing_docs)]
pub mod error;
pub mod types;
pub use types::*;
pub mod auth;
use error::Error;
use crate::adaptive_card::AdaptiveCard;
use futures::{future::try_join_all, try_join};
use futures_util::{SinkExt, StreamExt};
use log::{debug, error, trace, warn};
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{self, Hasher},
sync::Mutex,
time::Duration,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async,
tungstenite::{Error as TErr, Message as TMessage},
MaybeTlsStream, WebSocketStream,
};
const REST_HOST_PREFIX: &str = "https://api.ciscospark.com/v1";
const U2C_HOST_PREFIX: &str = "https://u2c.wbx2.com/u2c/api/v1";
const DEFAULT_REGISTRATION_HOST_PREFIX: &str = "https://wdm-a.wbx2.com/wdm/api/v1";
const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
const DEFAULT_DEVICE_NAME: &str = "rust-client";
const DEVICE_SYSTEM_NAME: &str = "rust-spark-client";
pub type WStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
#[derive(Clone)]
#[must_use]
pub struct Webex {
id: u64,
client: RestClient,
token: String,
pub device: DeviceData,
}
pub struct WebexEventStream {
ws_stream: WStream,
timeout: Duration,
pub is_open: bool,
}
impl WebexEventStream {
pub async fn next(&mut self) -> Result<Event, Error> {
loop {
let next = self.ws_stream.next();
match tokio::time::timeout(self.timeout, next).await {
Err(_) => {
self.is_open = false;
return Err(format!("no activity for at least {:?}", self.timeout).into());
}
Ok(next_result) => match next_result {
None => continue,
Some(msg) => match msg {
Ok(msg) => {
if let Some(h_msg) = self.handle_message(msg)? {
return Ok(h_msg);
}
}
Err(TErr::Protocol(_) | TErr::Io(_)) => {
self.is_open = false;
return Err(msg.unwrap_err().to_string().into());
}
Err(e) => {
return Err(Error::Tungstenite(e, "Error getting next_result".into()))
}
},
},
}
}
}
fn handle_message(&mut self, msg: TMessage) -> Result<Option<Event>, Error> {
match msg {
TMessage::Binary(bytes) => {
let json = std::str::from_utf8(&bytes)?;
match serde_json::from_str(json) {
Ok(ev) => Ok(Some(ev)),
Err(e) => {
warn!("Couldn't deserialize: {:?}. Original JSON:\n{}", e, &json);
Err(e.into())
}
}
}
TMessage::Text(t) => {
debug!("text: {}", t);
Ok(None)
}
TMessage::Ping(_) => {
trace!("Ping!");
Ok(None)
}
TMessage::Close(t) => {
debug!("close: {:?}", t);
self.is_open = false;
Err(Error::Closed("Web Socket Closed".to_string()))
}
TMessage::Pong(_) => {
debug!("Pong!");
Ok(None)
}
TMessage::Frame(_) => {
debug!("Frame");
Ok(None)
}
}
}
pub(crate) async fn auth(ws_stream: &mut WStream, token: &str) -> Result<(), Error> {
let auth = types::Authorization::new(token);
debug!("Authenticating to stream");
match ws_stream
.send(TMessage::Text(serde_json::to_string(&auth).unwrap()))
.await
{
Ok(()) => {
match ws_stream.next().await {
Some(msg) => match msg {
Ok(msg) => match msg {
TMessage::Ping(_) | TMessage::Pong(_) => {
debug!("Authentication succeeded");
Ok(())
}
_ => Err(format!("Received {msg:?} in reply to auth message").into()),
},
Err(e) => Err(format!("Received error from websocket: {e}").into()),
},
None => Err("Websocket closed".to_string().into()),
}
}
Err(e) => Err(Error::Tungstenite(
e,
"failed to send authentication".to_string(),
)),
}
}
}
enum AuthorizationType<'a> {
None,
Bearer(&'a str),
Basic {
username: &'a str,
password: &'a str,
},
}
enum Body<T: Serialize> {
Json(T),
UrlEncoded(T),
}
const BODY_NONE: Option<Body<()>> = None;
#[derive(Clone)]
struct RestClient {
host_prefix: HashMap<String, String>,
web_client: reqwest::Client,
}
impl RestClient {
pub fn new() -> Self {
Self {
host_prefix: HashMap::new(),
web_client: reqwest::Client::new(),
}
}
async fn api_get<'a, T: DeserializeOwned>(
&self,
rest_method: &str,
params: Option<impl Serialize>,
auth: AuthorizationType<'a>,
) -> Result<T, Error> {
self.rest_api(reqwest::Method::GET, rest_method, auth, params, BODY_NONE)
.await
}
async fn api_delete<'a>(
&self,
rest_method: &str,
params: Option<impl Serialize>,
auth: AuthorizationType<'a>,
) -> Result<(), Error> {
self.rest_api(
reqwest::Method::DELETE,
rest_method,
auth,
params,
BODY_NONE,
)
.await
}
async fn api_post<'a, T: DeserializeOwned>(
&self,
rest_method: &str,
body: impl Serialize,
params: Option<impl Serialize>,
auth: AuthorizationType<'a>,
) -> Result<T, Error>
where {
self.rest_api(
reqwest::Method::POST,
rest_method,
auth,
params,
Some(Body::Json(body)),
)
.await
}
async fn api_post_form_urlencoded<'a, T: DeserializeOwned>(
&self,
rest_method: &str,
body: impl Serialize,
params: Option<impl Serialize>,
auth: AuthorizationType<'a>,
) -> Result<T, Error> {
self.rest_api(
reqwest::Method::POST,
rest_method,
auth,
params,
Some(Body::UrlEncoded(body)),
)
.await
}
async fn api_put<'a, T: DeserializeOwned>(
&self,
rest_method: &str,
body: impl Serialize,
params: Option<impl Serialize>,
auth: AuthorizationType<'a>,
) -> Result<T, Error> {
self.rest_api(
reqwest::Method::PUT,
rest_method,
auth,
params,
Some(Body::Json(body)),
)
.await
}
async fn rest_api<T: DeserializeOwned>(
&self,
http_method: reqwest::Method,
url: &str,
auth: AuthorizationType<'_>,
params: Option<impl Serialize>,
body: Option<Body<impl Serialize>>,
) -> Result<T, Error> {
let url_trimmed = url.split('?').next().unwrap_or(url);
let prefix = self
.host_prefix
.get(url_trimmed)
.map_or(REST_HOST_PREFIX, String::as_str);
let url = format!("{prefix}/{url}");
let mut request_builder = self.web_client.request(http_method, url);
if let Some(params) = params {
request_builder = request_builder.query(¶ms);
}
match body {
Some(Body::Json(body)) => {
request_builder = request_builder.json(&body);
}
Some(Body::UrlEncoded(body)) => {
request_builder = request_builder.form(&body);
}
None => {}
}
match auth {
AuthorizationType::None => {}
AuthorizationType::Bearer(token) => {
request_builder = request_builder.bearer_auth(token);
}
AuthorizationType::Basic { username, password } => {
request_builder = request_builder.basic_auth(username, Some(password));
}
}
let res = request_builder.send().await?;
Ok(res.json().await?)
}
}
impl Webex {
pub async fn new(token: &str) -> Self {
Self::new_with_device_name(DEFAULT_DEVICE_NAME, token).await
}
pub async fn new_with_device_name(device_name: &str, token: &str) -> Self {
let mut client: RestClient = RestClient {
host_prefix: HashMap::new(),
web_client: reqwest::Client::new(),
};
let mut hasher = DefaultHasher::new();
hash::Hash::hash_slice(token.as_bytes(), &mut hasher);
let id = hasher.finish();
client
.host_prefix
.insert("limited/catalog".to_string(), U2C_HOST_PREFIX.to_string());
let mut webex = Self {
id,
client,
token: token.to_string(),
device: DeviceData {
device_name: Some(DEFAULT_DEVICE_NAME.to_string()),
device_type: Some("DESKTOP".to_string()),
localized_model: Some("rust".to_string()),
model: Some(format!("rust-v{CRATE_VERSION}")),
name: Some(device_name.to_owned()),
system_name: Some(DEVICE_SYSTEM_NAME.to_string()),
system_version: Some(CRATE_VERSION.to_string()),
..DeviceData::default()
},
};
let devices_url = match webex.get_mercury_url().await {
Ok(url) => {
trace!("Fetched mercury url {}", url);
url
}
Err(e) => {
debug!("Failed to fetch devices url, falling back to default");
debug!("Error: {:?}", e);
DEFAULT_REGISTRATION_HOST_PREFIX.to_string()
}
};
webex
.client
.host_prefix
.insert("devices".to_string(), devices_url);
webex
}
pub async fn event_stream(&self) -> Result<WebexEventStream, Error> {
async fn connect_device(s: &Webex, device: DeviceData) -> Result<WebexEventStream, Error> {
trace!("Attempting connection with device named {:?}", device.name);
let Some(ws_url) = device.ws_url else {
return Err("Device has no ws_url".into());
};
let url = url::Url::parse(ws_url.as_str())
.map_err(|_| Error::from("Failed to parse ws_url"))?;
debug!("Connecting to {:?}", url);
match connect_async(url.as_str()).await {
Ok((mut ws_stream, _response)) => {
debug!("Connected to {}", url);
WebexEventStream::auth(&mut ws_stream, &s.token).await?;
debug!("Authenticated");
let timeout = Duration::from_secs(20);
Ok(WebexEventStream {
ws_stream,
timeout,
is_open: true,
})
}
Err(e) => {
warn!("Failed to connect to {:?}: {:?}", url, e);
Err(Error::Tungstenite(
e,
"Failed to connect to ws_url".to_string(),
))
}
}
}
let mut devices: Vec<DeviceData> = self
.get_devices()
.await?
.iter()
.filter(|d| d.name == self.device.name)
.inspect(|d| trace!("Kept device: {}", d))
.cloned()
.collect();
devices.sort_by(|a: &DeviceData, b: &DeviceData| {
b.modification_time
.unwrap_or_else(chrono::Utc::now)
.cmp(&a.modification_time.unwrap_or_else(chrono::Utc::now))
});
for device in devices {
if let Ok(event_stream) = connect_device(self, device).await {
trace!("Successfully connected to device.");
return Ok(event_stream);
}
}
connect_device(self, self.setup_devices().await?).await
}
async fn get_mercury_url(&self) -> Result<String, Option<error::Error>> {
lazy_static::lazy_static! {
static ref MERCURY_CACHE: Mutex<HashMap<u64, Result<String, ()>>> = Mutex::new(HashMap::new());
}
if let Ok(Some(result)) = MERCURY_CACHE
.lock()
.map(|cache| cache.get(&self.id).cloned())
{
trace!("Found mercury URL in cache!");
return result.map_err(|()| None);
}
let mercury_url = self.get_mercury_url_uncached().await;
if let Ok(mut cache) = MERCURY_CACHE.lock() {
let result = mercury_url.as_ref().map_or(Err(()), |url| Ok(url.clone()));
trace!("Saving mercury url to cache: {}=>{:?}", self.id, &result);
cache.insert(self.id, result);
}
mercury_url.map_err(Some)
}
async fn get_mercury_url_uncached(&self) -> Result<String, error::Error> {
let orgs = self.list::<Organization>().await?;
if orgs.is_empty() {
return Err("Can't get mercury URL with no orgs".into());
}
let org_id = &orgs[0].id;
let api_url = "limited/catalog";
let params = [("format", "hostmap"), ("orgId", org_id.as_str())];
let catalogs = self
.client
.api_get::<CatalogReply>(
api_url,
Some(params),
AuthorizationType::Bearer(&self.token),
)
.await?;
let mercury_url = catalogs.service_links.wdm;
Ok(mercury_url)
}
#[deprecated(
since = "0.6.3",
note = "Please use `webex::list::<Organization>()` instead"
)]
pub async fn get_orgs(&self) -> Result<Vec<Organization>, Error> {
self.list().await
}
#[deprecated(
since = "0.6.3",
note = "Please use `webex::get::<AttachmentAction>(id)` instead"
)]
pub async fn get_attachment_action(&self, id: &GlobalId) -> Result<AttachmentAction, Error> {
self.get(id).await
}
#[deprecated(
since = "0.6.3",
note = "Please use `webex::get::<Message>(id)` instead"
)]
pub async fn get_message(&self, id: &GlobalId) -> Result<Message, Error> {
self.get(id).await
}
#[deprecated(
since = "0.6.3",
note = "Please use `webex::delete::<Message>(id)` instead"
)]
pub async fn delete_message(&self, id: &GlobalId) -> Result<(), Error> {
self.delete::<Message>(id).await
}
#[deprecated(since = "0.6.3", note = "Please use `webex::list::<Room>()` instead")]
pub async fn get_rooms(&self) -> Result<Vec<Room>, Error> {
self.list().await
}
pub async fn get_all_rooms(&self) -> Result<Vec<Room>, Error> {
let (mut all_rooms, teams) = try_join!(self.list(), self.list::<Team>())?;
let futures: Vec<_> = teams
.into_iter()
.map(|team| {
let params = [("teamId", team.id)];
self.client.api_get::<ListResult<Room>>(
Room::API_ENDPOINT,
Some(params),
AuthorizationType::Bearer(&self.token),
)
})
.collect();
let teams_rooms = try_join_all(futures).await?;
for room in teams_rooms {
all_rooms.extend(room.items);
}
Ok(all_rooms)
}
#[deprecated(since = "0.6.3", note = "Please use `webex::get::<Room>(id)` instead")]
pub async fn get_room(&self, id: &GlobalId) -> Result<Room, Error> {
self.get(id).await
}
#[deprecated(
since = "0.6.3",
note = "Please use `webex::get::<Person>(id)` instead"
)]
pub async fn get_person(&self, id: &GlobalId) -> Result<Person, Error> {
self.get(id).await
}
pub async fn send_message(&self, message: &MessageOut) -> Result<Message, Error> {
self.client
.api_post(
"messages",
message,
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
}
pub async fn edit_message(
&self,
message_id: &GlobalId,
params: &MessageEditParams<'_>,
) -> Result<Message, Error> {
let rest_method = format!("messages/{}", message_id.id());
self.client
.api_put(
&rest_method,
params,
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
}
pub async fn get<T: Gettable + DeserializeOwned>(&self, id: &GlobalId) -> Result<T, Error> {
let rest_method = format!("{}/{}", T::API_ENDPOINT, id.id());
self.client
.api_get::<T>(
rest_method.as_str(),
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
}
pub async fn delete<T: Gettable + DeserializeOwned>(&self, id: &GlobalId) -> Result<(), Error> {
let rest_method = format!("{}/{}", T::API_ENDPOINT, id.id());
self.client
.api_delete(
rest_method.as_str(),
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
}
pub async fn list<T: Gettable + DeserializeOwned>(&self) -> Result<Vec<T>, Error> {
self.client
.api_get::<ListResult<T>>(
T::API_ENDPOINT,
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
.map(|result| result.items)
}
pub async fn list_with_params<T: Gettable + DeserializeOwned>(
&self,
list_params: T::ListParams<'_>,
) -> Result<Vec<T>, Error> {
self.client
.api_get::<ListResult<T>>(
T::API_ENDPOINT,
Some(list_params),
AuthorizationType::Bearer(&self.token),
)
.await
.map(|result| result.items)
}
async fn get_devices(&self) -> Result<Vec<DeviceData>, Error> {
match self
.client
.api_get::<DevicesReply>(
"devices",
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
{
#[rustfmt::skip]
Ok(DevicesReply { devices: Some(devices), .. }) => Ok(devices),
Ok(DevicesReply { devices: None, .. }) => {
debug!("Chaining one-time device setup from devices query");
self.setup_devices().await.map(|device| vec![device])
}
Err(e) => match e {
Error::Status(s) | Error::StatusText(s, _) => {
if s == StatusCode::NOT_FOUND {
debug!("No devices found, creating new one");
self.setup_devices().await.map(|device| vec![device])
} else {
Err(e)
}
}
Error::Limited(_, _) => Err(e),
_ => Err(format!("Can't decode devices reply: {e}").into()),
},
}
}
async fn setup_devices(&self) -> Result<DeviceData, Error> {
trace!("Setting up new device: {}", &self.device);
self.client
.api_post(
"devices",
&self.device,
None::<()>,
AuthorizationType::Bearer(&self.token),
)
.await
}
}
impl From<&AttachmentAction> for MessageOut {
fn from(action: &AttachmentAction) -> Self {
Self {
room_id: action.room_id.clone(),
..Self::default()
}
}
}
impl From<&Message> for MessageOut {
fn from(msg: &Message) -> Self {
let mut new_msg = Self::default();
if msg.room_type == Some(RoomType::Group) {
new_msg.room_id.clone_from(&msg.room_id);
} else if let Some(_person_id) = &msg.person_id {
new_msg.to_person_id.clone_from(&msg.person_id);
} else {
new_msg.to_person_email.clone_from(&msg.person_email);
}
new_msg
}
}
impl Message {
#[must_use]
pub fn reply(&self) -> MessageOut {
MessageOut {
room_id: self.room_id.clone(),
parent_id: self
.parent_id
.as_deref()
.or(self.id.as_deref())
.map(ToOwned::to_owned),
..Default::default()
}
}
}
impl MessageOut {
#[deprecated(since = "0.2.0", note = "Please use the from instead")]
#[must_use]
pub fn from_msg(msg: &Message) -> Self {
Self::from(msg)
}
pub fn add_attachment(&mut self, card: AdaptiveCard) -> &Self {
self.attachments = Some(vec![Attachment {
content_type: "application/vnd.microsoft.card.adaptive".to_string(),
content: card,
}]);
self
}
}