xio_webclient/
lib.rs

1#[macro_use]
2extern crate log;
3
4extern crate crossbeam_channel;
5extern crate http;
6extern crate indexmap;
7extern crate reqwest;
8extern crate serde;
9extern crate serde_json;
10extern crate snafu;
11extern crate uuid;
12extern crate ws;
13extern crate xio_hwdb;
14extern crate xio_jobset;
15extern crate xio_webapi;
16
17mod error;
18
19pub use error::Error;
20
21use crossbeam_channel::{unbounded, Receiver, Sender};
22use error::AsResult;
23use http::status::StatusCode;
24use indexmap::IndexMap;
25use snafu::ResultExt;
26use std::thread;
27use uuid::Uuid;
28use xio_hwdb as hwdb;
29use xio_jobset as jobset;
30use xio_webapi as webapi;
31
32pub type Result<T, E = Error> = std::result::Result<T, E>;
33
34#[derive(Clone, Debug)]
35pub struct Client {
36    base_url: String,
37}
38
39struct DeviceEventHandler {
40    tx: Option<Sender<webapi::DeviceEvent>>,
41    ws_sender: ws::Sender,
42}
43
44struct ControllerEventHandler {
45    tx: Option<Sender<webapi::ControllerEvent>>,
46    ws_sender: ws::Sender,
47}
48
49struct JobEventHandler {
50    tx: Option<Sender<webapi::JobEvent>>,
51    ws_sender: ws::Sender,
52}
53
54impl ws::Handler for DeviceEventHandler {
55    fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
56        if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
57            warn!("Not connected: {:?}", handshake.response);
58        } else {
59            debug!("Connected.");
60        }
61        Ok(())
62    }
63
64    fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
65        if let Some(ref tx) = self.tx.clone() {
66            match msg {
67                ws::Message::Text(s) => {
68                    if let Ok(event) = serde_json::from_str(&s) {
69                        if tx.send(event).is_err() {
70                            self.tx = None;
71                            if let Err(e) = self.ws_sender.shutdown() {
72                                warn!(
73                                    "Couldn't shut down properly: {:?}",
74                                    e
75                                );
76                            }
77                        }
78                    } else {
79                        warn!(
80                            "Unknown message received over websocket: \
81                             {:?}",
82                            s
83                        );
84                    }
85                }
86                ws::Message::Binary(_) => {
87                    warn!("Received unexpected binary message over websocket");
88                }
89            }
90        } else {
91            warn!("Received websockets message during shutdown");
92        }
93        Ok(())
94    }
95
96    fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
97        // drop the sender
98        self.tx = None;
99        if let Err(e) = self.ws_sender.shutdown() {
100            warn!("Couldn't shut down properly: {:?}", e);
101        }
102    }
103    fn on_error(&mut self, err: ws::Error) {
104        warn!("WebSocket error: {:?}", err);
105        // drop the sender
106        self.tx = None;
107        if let Err(e) = self.ws_sender.shutdown() {
108            warn!("Couldn't shut down properly: {:?}", e);
109        }
110    }
111}
112
113impl ws::Handler for ControllerEventHandler {
114    fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
115        if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
116            warn!("Not connected: {:?}", handshake.response);
117        } else {
118            debug!("Connected.");
119        }
120        Ok(())
121    }
122
123    fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
124        if let Some(ref tx) = self.tx.clone() {
125            match msg {
126                ws::Message::Text(s) => {
127                    if let Ok(event) = serde_json::from_str(&s) {
128                        if tx.send(event).is_err() {
129                            self.tx = None;
130                            if let Err(e) = self.ws_sender.shutdown() {
131                                warn!(
132                                    "Couldn't shut down properly: {:?}",
133                                    e
134                                );
135                            }
136                        }
137                    } else {
138                        warn!(
139                            "Unknown message received over websocket: \
140                             {:?}",
141                            s
142                        );
143                    }
144                }
145                ws::Message::Binary(_) => {
146                    warn!("Received unexpected binary message over websocket");
147                }
148            }
149        } else {
150            warn!("Received websockets message during shutdown");
151        }
152        Ok(())
153    }
154
155    fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
156        // drop the sender
157        self.tx = None;
158        if let Err(e) = self.ws_sender.shutdown() {
159            warn!("Couldn't shut down properly: {:?}", e);
160        }
161    }
162
163    fn on_error(&mut self, err: ws::Error) {
164        warn!("WebSocket error: {:?}", err);
165        // drop the sender
166        self.tx = None;
167        if let Err(e) = self.ws_sender.shutdown() {
168            warn!("Couldn't shut down properly: {:?}", e);
169        }
170    }
171}
172
173impl ws::Handler for JobEventHandler {
174    fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
175        if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
176            warn!("Not connected: {:?}", handshake.response);
177        } else {
178            debug!("Connected.");
179        }
180        Ok(())
181    }
182
183    fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
184        if let Some(ref tx) = self.tx.clone() {
185            match msg {
186                ws::Message::Text(s) => {
187                    if let Ok(event) = serde_json::from_str(&s) {
188                        if tx.send(event).is_err() {
189                            self.tx = None;
190                            if let Err(e) = self.ws_sender.shutdown() {
191                                warn!(
192                                    "Couldn't shut down properly: {:?}",
193                                    e
194                                );
195                            }
196                        }
197                    } else {
198                        warn!(
199                            "Unknown message received over websocket: \
200                             {:?}",
201                            s
202                        );
203                    }
204                }
205                ws::Message::Binary(_) => {
206                    warn!("Received unexpected binary message over websocket");
207                }
208            }
209        } else {
210            warn!("Received websockets message during shutdown");
211        }
212        Ok(())
213    }
214
215    fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
216        // drop the sender
217        self.tx = None;
218        if let Err(e) = self.ws_sender.shutdown() {
219            warn!("Couldn't shut down properly: {:?}", e);
220        }
221    }
222
223    fn on_error(&mut self, err: ws::Error) {
224        warn!("WebSocket error: {:?}", err);
225        // drop the sender
226        self.tx = None;
227        if let Err(e) = self.ws_sender.shutdown() {
228            warn!("Couldn't shut down properly: {:?}", e);
229        }
230    }
231}
232
233impl Client {
234    pub fn new(url: &str) -> Self {
235        Client {
236            base_url: url.trim_end_matches('/').to_string(),
237        }
238    }
239
240    pub fn url(&self) -> &str {
241        &self.base_url
242    }
243
244    fn suburl(&self, suburl: &str) -> String {
245        format!("{}/{}", self.base_url, suburl)
246    }
247
248    fn get(&self, url: &str) -> Result<reqwest::Response> {
249        debug!("Downloading from URL \"{}\"", url);
250        reqwest::Client::new()
251            .get(url)
252            .send()
253            .context(error::Reqwest)
254    }
255
256    fn get_json<T>(&self, url: &str) -> Result<T>
257    where
258        for<'de> T: serde::Deserialize<'de>,
259    {
260        self.get(url)?.json().context(error::Reqwest)
261    }
262
263    fn get_json_response<T: Default + webapi::MayBeSkipped>(
264        &self,
265        url: &str,
266    ) -> Result<T>
267    where
268        for<'de> T: serde::Deserialize<'de>,
269    {
270        self.get_json::<webapi::Response<T>>(url)?.as_result()
271    }
272
273    fn post_json<S, T>(&self, url: &str, data: &S) -> Result<T>
274    where
275        S: serde::Serialize + std::fmt::Debug,
276        for<'de> T: serde::Deserialize<'de>,
277    {
278        debug!("Posting to URL \"{}\"", url);
279        debug!("JSON: {:#?}", data);
280        reqwest::Client::new()
281            .post(url)
282            .json(data)
283            .send()
284            .context(error::Reqwest)?
285            .json()
286            .context(error::Reqwest)
287    }
288
289    fn post_json_response<S, T: Default + webapi::MayBeSkipped>(
290        &self,
291        url: &str,
292        data: &S,
293    ) -> Result<T>
294    where
295        S: serde::Serialize + std::fmt::Debug,
296        for<'de> T: serde::Deserialize<'de>,
297    {
298        let response: webapi::Response<T> =
299            self.post_json::<S, webapi::Response<T>>(url, data)?;
300        response.as_result()
301    }
302
303    fn delete<T>(&self, url: &str) -> Result<T>
304    where
305        for<'de> T: serde::Deserialize<'de>,
306    {
307        debug!("Deleting on URL \"{}\"", url);
308        reqwest::Client::new()
309            .delete(url)
310            .send()
311            .context(error::Reqwest)?
312            .json()
313            .context(error::Reqwest)
314    }
315
316    fn delete_response<T: Default + webapi::MayBeSkipped>(
317        &self,
318        url: &str,
319    ) -> Result<T>
320    where
321        for<'de> T: serde::Deserialize<'de>,
322    {
323        let response: webapi::Response<T> = self.delete(url)?;
324        response.as_result()
325    }
326
327    pub fn description_api(&self) -> Result<webapi::ApiDescription> {
328        self.get_json_response(&self.suburl("description/api"))
329    }
330
331    pub fn description_controllers(
332        &self,
333    ) -> Result<IndexMap<String, hwdb::HardwareBoardDescription>> {
334        self.get_json_response(&self.suburl("description/controller"))
335    }
336
337    pub fn description_modules(
338        &self,
339    ) -> Result<IndexMap<String, hwdb::Module>> {
340        self.get_json_response(&self.suburl("description/module"))
341    }
342
343    pub fn controllers(
344        &self,
345    ) -> Result<IndexMap<Uuid, webapi::ControllerStatus>> {
346        self.get_json_response(&self.suburl("controller"))
347    }
348
349    pub fn controllers_events(
350        &self,
351    ) -> Result<Receiver<webapi::DeviceEvent>> {
352        let url = self.suburl("controller/eventlog");
353        let url = url.replace("http", "ws");
354        let (tx, rx) = unbounded();
355        thread::spawn(move || {
356            debug!("Connecting to websockets at {:?}", url);
357            match ws::connect(url, |ws_sender| DeviceEventHandler {
358                tx: Some(tx.clone()),
359                ws_sender,
360            }) {
361                Ok(()) => debug!("Connected."),
362                Err(e) => warn!("Not connected: {:?}", e),
363            }
364        });
365        Ok(rx)
366    }
367
368    pub fn controller_events(
369        &self,
370        uuid: &Uuid,
371    ) -> Result<Receiver<webapi::ControllerEvent>> {
372        let url = self.suburl(&format!("controller/{}/eventlog", uuid));
373        let url = url.replace("http", "ws");
374        let (tx, rx) = unbounded();
375        thread::spawn(move || {
376            debug!("Connecting to websockets at {:?}", url);
377            match ws::connect(url, |ws_sender| ControllerEventHandler {
378                tx: Some(tx.clone()),
379                ws_sender,
380            }) {
381                Ok(()) => debug!("Connected."),
382                Err(e) => warn!("Not connected: {:?}", e),
383            }
384        });
385        Ok(rx)
386    }
387
388    pub fn joblog_events(
389        &self,
390        uuid: &Uuid,
391    ) -> Result<Receiver<webapi::JobEvent>> {
392        let url = self.suburl(&format!("controller/{}/joblog", uuid));
393        let url = url.replace("http", "ws");
394        let (tx, rx) = unbounded();
395        thread::spawn(move || {
396            debug!("Connecting to websockets at {:?}", url);
397            match ws::connect(url, |ws_sender| JobEventHandler {
398                tx: Some(tx.clone()),
399                ws_sender,
400            }) {
401                Ok(()) => debug!("Connected."),
402                Err(e) => warn!("Not connected: {:?}", e),
403            }
404        });
405        Ok(rx)
406    }
407
408    pub fn controller(
409        &self,
410        uuid: &Uuid,
411    ) -> Result<webapi::ControllerStatus> {
412        self.get_json_response(
413            &self.suburl(&format!("controller/{}", uuid)),
414        )
415    }
416
417    pub fn clear_job_set(&self, uuid: &Uuid) -> Result<()> {
418        self.delete_response(
419            &self.suburl(&format!("controller/{}/jobset", uuid)),
420        )
421    }
422
423    pub fn post_job_set(
424        &self,
425        uuid: &Uuid,
426        jobset: &jobset::JobSet,
427    ) -> Result<()> {
428        self.post_json_response(
429            &self.suburl(&format!("controller/{}/jobset", uuid)),
430            jobset,
431        )
432    }
433
434    pub fn post_job_control(
435        &self,
436        uuid: &Uuid,
437        message: &webapi::JobControlAction,
438    ) -> Result<webapi::JobControlAction> {
439        self.post_json_response(
440            &self.suburl(&format!("controller/{}/jobcontrol", uuid)),
441            message,
442        )
443    }
444
445    pub fn post_module_statecontrol(
446        &self,
447        uuid: &Uuid,
448        module_id: &str,
449        message: &webapi::ModuleAction,
450    ) -> Result<webapi::ModuleAction> {
451        self.post_json_response(
452            &self.suburl(&format!(
453                "controller/{}/module/{}/statecontrol",
454                uuid, module_id
455            )),
456            message,
457        )
458    }
459
460    pub fn post_flash_firmware(&self, uuid: &Uuid) -> Result<()> {
461        self.post_json_response(
462            &self.suburl(&format!("controller/{}/flash", uuid)),
463            &webapi::FirmwareFlashRequest {},
464        )
465    }
466}