1#[macro_use]
2extern crate log;
3
4extern crate crossbeam_channel;
5extern crate http;
6extern crate indexmap;
7extern crate reqwest;
8extern crate serde;
9extern crate serde_json;
10extern crate snafu;
11extern crate uuid;
12extern crate ws;
13extern crate xio_hwdb;
14extern crate xio_jobset;
15extern crate xio_webapi;
16
17mod error;
18
19pub use error::Error;
20
21use crossbeam_channel::{unbounded, Receiver, Sender};
22use error::AsResult;
23use http::status::StatusCode;
24use indexmap::IndexMap;
25use snafu::ResultExt;
26use std::thread;
27use uuid::Uuid;
28use xio_hwdb as hwdb;
29use xio_jobset as jobset;
30use xio_webapi as webapi;
31
32pub type Result<T, E = Error> = std::result::Result<T, E>;
33
34#[derive(Clone, Debug)]
35pub struct Client {
36 base_url: String,
37}
38
39struct DeviceEventHandler {
40 tx: Option<Sender<webapi::DeviceEvent>>,
41 ws_sender: ws::Sender,
42}
43
44struct ControllerEventHandler {
45 tx: Option<Sender<webapi::ControllerEvent>>,
46 ws_sender: ws::Sender,
47}
48
49struct JobEventHandler {
50 tx: Option<Sender<webapi::JobEvent>>,
51 ws_sender: ws::Sender,
52}
53
54impl ws::Handler for DeviceEventHandler {
55 fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
56 if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
57 warn!("Not connected: {:?}", handshake.response);
58 } else {
59 debug!("Connected.");
60 }
61 Ok(())
62 }
63
64 fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
65 if let Some(ref tx) = self.tx.clone() {
66 match msg {
67 ws::Message::Text(s) => {
68 if let Ok(event) = serde_json::from_str(&s) {
69 if tx.send(event).is_err() {
70 self.tx = None;
71 if let Err(e) = self.ws_sender.shutdown() {
72 warn!(
73 "Couldn't shut down properly: {:?}",
74 e
75 );
76 }
77 }
78 } else {
79 warn!(
80 "Unknown message received over websocket: \
81 {:?}",
82 s
83 );
84 }
85 }
86 ws::Message::Binary(_) => {
87 warn!("Received unexpected binary message over websocket");
88 }
89 }
90 } else {
91 warn!("Received websockets message during shutdown");
92 }
93 Ok(())
94 }
95
96 fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
97 self.tx = None;
99 if let Err(e) = self.ws_sender.shutdown() {
100 warn!("Couldn't shut down properly: {:?}", e);
101 }
102 }
103 fn on_error(&mut self, err: ws::Error) {
104 warn!("WebSocket error: {:?}", err);
105 self.tx = None;
107 if let Err(e) = self.ws_sender.shutdown() {
108 warn!("Couldn't shut down properly: {:?}", e);
109 }
110 }
111}
112
113impl ws::Handler for ControllerEventHandler {
114 fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
115 if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
116 warn!("Not connected: {:?}", handshake.response);
117 } else {
118 debug!("Connected.");
119 }
120 Ok(())
121 }
122
123 fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
124 if let Some(ref tx) = self.tx.clone() {
125 match msg {
126 ws::Message::Text(s) => {
127 if let Ok(event) = serde_json::from_str(&s) {
128 if tx.send(event).is_err() {
129 self.tx = None;
130 if let Err(e) = self.ws_sender.shutdown() {
131 warn!(
132 "Couldn't shut down properly: {:?}",
133 e
134 );
135 }
136 }
137 } else {
138 warn!(
139 "Unknown message received over websocket: \
140 {:?}",
141 s
142 );
143 }
144 }
145 ws::Message::Binary(_) => {
146 warn!("Received unexpected binary message over websocket");
147 }
148 }
149 } else {
150 warn!("Received websockets message during shutdown");
151 }
152 Ok(())
153 }
154
155 fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
156 self.tx = None;
158 if let Err(e) = self.ws_sender.shutdown() {
159 warn!("Couldn't shut down properly: {:?}", e);
160 }
161 }
162
163 fn on_error(&mut self, err: ws::Error) {
164 warn!("WebSocket error: {:?}", err);
165 self.tx = None;
167 if let Err(e) = self.ws_sender.shutdown() {
168 warn!("Couldn't shut down properly: {:?}", e);
169 }
170 }
171}
172
173impl ws::Handler for JobEventHandler {
174 fn on_open(&mut self, handshake: ws::Handshake) -> ws::Result<()> {
175 if handshake.response.status() != StatusCode::SWITCHING_PROTOCOLS {
176 warn!("Not connected: {:?}", handshake.response);
177 } else {
178 debug!("Connected.");
179 }
180 Ok(())
181 }
182
183 fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
184 if let Some(ref tx) = self.tx.clone() {
185 match msg {
186 ws::Message::Text(s) => {
187 if let Ok(event) = serde_json::from_str(&s) {
188 if tx.send(event).is_err() {
189 self.tx = None;
190 if let Err(e) = self.ws_sender.shutdown() {
191 warn!(
192 "Couldn't shut down properly: {:?}",
193 e
194 );
195 }
196 }
197 } else {
198 warn!(
199 "Unknown message received over websocket: \
200 {:?}",
201 s
202 );
203 }
204 }
205 ws::Message::Binary(_) => {
206 warn!("Received unexpected binary message over websocket");
207 }
208 }
209 } else {
210 warn!("Received websockets message during shutdown");
211 }
212 Ok(())
213 }
214
215 fn on_close(&mut self, _code: ws::CloseCode, _reason: &str) {
216 self.tx = None;
218 if let Err(e) = self.ws_sender.shutdown() {
219 warn!("Couldn't shut down properly: {:?}", e);
220 }
221 }
222
223 fn on_error(&mut self, err: ws::Error) {
224 warn!("WebSocket error: {:?}", err);
225 self.tx = None;
227 if let Err(e) = self.ws_sender.shutdown() {
228 warn!("Couldn't shut down properly: {:?}", e);
229 }
230 }
231}
232
233impl Client {
234 pub fn new(url: &str) -> Self {
235 Client {
236 base_url: url.trim_end_matches('/').to_string(),
237 }
238 }
239
240 pub fn url(&self) -> &str {
241 &self.base_url
242 }
243
244 fn suburl(&self, suburl: &str) -> String {
245 format!("{}/{}", self.base_url, suburl)
246 }
247
248 fn get(&self, url: &str) -> Result<reqwest::Response> {
249 debug!("Downloading from URL \"{}\"", url);
250 reqwest::Client::new()
251 .get(url)
252 .send()
253 .context(error::Reqwest)
254 }
255
256 fn get_json<T>(&self, url: &str) -> Result<T>
257 where
258 for<'de> T: serde::Deserialize<'de>,
259 {
260 self.get(url)?.json().context(error::Reqwest)
261 }
262
263 fn get_json_response<T: Default + webapi::MayBeSkipped>(
264 &self,
265 url: &str,
266 ) -> Result<T>
267 where
268 for<'de> T: serde::Deserialize<'de>,
269 {
270 self.get_json::<webapi::Response<T>>(url)?.as_result()
271 }
272
273 fn post_json<S, T>(&self, url: &str, data: &S) -> Result<T>
274 where
275 S: serde::Serialize + std::fmt::Debug,
276 for<'de> T: serde::Deserialize<'de>,
277 {
278 debug!("Posting to URL \"{}\"", url);
279 debug!("JSON: {:#?}", data);
280 reqwest::Client::new()
281 .post(url)
282 .json(data)
283 .send()
284 .context(error::Reqwest)?
285 .json()
286 .context(error::Reqwest)
287 }
288
289 fn post_json_response<S, T: Default + webapi::MayBeSkipped>(
290 &self,
291 url: &str,
292 data: &S,
293 ) -> Result<T>
294 where
295 S: serde::Serialize + std::fmt::Debug,
296 for<'de> T: serde::Deserialize<'de>,
297 {
298 let response: webapi::Response<T> =
299 self.post_json::<S, webapi::Response<T>>(url, data)?;
300 response.as_result()
301 }
302
303 fn delete<T>(&self, url: &str) -> Result<T>
304 where
305 for<'de> T: serde::Deserialize<'de>,
306 {
307 debug!("Deleting on URL \"{}\"", url);
308 reqwest::Client::new()
309 .delete(url)
310 .send()
311 .context(error::Reqwest)?
312 .json()
313 .context(error::Reqwest)
314 }
315
316 fn delete_response<T: Default + webapi::MayBeSkipped>(
317 &self,
318 url: &str,
319 ) -> Result<T>
320 where
321 for<'de> T: serde::Deserialize<'de>,
322 {
323 let response: webapi::Response<T> = self.delete(url)?;
324 response.as_result()
325 }
326
327 pub fn description_api(&self) -> Result<webapi::ApiDescription> {
328 self.get_json_response(&self.suburl("description/api"))
329 }
330
331 pub fn description_controllers(
332 &self,
333 ) -> Result<IndexMap<String, hwdb::HardwareBoardDescription>> {
334 self.get_json_response(&self.suburl("description/controller"))
335 }
336
337 pub fn description_modules(
338 &self,
339 ) -> Result<IndexMap<String, hwdb::Module>> {
340 self.get_json_response(&self.suburl("description/module"))
341 }
342
343 pub fn controllers(
344 &self,
345 ) -> Result<IndexMap<Uuid, webapi::ControllerStatus>> {
346 self.get_json_response(&self.suburl("controller"))
347 }
348
349 pub fn controllers_events(
350 &self,
351 ) -> Result<Receiver<webapi::DeviceEvent>> {
352 let url = self.suburl("controller/eventlog");
353 let url = url.replace("http", "ws");
354 let (tx, rx) = unbounded();
355 thread::spawn(move || {
356 debug!("Connecting to websockets at {:?}", url);
357 match ws::connect(url, |ws_sender| DeviceEventHandler {
358 tx: Some(tx.clone()),
359 ws_sender,
360 }) {
361 Ok(()) => debug!("Connected."),
362 Err(e) => warn!("Not connected: {:?}", e),
363 }
364 });
365 Ok(rx)
366 }
367
368 pub fn controller_events(
369 &self,
370 uuid: &Uuid,
371 ) -> Result<Receiver<webapi::ControllerEvent>> {
372 let url = self.suburl(&format!("controller/{}/eventlog", uuid));
373 let url = url.replace("http", "ws");
374 let (tx, rx) = unbounded();
375 thread::spawn(move || {
376 debug!("Connecting to websockets at {:?}", url);
377 match ws::connect(url, |ws_sender| ControllerEventHandler {
378 tx: Some(tx.clone()),
379 ws_sender,
380 }) {
381 Ok(()) => debug!("Connected."),
382 Err(e) => warn!("Not connected: {:?}", e),
383 }
384 });
385 Ok(rx)
386 }
387
388 pub fn joblog_events(
389 &self,
390 uuid: &Uuid,
391 ) -> Result<Receiver<webapi::JobEvent>> {
392 let url = self.suburl(&format!("controller/{}/joblog", uuid));
393 let url = url.replace("http", "ws");
394 let (tx, rx) = unbounded();
395 thread::spawn(move || {
396 debug!("Connecting to websockets at {:?}", url);
397 match ws::connect(url, |ws_sender| JobEventHandler {
398 tx: Some(tx.clone()),
399 ws_sender,
400 }) {
401 Ok(()) => debug!("Connected."),
402 Err(e) => warn!("Not connected: {:?}", e),
403 }
404 });
405 Ok(rx)
406 }
407
408 pub fn controller(
409 &self,
410 uuid: &Uuid,
411 ) -> Result<webapi::ControllerStatus> {
412 self.get_json_response(
413 &self.suburl(&format!("controller/{}", uuid)),
414 )
415 }
416
417 pub fn clear_job_set(&self, uuid: &Uuid) -> Result<()> {
418 self.delete_response(
419 &self.suburl(&format!("controller/{}/jobset", uuid)),
420 )
421 }
422
423 pub fn post_job_set(
424 &self,
425 uuid: &Uuid,
426 jobset: &jobset::JobSet,
427 ) -> Result<()> {
428 self.post_json_response(
429 &self.suburl(&format!("controller/{}/jobset", uuid)),
430 jobset,
431 )
432 }
433
434 pub fn post_job_control(
435 &self,
436 uuid: &Uuid,
437 message: &webapi::JobControlAction,
438 ) -> Result<webapi::JobControlAction> {
439 self.post_json_response(
440 &self.suburl(&format!("controller/{}/jobcontrol", uuid)),
441 message,
442 )
443 }
444
445 pub fn post_module_statecontrol(
446 &self,
447 uuid: &Uuid,
448 module_id: &str,
449 message: &webapi::ModuleAction,
450 ) -> Result<webapi::ModuleAction> {
451 self.post_json_response(
452 &self.suburl(&format!(
453 "controller/{}/module/{}/statecontrol",
454 uuid, module_id
455 )),
456 message,
457 )
458 }
459
460 pub fn post_flash_firmware(&self, uuid: &Uuid) -> Result<()> {
461 self.post_json_response(
462 &self.suburl(&format!("controller/{}/flash", uuid)),
463 &webapi::FirmwareFlashRequest {},
464 )
465 }
466}