use std::sync::{Mutex, Arc};
use std::collections::HashMap;
use std::marker::PhantomData;
use futures::prelude::*;
use futures::sync::{oneshot, oneshot::Sender as OneshotSender};
use crate::connector::Connector;
/// A message that is either a request or a response.
///
/// `REQ` is the request payload type and `RESP` the response payload type;
/// both kinds share one envelope so they can be passed through the same
/// sender callback.
#[derive(Debug, PartialEq)]
pub enum Muxed<REQ, RESP> {
    /// A request carrying a `REQ` payload.
    Request(REQ),
    /// A response carrying a `RESP` payload.
    Response(RESP),
}
impl<REQ, RESP> Muxed<REQ, RESP> {
    /// Consumes the message and returns the request payload.
    ///
    /// Yields `None` when the message is a response.
    pub fn req(self) -> Option<REQ> {
        if let Muxed::Request(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Consumes the message and returns the response payload.
    ///
    /// Yields `None` when the message is a request.
    pub fn resp(self) -> Option<RESP> {
        if let Muxed::Response(payload) = self {
            Some(payload)
        } else {
            None
        }
    }
}
/// Multiplexer that pairs outgoing requests with incoming responses by id.
///
/// Outgoing messages are handed to a caller-supplied `SENDER` callback.
/// Each pending request keeps a oneshot sender in `requests`; it is taken
/// out again when a response with the same id is handled.
pub struct Mux<ID, ADDR, REQ, RESP, ERR, SENDER> {
    /// Pending requests: id -> oneshot sender that completes the request.
    requests: Arc<Mutex<HashMap<ID, Box<OneshotSender<RESP>>>>>,
    /// Callback used to send every outgoing message; shared with the
    /// futures returned by the request/respond methods.
    sender: Arc<Mutex<SENDER>>,
    // Markers for type parameters that no field stores directly.
    _addr: PhantomData<ADDR>,
    _req: PhantomData<REQ>,
    _err: PhantomData<ERR>,
}
impl <ID, ADDR, REQ, RESP, ERR, SENDER> Mux <ID, ADDR, REQ, RESP, ERR, SENDER>
where
ID: std::cmp::Eq + std::hash::Hash + std::fmt::Debug + Clone + 'static,
ADDR: std::fmt::Debug + 'static,
REQ: std::fmt::Debug + 'static,
RESP: std::fmt::Debug + 'static,
ERR: std::fmt::Debug + 'static,
SENDER: FnMut(ID, Muxed<REQ, RESP>, ADDR) -> Box<Future<Item=(), Error=ERR>> + 'static,
{
pub fn new(sender: SENDER) -> Mux<ID, ADDR, REQ, RESP, ERR, SENDER> {
Mux{
requests: Arc::new(Mutex::new(HashMap::new())),
sender: Arc::new(Mutex::new(sender)),
_addr: PhantomData,
_req: PhantomData,
_err: PhantomData,
}
}
pub fn handle(&mut self, id: ID, addr: ADDR, resp: Muxed<REQ, RESP>) -> Result<Option<(ADDR, REQ)>, ERR> {
let r = match resp {
Muxed::Request(req) => {
Some((addr, req))
},
Muxed::Response(resp) => {
if let Some(ch) = self.requests.lock().unwrap().remove(&id) {
ch.send(resp).unwrap();
} else {
info!("Response id: '{:?}', no request pending", id);
}
None
}
};
Ok(r)
}
}
impl <ID, ADDR, REQ, RESP, ERR, SENDER> Connector <ID, ADDR, REQ, RESP, ERR> for Mux <ID, ADDR, REQ, RESP, ERR, SENDER>
where
ID: std::cmp::Eq + std::hash::Hash + std::fmt::Debug + Clone + 'static,
ADDR: std::fmt::Debug + 'static,
REQ: std::fmt::Debug + 'static,
RESP: std::fmt::Debug + 'static,
ERR: std::fmt::Debug + 'static,
SENDER: FnMut(ID, Muxed<REQ, RESP>, ADDR) -> Box<Future<Item=(), Error=ERR>> + 'static,
{
fn request(&mut self, id: ID, addr: ADDR, req: REQ) -> Box<Future<Item=RESP, Error=ERR>> {
let (tx, rx) = oneshot::channel();
self.requests.lock().unwrap().insert(id.clone(), Box::new(tx));
let sender = self.sender.clone();
Box::new(futures::lazy(move || {
let sender = &mut *sender.lock().unwrap();
(sender)(id, Muxed::Request(req), addr)
})
.and_then(|_| {
rx.map_err(|_e| panic!() )
}))
}
fn respond(&mut self, id: ID, addr: ADDR, resp: RESP) -> Box<Future<Item=(), Error=ERR>> {
let sender = self.sender.clone();
Box::new(futures::lazy(move || {
let sender = &mut *sender.lock().unwrap();
(sender)(id, Muxed::Response(resp), addr)
}))
}
}