1use tokio_core::reactor;
24use super::super::models::*;
25use super::Client;
26use futures::sync::{mpsc, oneshot};
27use futures::{Future, Stream};
28use std::sync::mpsc::SendError;
29use std::sync::Mutex;
30use std::thread;
31use swagger::{AuthData, ContextBuilder, EmptyContext, Has, Push, XSpanIdString, make_context, make_context_ty};
32use log::{warn, info, debug};
33
34#[allow(unused_imports)]
35use crate::{
36 Api, ApiError, ApiNoContext, ContextWrapperExt, I2cBusApiResponse, I2cBusListResponse,
37 I2cBusReadByteResponse, I2cBusReadBytesResponse, I2cBusReadRegResponse,
38 I2cBusWriteByteRegResponse, I2cBusWriteByteResponse, I2cBusWriteBytesRegResponse,
39 I2cBusWriteBytesResponse,
40};
41
42#[derive(Debug)]
43pub enum Response {
44 Error(ApiError),
45 Api(I2cBusApiResponse),
46 List(I2cBusListResponse),
47 ReadByte(I2cBusReadByteResponse),
48 ReadBytes(I2cBusReadBytesResponse),
49 ReadReg(I2cBusReadRegResponse),
50 WriteByte(I2cBusWriteByteResponse),
51 WriteByteReg(I2cBusWriteByteRegResponse),
52 WriteBytes(I2cBusWriteBytesResponse),
53 WriteBytesReg(I2cBusWriteBytesRegResponse),
54}
55
56#[derive(Clone)]
57pub enum RequestType {
58 Api,
59 List,
60 ReadByte {
61 bus_id: BusId,
62 addr: Addr,
63 },
64 ReadBytes {
65 bus_id: BusId,
66 addr: Addr,
67 num_bytes: NumBytes,
68 },
69 ReadReg {
70 bus_id: BusId,
71 addr: Addr,
72 reg: Reg,
73 num_bytes: NumBytes,
74 },
75 WriteByte {
76 bus_id: BusId,
77 addr: Addr,
78 value: Value,
79 },
80 WriteByteReg {
81 bus_id: BusId,
82 addr: Addr,
83 reg: Reg,
84 value: Value,
85 },
86 WriteBytes {
87 bus_id: BusId,
88 addr: Addr,
89 values: Values,
90 },
91 WriteBytesReg {
92 bus_id: BusId,
93 addr: Addr,
94 reg: Reg,
95 values: Values,
96 },
97}
98
99struct Request {
100 ty: RequestType,
101 sender: oneshot::Sender<Response>,
102}
103
104#[derive(Clone)]
105pub struct Handle {
106 sender: mpsc::UnboundedSender<Request>,
107}
108
109macro_rules! make_api_call {
114 ( $fn:ident, $fn_ret:ident, $ty:ident, $( $arg:tt, $arg_ty:ident),* ) => {
115 pub fn $fn(
116 &self,
117 $( $arg: $arg_ty, )*
118 ) -> Box<dyn Future<Item = $fn_ret, Error = ApiError> + Send> {
119 let (rsp_tx, rsp_rx) = oneshot::channel::<Response>();
120 match self.sender.unbounded_send(Request {
121 ty: RequestType::$ty {
122 $( $arg ),*
123 },
124 sender: rsp_tx,
125 }) {
126 Ok(_) => {
127 Box::new(rsp_rx
128 .map(|rsp| match rsp {
129 Response::$ty(x) => x,
130 Response::Error(e) => {
131 $fn_ret::TransactionFailed(I2cBusError {
132 error: None,
133 description: Some(format!("Client API Error: {}", e).to_string()), })
135 },
136 _ => panic!("Hit invalid match arm"),
137 })
138 .map_err(|rsp| match rsp {
139 oneshot::Canceled => ApiError("oneshot cancelled error".to_string()), })
141 )
142 },
143 Err(e) => Box::new(futures::future::err(ApiError(
144 format!("unbounded_send failure {:?}", e).to_string(),
145 ))),
146 }
147 }
148 }
149}
150
151impl Handle {
152 pub fn new(url: &'static str) -> Handle {
153
154 let (s, r) = mpsc::unbounded::<Request>();
155
156 thread::spawn(move || {
157 let mut core = reactor::Core::new().unwrap();
159 let core_handle = core.handle();
160 let i2cbus = Client::try_new_http(url)
161 .unwrap_or_else(|_| panic!("Failed to connect to I2C bus at {}", url));
162 info!("Created connection to I2C bus at {}", url);
163 let context: make_context_ty!(
164 ContextBuilder,
165 EmptyContext,
166 Option<AuthData>,
167 XSpanIdString
168 ) = make_context!(
169 ContextBuilder,
170 EmptyContext,
171 None as Option<AuthData>,
172 XSpanIdString::default()
173 );
174 let i2cbus_loop = r
175 .map_err(|e| warn!("I2C bus API error = {:?}", e))
176 .for_each(move |request| {
177 let future = handle_request(request, &i2cbus, &context);
178 core_handle.spawn(future);
180 Ok(())
181 });
182
183 core.run(i2cbus_loop)
184 .expect("Failed to start i2cbus reactor core loop.");
185 });
186
187 Handle { sender: s }
188 }
189
190 make_api_call!(
191 i2c_bus_read_byte,
192 I2cBusReadByteResponse,
193 ReadByte,
194 bus_id,
195 BusId,
196 addr,
197 Addr
198 );
199
200 make_api_call!(
201 i2c_bus_read_bytes,
202 I2cBusReadBytesResponse,
203 ReadBytes,
204 bus_id,
205 BusId,
206 addr,
207 Addr,
208 num_bytes,
209 NumBytes
210 );
211
212 make_api_call!(
213 i2c_bus_read_reg,
214 I2cBusReadRegResponse,
215 ReadReg,
216 bus_id,
217 BusId,
218 addr,
219 Addr,
220 reg,
221 Reg,
222 num_bytes,
223 NumBytes
224 );
225
226 make_api_call!(
227 i2c_bus_write_byte,
228 I2cBusWriteByteResponse,
229 WriteByte,
230 bus_id,
231 BusId,
232 addr,
233 Addr,
234 value,
235 Value
236 );
237
238 make_api_call!(
239 i2c_bus_write_byte_reg,
240 I2cBusWriteByteRegResponse,
241 WriteByteReg,
242 bus_id,
243 BusId,
244 addr,
245 Addr,
246 reg,
247 Reg,
248 value,
249 Value
250 );
251
252 make_api_call!(
253 i2c_bus_write_bytes,
254 I2cBusWriteBytesResponse,
255 WriteBytes,
256 bus_id,
257 BusId,
258 addr,
259 Addr,
260 values,
261 Values
262 );
263
264 make_api_call!(
265 i2c_bus_write_bytes_reg,
266 I2cBusWriteBytesRegResponse,
267 WriteBytesReg,
268 bus_id,
269 BusId,
270 addr,
271 Addr,
272 reg,
273 Reg,
274 values,
275 Values
276 );
277
278}
279
280fn handle_receiver(
281 rsp: oneshot::Receiver<Response>,
282) -> Box<dyn Future<Item = I2cBusWriteBytesResponse, Error = ApiError> + Send> {
283 Box::new(rsp.then(|r| {
284 if true {
285 return Box::new(futures::future::ok(
286 I2cBusWriteBytesResponse::TransactionFailed(I2cBusError {
287 error: None,
288 description: None,
289 }),
290 ));
291 } else {
292 return Box::new(futures::future::err(ApiError(
293 "unbounded_send failure".to_string(), )));
295 }
296 }))
297}
298
299fn handle_request_api(
300 request: Request,
301 i2cbus: &Client<hyper::client::ResponseFuture>,
302 context: &make_context_ty!(
303 ContextBuilder,
304 EmptyContext,
305 Option<AuthData>,
306 XSpanIdString
307 ),
308) -> Box<dyn Future<Item = (), Error = ()>> {
309 Box::new(i2cbus.i2c_bus_api(context).then(|result| {
310 let response = match result {
311 Ok(x) => Response::Api(x),
312 Err(e) => Response::Error(e),
313 };
314 match request.sender.send(response) {
315 Ok(_) => Ok(()),
316 Err(e) => {
317 warn!("Failed to return Api call {:?}", e);
318 Err(()) }
320 }
321 }))
322}
323
324fn handle_request_list(
325 request: Request,
326 i2cbus: &Client<hyper::client::ResponseFuture>,
327 context: &make_context_ty!(
328 ContextBuilder,
329 EmptyContext,
330 Option<AuthData>,
331 XSpanIdString
332 ),
333) -> Box<dyn Future<Item = (), Error = ()>> {
334 Box::new(i2cbus.i2c_bus_list(context).then(|result| {
335 let response = match result {
336 Ok(x) => Response::List(x),
337 Err(e) => Response::Error(e),
338 };
339 match request.sender.send(response) {
340 Ok(_) => Ok(()),
341 Err(e) => {
342 warn!("Failed to return List call {:?}", e);
343 Err(()) }
345 }
346 }))
347}
348
349fn handle_request_read_byte(
350 request: Request,
351 i2cbus: &Client<hyper::client::ResponseFuture>,
352 context: &make_context_ty!(
353 ContextBuilder,
354 EmptyContext,
355 Option<AuthData>,
356 XSpanIdString
357 ),
358 bus_id: BusId,
359 addr: Addr,
360) -> Box<dyn Future<Item = (), Error = ()>> {
361 Box::new(
362 i2cbus
363 .i2c_bus_read_byte(*bus_id, *addr, context)
364 .then(|result| {
365 let response = match result {
366 Ok(x) => Response::ReadByte(x),
367 Err(e) => Response::Error(e),
368 };
369 match request.sender.send(response) {
370 Ok(_) => Ok(()),
371 Err(e) => {
372 warn!("Failed to return ReadByte call {:?}", e);
373 Err(()) }
375 }
376 }),
377 )
378}
379
380fn handle_request_read_bytes(
381 request: Request,
382 i2cbus: &Client<hyper::client::ResponseFuture>,
383 context: &make_context_ty!(
384 ContextBuilder,
385 EmptyContext,
386 Option<AuthData>,
387 XSpanIdString
388 ),
389 bus_id: BusId,
390 addr: Addr,
391 num_bytes: NumBytes,
392) -> Box<dyn Future<Item = (), Error = ()>> {
393 Box::new(
394 i2cbus
395 .i2c_bus_read_bytes(*bus_id, *addr, *num_bytes, context)
396 .then(|result| {
397 let response = match result {
398 Ok(x) => Response::ReadBytes(x),
399 Err(e) => Response::Error(e),
400 };
401 match request.sender.send(response) {
402 Ok(_) => Ok(()),
403 Err(e) => {
404 warn!("Failed to return ReadBytes call {:?}", e);
405 Err(()) }
407 }
408 }),
409 )
410}
411
412fn handle_request_read_reg(
413 request: Request,
414 i2cbus: &Client<hyper::client::ResponseFuture>,
415 context: &make_context_ty!(
416 ContextBuilder,
417 EmptyContext,
418 Option<AuthData>,
419 XSpanIdString
420 ),
421 bus_id: BusId,
422 addr: Addr,
423 reg: Reg,
424 num_bytes: NumBytes,
425) -> Box<dyn Future<Item = (), Error = ()>> {
426 Box::new(
427 i2cbus
428 .i2c_bus_read_reg(*bus_id, *addr, *reg, *num_bytes, context)
429 .then(|result| {
430 let response = match result {
431 Ok(x) => Response::ReadReg(x),
432 Err(e) => Response::Error(e),
433 };
434 match request.sender.send(response) {
435 Ok(_) => Ok(()),
436 Err(e) => {
437 warn!("Failed to return ReadReg call {:?}", e);
438 Err(()) }
440 }
441 }),
442 )
443}
444
445fn handle_request_write_byte(
446 request: Request,
447 i2cbus: &Client<hyper::client::ResponseFuture>,
448 context: &make_context_ty!(
449 ContextBuilder,
450 EmptyContext,
451 Option<AuthData>,
452 XSpanIdString
453 ),
454 bus_id: BusId,
455 addr: Addr,
456 value: Value,
457) -> Box<dyn Future<Item = (), Error = ()>> {
458 Box::new(
459 i2cbus
460 .i2c_bus_write_byte(*bus_id, *addr, *value, context)
461 .then(|result| {
462 let response = match result {
463 Ok(x) => Response::WriteByte(x),
464 Err(e) => Response::Error(e),
465 };
466 match request.sender.send(response) {
467 Ok(_) => Ok(()),
468 Err(e) => {
469 warn!("Failed to return WriteByte call {:?}", e);
470 Err(()) }
472 }
473 }),
474 )
475}
476
477fn handle_request_write_byte_reg(
478 request: Request,
479 i2cbus: &Client<hyper::client::ResponseFuture>,
480 context: &make_context_ty!(
481 ContextBuilder,
482 EmptyContext,
483 Option<AuthData>,
484 XSpanIdString
485 ),
486 bus_id: BusId,
487 addr: Addr,
488 reg: Reg,
489 value: Value,
490) -> Box<dyn Future<Item = (), Error = ()>> {
491 Box::new(
492 i2cbus
493 .i2c_bus_write_byte_reg(*bus_id, *addr, *reg, *value, context)
494 .then(|result| {
495 let response = match result {
496 Ok(x) => Response::WriteByteReg(x),
497 Err(e) => Response::Error(e),
498 };
499 match request.sender.send(response) {
500 Ok(_) => Ok(()),
501 Err(e) => {
502 warn!("Failed to return WriteByteReg call {:?}", e);
503 Err(()) }
505 }
506 }),
507 )
508}
509
510fn handle_request_write_bytes(
511 request: Request,
512 i2cbus: &Client<hyper::client::ResponseFuture>,
513 context: &make_context_ty!(
514 ContextBuilder,
515 EmptyContext,
516 Option<AuthData>,
517 XSpanIdString
518 ),
519 bus_id: BusId,
520 addr: Addr,
521 values: Values,
522) -> Box<dyn Future<Item = (), Error = ()>> {
523 Box::new(
524 i2cbus
525 .i2c_bus_write_bytes(*bus_id, *addr, values, context)
526 .then(|result| {
527 let response = match result {
528 Ok(x) => Response::WriteBytes(x),
529 Err(e) => Response::Error(e),
530 };
531 match request.sender.send(response) {
532 Ok(_) => Ok(()),
533 Err(e) => {
534 warn!("Failed to return WriteBytes call {:?}", e);
535 Err(()) }
537 }
538 }),
539 )
540}
541
542fn handle_request_write_bytes_reg(
543 request: Request,
544 i2cbus: &Client<hyper::client::ResponseFuture>,
545 context: &make_context_ty!(
546 ContextBuilder,
547 EmptyContext,
548 Option<AuthData>,
549 XSpanIdString
550 ),
551 bus_id: BusId,
552 addr: Addr,
553 reg: Reg,
554 values: Values,
555) -> Box<dyn Future<Item = (), Error = ()>> {
556 Box::new(
557 i2cbus
558 .i2c_bus_write_bytes_reg(*bus_id, *addr, *reg, values, context)
559 .then(|result| {
560 let response = match result {
561 Ok(x) => Response::WriteBytesReg(x),
562 Err(e) => Response::Error(e),
563 };
564 match request.sender.send(response) {
565 Ok(_) => Ok(()),
566 Err(e) => {
567 warn!("Failed to return WriteBytesReg call {:?}", e);
568 Err(()) }
570 }
571 }),
572 )
573}
574
575fn handle_request(
576 request: Request,
577 i2cbus: &Client<hyper::client::ResponseFuture>,
578 context: &make_context_ty!(
579 ContextBuilder,
580 EmptyContext,
581 Option<AuthData>,
582 XSpanIdString
583 ),
584) -> impl Future<Item = (), Error = ()> {
585 let ty = request.ty.clone();
586 match ty {
587 RequestType::Api => handle_request_api(request, i2cbus, context),
588 RequestType::List => handle_request_list(request, i2cbus, context),
589 RequestType::ReadByte { bus_id, addr } => {
590 handle_request_read_byte(request, i2cbus, context, bus_id, addr)
591 }
592 RequestType::ReadBytes {
593 bus_id,
594 addr,
595 num_bytes,
596 } => handle_request_read_bytes(request, i2cbus, context, bus_id, addr, num_bytes),
597 RequestType::ReadReg {
598 bus_id,
599 addr,
600 reg,
601 num_bytes,
602 } => handle_request_read_reg(request, i2cbus, context, bus_id, addr, reg, num_bytes),
603 RequestType::WriteByte {
604 bus_id,
605 addr,
606 value,
607 } => handle_request_write_byte(request, i2cbus, context, bus_id, addr, value),
608 RequestType::WriteByteReg {
609 bus_id,
610 addr,
611 reg,
612 value,
613 } => handle_request_write_byte_reg(request, i2cbus, context, bus_id, addr, reg, value),
614 RequestType::WriteBytes {
615 bus_id,
616 addr,
617 values,
618 } => handle_request_write_bytes(request, i2cbus, context, bus_id, addr, values),
619 RequestType::WriteBytesReg {
620 bus_id,
621 addr,
622 reg,
623 values,
624 } => handle_request_write_bytes_reg(request, i2cbus, context, bus_id, addr, reg, values),
625 }
626}