i2cbus_api/client/
remote.rs

1//!
2//! (c) Piers Finlayson 2018-2020
3//!
4//! This module provides a front-end to the API client implementation which
5//! makes it easy to use the client from within another Rust program, by
6//! taking care of running its own reactor to drive the asynchronous work
7//! implicit in working with a RESTful API.
8//!
9//! To use, create a new handle then call the methods on this object:
10//!
11//! ```
12//! // let handle = Handle::new("http://ip:port");
13//! // let rsp = handle.write_bytes(...);
14//! // Handle response
15//! // ...
16//! ```
17//!
18//! Calls can of course be chained together
19//!
20//! See pca9956b/src/http.rs for more comprehensive usage examples
21//!
22
23use tokio_core::reactor;
24use super::super::models::*;
25use super::Client;
26use futures::sync::{mpsc, oneshot};
27use futures::{Future, Stream};
28use std::sync::mpsc::SendError;
29use std::sync::Mutex;
30use std::thread;
31use swagger::{AuthData, ContextBuilder, EmptyContext, Has, Push, XSpanIdString, make_context, make_context_ty};
32use log::{warn, info, debug};
33
34#[allow(unused_imports)]
35use crate::{
36    Api, ApiError, ApiNoContext, ContextWrapperExt, I2cBusApiResponse, I2cBusListResponse,
37    I2cBusReadByteResponse, I2cBusReadBytesResponse, I2cBusReadRegResponse,
38    I2cBusWriteByteRegResponse, I2cBusWriteByteResponse, I2cBusWriteBytesRegResponse,
39    I2cBusWriteBytesResponse,
40};
41
42#[derive(Debug)]
43pub enum Response {
44    Error(ApiError),
45    Api(I2cBusApiResponse),
46    List(I2cBusListResponse),
47    ReadByte(I2cBusReadByteResponse),
48    ReadBytes(I2cBusReadBytesResponse),
49    ReadReg(I2cBusReadRegResponse),
50    WriteByte(I2cBusWriteByteResponse),
51    WriteByteReg(I2cBusWriteByteRegResponse),
52    WriteBytes(I2cBusWriteBytesResponse),
53    WriteBytesReg(I2cBusWriteBytesRegResponse),
54}
55
56#[derive(Clone)]
57pub enum RequestType {
58    Api,
59    List,
60    ReadByte {
61        bus_id: BusId,
62        addr: Addr,
63    },
64    ReadBytes {
65        bus_id: BusId,
66        addr: Addr,
67        num_bytes: NumBytes,
68    },
69    ReadReg {
70        bus_id: BusId,
71        addr: Addr,
72        reg: Reg,
73        num_bytes: NumBytes,
74    },
75    WriteByte {
76        bus_id: BusId,
77        addr: Addr,
78        value: Value,
79    },
80    WriteByteReg {
81        bus_id: BusId,
82        addr: Addr,
83        reg: Reg,
84        value: Value,
85    },
86    WriteBytes {
87        bus_id: BusId,
88        addr: Addr,
89        values: Values,
90    },
91    WriteBytesReg {
92        bus_id: BusId,
93        addr: Addr,
94        reg: Reg,
95        values: Values,
96    },
97}
98
99struct Request {
100    ty: RequestType,
101    sender: oneshot::Sender<Response>,
102}
103
104#[derive(Clone)]
105pub struct Handle {
106    sender: mpsc::UnboundedSender<Request>,
107}
108
// Generates a public `Handle` method that queues one API call for the
// background client thread and returns a future for its result.
//
// Macro arguments:
//   fn:          name of the generated method (should be one of Client::Api::fns)
//   fn_ret:      the call's OK return type (e.g. I2cBusWriteBytesResponse)
//   ty:          the RequestType and Response variant (e.g. WriteBytes)
//   arg, arg_ty: alternating argument names and types for the call; the
//                names are used as the field names of `RequestType::$ty`
//
// The returned future resolves as follows:
//   * matching `Response::$ty(x)`  -> Ok(x)
//   * `Response::Error(e)`         -> Ok($fn_ret::TransactionFailed(..)) with
//                                     the error text in `description` (kept
//                                     as-is so existing callers see no change)
//   * any other `Response` variant -> Err(ApiError); this indicates a
//                                     dispatch bug, reported as an error
//                                     rather than a panic in the caller's
//                                     executor
//   * oneshot cancelled            -> Err(ApiError)
//   * request could not be queued  -> Err(ApiError), resolved immediately
macro_rules! make_api_call {
    ( $fn:ident, $fn_ret:ident, $ty:ident, $( $arg:tt, $arg_ty:ident),* ) => {
        pub fn $fn(
            &self,
            $( $arg: $arg_ty, )*
        ) -> Box<dyn Future<Item = $fn_ret, Error = ApiError> + Send> {
            let (rsp_tx, rsp_rx) = oneshot::channel::<Response>();
            let request = Request {
                ty: RequestType::$ty {
                    $( $arg ),*
                },
                sender: rsp_tx,
            };

            // Queueing fails when the receiving end of the request queue is gone.
            if let Err(e) = self.sender.unbounded_send(request) {
                return Box::new(futures::future::err::<$fn_ret, ApiError>(ApiError(
                    format!("unbounded_send failure {:?}", e),
                )));
            }

            Box::new(rsp_rx.then(|outcome| match outcome {
                Ok(Response::$ty(x)) => Ok(x),
                Ok(Response::Error(e)) => Ok($fn_ret::TransactionFailed(I2cBusError {
                    error: None,
                    description: Some(format!("Client API Error: {}", e)),
                })),
                Ok(other) => Err(ApiError(format!(
                    "Unexpected response variant for {}: {:?}",
                    stringify!($ty),
                    other
                ))),
                Err(oneshot::Canceled) => Err(ApiError("oneshot cancelled error".to_string())),
            }))
        }
    }
}
150
151impl Handle {
152    pub fn new(url: &'static str) -> Handle {
153
154        let (s, r) = mpsc::unbounded::<Request>();
155
156        thread::spawn(move || {
157            //let mut rt = tokio::runtime::Runtime::new().unwrap();
158            let mut core = reactor::Core::new().unwrap();
159            let core_handle = core.handle();
160            let i2cbus = Client::try_new_http(url)
161                .unwrap_or_else(|_| panic!("Failed to connect to I2C bus at {}", url));
162            info!("Created connection to I2C bus at {}", url);
163            let context: make_context_ty!(
164                ContextBuilder,
165                EmptyContext,
166                Option<AuthData>,
167                XSpanIdString
168            ) = make_context!(
169                ContextBuilder,
170                EmptyContext,
171                None as Option<AuthData>,
172                XSpanIdString::default()
173            );
174            let i2cbus_loop = r
175                .map_err(|e| warn!("I2C bus API error = {:?}", e))
176                .for_each(move |request| {
177                    let future = handle_request(request, &i2cbus, &context);
178                    //rt.block_on(future);
179                    core_handle.spawn(future);
180                    Ok(())
181                });
182
183            core.run(i2cbus_loop)
184                .expect("Failed to start i2cbus reactor core loop.");
185        });
186
187        Handle { sender: s }
188    }
189
190    make_api_call!(
191        i2c_bus_read_byte,
192        I2cBusReadByteResponse,
193        ReadByte,
194        bus_id,
195        BusId,
196        addr,
197        Addr
198    );
199
200    make_api_call!(
201        i2c_bus_read_bytes,
202        I2cBusReadBytesResponse,
203        ReadBytes,
204        bus_id,
205        BusId,
206        addr,
207        Addr,
208        num_bytes,
209        NumBytes
210    );
211
212    make_api_call!(
213        i2c_bus_read_reg,
214        I2cBusReadRegResponse,
215        ReadReg,
216        bus_id,
217        BusId,
218        addr,
219        Addr,
220        reg,
221        Reg,
222        num_bytes,
223        NumBytes
224    );
225
226    make_api_call!(
227        i2c_bus_write_byte,
228        I2cBusWriteByteResponse,
229        WriteByte,
230        bus_id,
231        BusId,
232        addr,
233        Addr,
234        value,
235        Value
236    );
237
238    make_api_call!(
239        i2c_bus_write_byte_reg,
240        I2cBusWriteByteRegResponse,
241        WriteByteReg,
242        bus_id,
243        BusId,
244        addr,
245        Addr,
246        reg,
247        Reg,
248        value,
249        Value
250    );
251
252    make_api_call!(
253        i2c_bus_write_bytes,
254        I2cBusWriteBytesResponse,
255        WriteBytes,
256        bus_id,
257        BusId,
258        addr,
259        Addr,
260        values,
261        Values
262    );
263
264    make_api_call!(
265        i2c_bus_write_bytes_reg,
266        I2cBusWriteBytesRegResponse,
267        WriteBytesReg,
268        bus_id,
269        BusId,
270        addr,
271        Addr,
272        reg,
273        Reg,
274        values,
275        Values
276    );
277
278}
279
280fn handle_receiver(
281    rsp: oneshot::Receiver<Response>,
282) -> Box<dyn Future<Item = I2cBusWriteBytesResponse, Error = ApiError> + Send> {
283    Box::new(rsp.then(|r| {
284        if true {
285            return Box::new(futures::future::ok(
286                I2cBusWriteBytesResponse::TransactionFailed(I2cBusError {
287                    error: None,
288                    description: None,
289                }),
290            ));
291        } else {
292            return Box::new(futures::future::err(ApiError(
293                "unbounded_send failure".to_string(), // Can be API Error
294            )));
295        }
296    }))
297}
298
299fn handle_request_api(
300    request: Request,
301    i2cbus: &Client<hyper::client::ResponseFuture>,
302    context: &make_context_ty!(
303        ContextBuilder,
304        EmptyContext,
305        Option<AuthData>,
306        XSpanIdString
307    ),
308) -> Box<dyn Future<Item = (), Error = ()>> {
309    Box::new(i2cbus.i2c_bus_api(context).then(|result| {
310        let response = match result {
311            Ok(x) => Response::Api(x),
312            Err(e) => Response::Error(e),
313        };
314        match request.sender.send(response) {
315            Ok(_) => Ok(()),
316            Err(e) => {
317                warn!("Failed to return Api call {:?}", e);
318                Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
319            }
320        }
321    }))
322}
323
324fn handle_request_list(
325    request: Request,
326    i2cbus: &Client<hyper::client::ResponseFuture>,
327    context: &make_context_ty!(
328        ContextBuilder,
329        EmptyContext,
330        Option<AuthData>,
331        XSpanIdString
332    ),
333) -> Box<dyn Future<Item = (), Error = ()>> {
334    Box::new(i2cbus.i2c_bus_list(context).then(|result| {
335        let response = match result {
336            Ok(x) => Response::List(x),
337            Err(e) => Response::Error(e),
338        };
339        match request.sender.send(response) {
340            Ok(_) => Ok(()),
341            Err(e) => {
342                warn!("Failed to return List call {:?}", e);
343                Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
344            }
345        }
346    }))
347}
348
349fn handle_request_read_byte(
350    request: Request,
351    i2cbus: &Client<hyper::client::ResponseFuture>,
352    context: &make_context_ty!(
353        ContextBuilder,
354        EmptyContext,
355        Option<AuthData>,
356        XSpanIdString
357    ),
358    bus_id: BusId,
359    addr: Addr,
360) -> Box<dyn Future<Item = (), Error = ()>> {
361    Box::new(
362        i2cbus
363            .i2c_bus_read_byte(*bus_id, *addr, context)
364            .then(|result| {
365                let response = match result {
366                    Ok(x) => Response::ReadByte(x),
367                    Err(e) => Response::Error(e),
368                };
369                match request.sender.send(response) {
370                    Ok(_) => Ok(()),
371                    Err(e) => {
372                        warn!("Failed to return ReadByte call {:?}", e);
373                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
374                    }
375                }
376            }),
377    )
378}
379
380fn handle_request_read_bytes(
381    request: Request,
382    i2cbus: &Client<hyper::client::ResponseFuture>,
383    context: &make_context_ty!(
384        ContextBuilder,
385        EmptyContext,
386        Option<AuthData>,
387        XSpanIdString
388    ),
389    bus_id: BusId,
390    addr: Addr,
391    num_bytes: NumBytes,
392) -> Box<dyn Future<Item = (), Error = ()>> {
393    Box::new(
394        i2cbus
395            .i2c_bus_read_bytes(*bus_id, *addr, *num_bytes, context)
396            .then(|result| {
397                let response = match result {
398                    Ok(x) => Response::ReadBytes(x),
399                    Err(e) => Response::Error(e),
400                };
401                match request.sender.send(response) {
402                    Ok(_) => Ok(()),
403                    Err(e) => {
404                        warn!("Failed to return ReadBytes call {:?}", e);
405                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
406                    }
407                }
408            }),
409    )
410}
411
412fn handle_request_read_reg(
413    request: Request,
414    i2cbus: &Client<hyper::client::ResponseFuture>,
415    context: &make_context_ty!(
416        ContextBuilder,
417        EmptyContext,
418        Option<AuthData>,
419        XSpanIdString
420    ),
421    bus_id: BusId,
422    addr: Addr,
423    reg: Reg,
424    num_bytes: NumBytes,
425) -> Box<dyn Future<Item = (), Error = ()>> {
426    Box::new(
427        i2cbus
428            .i2c_bus_read_reg(*bus_id, *addr, *reg, *num_bytes, context)
429            .then(|result| {
430                let response = match result {
431                    Ok(x) => Response::ReadReg(x),
432                    Err(e) => Response::Error(e),
433                };
434                match request.sender.send(response) {
435                    Ok(_) => Ok(()),
436                    Err(e) => {
437                        warn!("Failed to return ReadReg call {:?}", e);
438                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
439                    }
440                }
441            }),
442    )
443}
444
445fn handle_request_write_byte(
446    request: Request,
447    i2cbus: &Client<hyper::client::ResponseFuture>,
448    context: &make_context_ty!(
449        ContextBuilder,
450        EmptyContext,
451        Option<AuthData>,
452        XSpanIdString
453    ),
454    bus_id: BusId,
455    addr: Addr,
456    value: Value,
457) -> Box<dyn Future<Item = (), Error = ()>> {
458    Box::new(
459        i2cbus
460            .i2c_bus_write_byte(*bus_id, *addr, *value, context)
461            .then(|result| {
462                let response = match result {
463                    Ok(x) => Response::WriteByte(x),
464                    Err(e) => Response::Error(e),
465                };
466                match request.sender.send(response) {
467                    Ok(_) => Ok(()),
468                    Err(e) => {
469                        warn!("Failed to return WriteByte call {:?}", e);
470                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
471                    }
472                }
473            }),
474    )
475}
476
477fn handle_request_write_byte_reg(
478    request: Request,
479    i2cbus: &Client<hyper::client::ResponseFuture>,
480    context: &make_context_ty!(
481        ContextBuilder,
482        EmptyContext,
483        Option<AuthData>,
484        XSpanIdString
485    ),
486    bus_id: BusId,
487    addr: Addr,
488    reg: Reg,
489    value: Value,
490) -> Box<dyn Future<Item = (), Error = ()>> {
491    Box::new(
492        i2cbus
493            .i2c_bus_write_byte_reg(*bus_id, *addr, *reg, *value, context)
494            .then(|result| {
495                let response = match result {
496                    Ok(x) => Response::WriteByteReg(x),
497                    Err(e) => Response::Error(e),
498                };
499                match request.sender.send(response) {
500                    Ok(_) => Ok(()),
501                    Err(e) => {
502                        warn!("Failed to return WriteByteReg call {:?}", e);
503                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
504                    }
505                }
506            }),
507    )
508}
509
510fn handle_request_write_bytes(
511    request: Request,
512    i2cbus: &Client<hyper::client::ResponseFuture>,
513    context: &make_context_ty!(
514        ContextBuilder,
515        EmptyContext,
516        Option<AuthData>,
517        XSpanIdString
518    ),
519    bus_id: BusId,
520    addr: Addr,
521    values: Values,
522) -> Box<dyn Future<Item = (), Error = ()>> {
523    Box::new(
524        i2cbus
525            .i2c_bus_write_bytes(*bus_id, *addr, values, context)
526            .then(|result| {
527                let response = match result {
528                    Ok(x) => Response::WriteBytes(x),
529                    Err(e) => Response::Error(e),
530                };
531                match request.sender.send(response) {
532                    Ok(_) => Ok(()),
533                    Err(e) => {
534                        warn!("Failed to return WriteBytes call {:?}", e);
535                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
536                    }
537                }
538            }),
539    )
540}
541
542fn handle_request_write_bytes_reg(
543    request: Request,
544    i2cbus: &Client<hyper::client::ResponseFuture>,
545    context: &make_context_ty!(
546        ContextBuilder,
547        EmptyContext,
548        Option<AuthData>,
549        XSpanIdString
550    ),
551    bus_id: BusId,
552    addr: Addr,
553    reg: Reg,
554    values: Values,
555) -> Box<dyn Future<Item = (), Error = ()>> {
556    Box::new(
557        i2cbus
558            .i2c_bus_write_bytes_reg(*bus_id, *addr, *reg, values, context)
559            .then(|result| {
560                let response = match result {
561                    Ok(x) => Response::WriteBytesReg(x),
562                    Err(e) => Response::Error(e),
563                };
564                match request.sender.send(response) {
565                    Ok(_) => Ok(()),
566                    Err(e) => {
567                        warn!("Failed to return WriteBytesReg call {:?}", e);
568                        Err(()) // Can only return Err(()) to the handle thread, which can't really do anything about it - this is going to lead to a hung request
569                    }
570                }
571            }),
572    )
573}
574
575fn handle_request(
576    request: Request,
577    i2cbus: &Client<hyper::client::ResponseFuture>,
578    context: &make_context_ty!(
579        ContextBuilder,
580        EmptyContext,
581        Option<AuthData>,
582        XSpanIdString
583    ),
584) -> impl Future<Item = (), Error = ()> {
585    let ty = request.ty.clone();
586    match ty {
587        RequestType::Api => handle_request_api(request, i2cbus, context),
588        RequestType::List => handle_request_list(request, i2cbus, context),
589        RequestType::ReadByte { bus_id, addr } => {
590            handle_request_read_byte(request, i2cbus, context, bus_id, addr)
591        }
592        RequestType::ReadBytes {
593            bus_id,
594            addr,
595            num_bytes,
596        } => handle_request_read_bytes(request, i2cbus, context, bus_id, addr, num_bytes),
597        RequestType::ReadReg {
598            bus_id,
599            addr,
600            reg,
601            num_bytes,
602        } => handle_request_read_reg(request, i2cbus, context, bus_id, addr, reg, num_bytes),
603        RequestType::WriteByte {
604            bus_id,
605            addr,
606            value,
607        } => handle_request_write_byte(request, i2cbus, context, bus_id, addr, value),
608        RequestType::WriteByteReg {
609            bus_id,
610            addr,
611            reg,
612            value,
613        } => handle_request_write_byte_reg(request, i2cbus, context, bus_id, addr, reg, value),
614        RequestType::WriteBytes {
615            bus_id,
616            addr,
617            values,
618        } => handle_request_write_bytes(request, i2cbus, context, bus_id, addr, values),
619        RequestType::WriteBytesReg {
620            bus_id,
621            addr,
622            reg,
623            values,
624        } => handle_request_write_bytes_reg(request, i2cbus, context, bus_id, addr, reg, values),
625    }
626}