#[macro_use]
extern crate log;
extern crate crossbeam_channel;
extern crate http;
extern crate indexmap;
extern crate reqwest;
extern crate serde;
extern crate serde_json;
extern crate snafu;
extern crate uuid;
extern crate ws;
extern crate xio_hwdb;
extern crate xio_jobset;
extern crate xio_webapi;
mod error;
pub use error::Error;
use crossbeam_channel::{unbounded, Receiver, Sender};
use error::AsResult;
use http::status::StatusCode;
use indexmap::IndexMap;
use snafu::ResultExt;
use std::thread;
use uuid::Uuid;
use xio_hwdb as hwdb;
use xio_jobset as jobset;
use xio_webapi as webapi;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Clone, Debug)]
pub struct Client {
base_url: String,
}
struct DeviceEventHandler {
tx: Option<Sender<webapi::DeviceEvent>>,
}
struct ControllerEventHandler {
tx: Option<Sender<webapi::ControllerEvent>>,
}
struct JobEventHandler {
tx: Option<Sender<webapi::JobEvent>>,
}
impl ws::Handler for DeviceEventHandler {
fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
warn!("Not connected: {:?}", handshake.response);
} else {
debug!("Connected.");
}
Ok(())
}
fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
if let Some(ref tx) = self.tx.clone() {
match msg {
ws::Message::Text(s) => {
if let Ok(event) = serde_json::from_str(&s) {
if tx.send(event).is_err() {
self.tx = None;
}
} else {
warn!(
"Unknown message received over websocket: \
{:?}",
s
);
}
}
ws::Message::Binary(_) => {
warn!("Received unexpected binary message over websocket");
}
}
} else {
warn!("Received websockets message during shutdown");
}
Ok(())
}
fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
self.tx = None;
}
fn on_error(&mut self, err: ws::Error) {
warn!("WebSocket error: {:?}", err);
self.tx = None;
}
}
impl ws::Handler for ControllerEventHandler {
fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
warn!("Not connected: {:?}", handshake.response);
} else {
debug!("Connected.");
}
Ok(())
}
fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
if let Some(ref tx) = self.tx.clone() {
match msg {
ws::Message::Text(s) => {
if let Ok(event) = serde_json::from_str(&s) {
if tx.send(event).is_err() {
self.tx = None;
}
} else {
warn!(
"Unknown message received over websocket: \
{:?}",
s
);
}
}
ws::Message::Binary(_) => {
warn!("Received unexpected binary message over websocket");
}
}
} else {
warn!("Received websockets message during shutdown");
}
Ok(())
}
fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
self.tx = None;
}
fn on_error(&mut self, err: ws::Error) {
warn!("WebSocket error: {:?}", err);
self.tx = None;
}
}
impl ws::Handler for JobEventHandler {
fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
warn!("Not connected: {:?}", handshake.response);
} else {
debug!("Connected.");
}
Ok(())
}
fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
if let Some(ref tx) = self.tx.clone() {
match msg {
ws::Message::Text(s) => {
if let Ok(event) = serde_json::from_str(&s) {
if tx.send(event).is_err() {
self.tx = None;
}
} else {
warn!(
"Unknown message received over websocket: \
{:?}",
s
);
}
}
ws::Message::Binary(_) => {
warn!("Received unexpected binary message over websocket");
}
}
} else {
warn!("Received websockets message during shutdown");
}
Ok(())
}
fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
self.tx = None;
}
fn on_error(&mut self, err: ws::Error) {
warn!("WebSocket error: {:?}", err);
self.tx = None;
}
}
impl Client {
pub fn new(url: &str) -> Self {
Client {
base_url: url.trim_end_matches('/').to_string(),
}
}
pub fn url(&self) -> &str {
&self.base_url
}
fn suburl(&self, suburl: &str) -> String {
format!("{}/{}", self.base_url, suburl)
}
fn get(&self, url: &str) -> Result<reqwest::Response> {
debug!("Downloading from URL \"{}\"", url);
reqwest::Client::new()
.get(url)
.send()
.context(error::Reqwest)
}
fn get_json<T>(&self, url: &str) -> Result<T>
where
for<'de> T: serde::Deserialize<'de>,
{
self.get(url)?.json().context(error::Reqwest)
}
fn get_json_response<T: Default + webapi::MayBeSkipped>(
&self,
url: &str,
) -> Result<T>
where
for<'de> T: serde::Deserialize<'de>,
{
self.get_json::<webapi::Response<T>>(url)?.as_result()
}
fn post_json<S, T>(&self, url: &str, data: &S) -> Result<T>
where
S: serde::Serialize + std::fmt::Debug,
for<'de> T: serde::Deserialize<'de>,
{
debug!("Posting to URL \"{}\"", url);
debug!("JSON: {:#?}", data);
reqwest::Client::new()
.post(url)
.json(data)
.send()
.context(error::Reqwest)?
.json()
.context(error::Reqwest)
}
fn post_json_response<S, T: Default + webapi::MayBeSkipped>(
&self,
url: &str,
data: &S,
) -> Result<T>
where
S: serde::Serialize + std::fmt::Debug,
for<'de> T: serde::Deserialize<'de>,
{
let response: webapi::Response<T> =
self.post_json::<S, webapi::Response<T>>(url, data)?;
response.as_result()
}
fn delete<T>(&self, url: &str) -> Result<T>
where
for<'de> T: serde::Deserialize<'de>,
{
debug!("Deleting on URL \"{}\"", url);
reqwest::Client::new()
.delete(url)
.send()
.context(error::Reqwest)?
.json()
.context(error::Reqwest)
}
fn delete_response<T: Default + webapi::MayBeSkipped>(
&self,
url: &str,
) -> Result<T>
where
for<'de> T: serde::Deserialize<'de>,
{
let response: webapi::Response<T> = self.delete(url)?;
response.as_result()
}
pub fn description_api(&self) -> Result<webapi::ApiDescription> {
self.get_json_response(&self.suburl("description/api"))
}
pub fn description_controllers(
&self,
) -> Result<IndexMap<String, hwdb::HardwareBoardDescription>> {
self.get_json_response(&self.suburl("description/controller"))
}
pub fn description_modules(
&self,
) -> Result<IndexMap<String, hwdb::Module>> {
self.get_json_response(&self.suburl("description/module"))
}
pub fn controllers(
&self,
) -> Result<IndexMap<Uuid, webapi::ControllerStatus>> {
self.get_json_response(&self.suburl("controller"))
}
pub fn controllers_events(
&self,
) -> Result<Receiver<webapi::DeviceEvent>> {
let url = self.suburl("controller/eventlog");
let url = url.replace("http", "ws");
let (tx, rx) = unbounded();
thread::spawn(move || {
debug!("Connecting to websockets at {:?}", url);
match ws::connect(url, |_| DeviceEventHandler {
tx: Some(tx.clone()),
}) {
Ok(()) => debug!("Connected."),
Err(e) => warn!("Not connected: {:?}", e),
}
});
Ok(rx)
}
pub fn controller_events(
&self,
uuid: &Uuid,
) -> Result<Receiver<webapi::ControllerEvent>> {
let url = self.suburl(&format!("controller/{}/eventlog", uuid));
let url = url.replace("http", "ws");
let (tx, rx) = unbounded();
thread::spawn(move || {
debug!("Connecting to websockets at {:?}", url);
match ws::connect(url, |_| ControllerEventHandler {
tx: Some(tx.clone()),
}) {
Ok(()) => debug!("Connected."),
Err(e) => warn!("Not connected: {:?}", e),
}
});
Ok(rx)
}
pub fn joblog_events(
&self,
uuid: &Uuid,
) -> Result<Receiver<webapi::JobEvent>> {
let url = self.suburl(&format!("controller/{}/joblog", uuid));
let url = url.replace("http", "ws");
let (tx, rx) = unbounded();
thread::spawn(move || {
debug!("Connecting to websockets at {:?}", url);
match ws::connect(url, |_| JobEventHandler {
tx: Some(tx.clone()),
}) {
Ok(()) => debug!("Connected."),
Err(e) => warn!("Not connected: {:?}", e),
}
});
Ok(rx)
}
pub fn controller(
&self,
uuid: &Uuid,
) -> Result<webapi::ControllerStatus> {
self.get_json_response(
&self.suburl(&format!("controller/{}", uuid)),
)
}
pub fn clear_job_set(&self, uuid: &Uuid) -> Result<()> {
self.delete_response(
&self.suburl(&format!("controller/{}/jobset", uuid)),
)
}
pub fn post_job_set(
&self,
uuid: &Uuid,
jobset: &jobset::JobSet,
) -> Result<()> {
self.post_json_response(
&self.suburl(&format!("controller/{}/jobset", uuid)),
jobset,
)
}
pub fn post_job_control(
&self,
uuid: &Uuid,
message: &webapi::JobControlAction,
) -> Result<webapi::JobControlAction> {
self.post_json_response(
&self.suburl(&format!("controller/{}/jobcontrol", uuid)),
message,
)
}
pub fn post_module_statecontrol(
&self,
uuid: &Uuid,
module_id: &str,
message: &webapi::ModuleAction,
) -> Result<webapi::ModuleAction> {
self.post_json_response(
&self.suburl(&format!(
"controller/{}/module/{}/statecontrol",
uuid, module_id
)),
message,
)
}
pub fn post_flash_firmware(&self, uuid: &Uuid) -> Result<()> {
self.post_json_response(
&self.suburl(&format!("controller/{}/flash", uuid)),
&webapi::FirmwareFlashRequest {},
)
}
}