use errors::CoreError;
use event::{CoreEvent, NetworkEvent, NetworkTx};
use event_loop::{CoreMsg, CoreMsgTx};
use routing::{Event, MessageId, Response};
use std::sync::mpsc::Receiver;
/// Pumps routing events from `routing_rx` into the core event loop.
///
/// The loop blocks on `routing_rx` and handles each event as follows:
///
/// * `Event::Response` - the response is translated with `get_core_event` and
///   handed to `fire`. If the translation fails or the message cannot be
///   queued on `core_tx`, the loop stops.
/// * `Event::Terminate` - `NetworkEvent::Disconnected` is sent on `net_tx`
///   (a failed send is only logged at trace level) and the loop stops.
/// * Any other event is logged at debug level and otherwise ignored.
///
/// The function also returns once `routing_rx` can no longer yield events.
pub fn run<T>(routing_rx: &Receiver<Event>, mut core_tx: CoreMsgTx<T>, net_tx: &NetworkTx)
where
    T: 'static,
{
    // `recv` only fails once the sending half has hung up, which ends the loop.
    while let Ok(routing_event) = routing_rx.recv() {
        trace!("Received Routing Event: {:?}", routing_event);

        match routing_event {
            Event::Response { response, .. } => {
                // A conversion failure is treated the same as a failed hand-off.
                let forwarded = match get_core_event(response) {
                    Ok((msg_id, event)) => fire(&mut core_tx, msg_id, event),
                    Err(_) => false,
                };
                if !forwarded {
                    break;
                }
            }
            Event::Terminate => {
                let notified = net_tx.unbounded_send(NetworkEvent::Disconnected);
                if let Err(e) = notified {
                    trace!("Couldn't send NetworkEvent::Disconnected: {:?}", e);
                }
                break;
            }
            unhandled => {
                debug!(
                    "Routing Event {:?} is not handled in context of routing event loop.",
                    unhandled
                );
            }
        }
    }
}
/// Translates a routing `Response` into the id of the message it answers and
/// the `CoreEvent` that carries its result.
///
/// The mutation-style responses listed in the first arm all collapse into
/// `CoreEvent::Mutation`; every other response handled here maps to the
/// `CoreEvent` variant of the same name. In each case the error side of the
/// carried result is converted with `CoreError::from`.
///
/// As written, the body always yields `Ok`; a failure reported by routing is
/// carried inside the returned `CoreEvent`, not in this function's own
/// `Result`.
fn get_core_event(res: Response) -> Result<(MessageId, CoreEvent), CoreError> {
    /// Converts the error half of `outcome` into a `CoreError`, leaving the
    /// success value untouched.
    fn lift<V, E>(outcome: Result<V, E>) -> Result<V, CoreError>
    where
        CoreError: From<E>,
    {
        outcome.map_err(CoreError::from)
    }

    let converted = match res {
        // All of these share one event type on the core side.
        Response::ChangeMDataOwner { res, msg_id }
        | Response::DelMDataUserPermissions { res, msg_id }
        | Response::SetMDataUserPermissions { res, msg_id }
        | Response::MutateMDataEntries { res, msg_id }
        | Response::PutMData { res, msg_id }
        | Response::PutIData { res, msg_id }
        | Response::InsAuthKey { res, msg_id }
        | Response::DelAuthKey { res, msg_id } => (msg_id, CoreEvent::Mutation(lift(res))),

        // The remaining responses map one-to-one onto a `CoreEvent` variant.
        Response::GetAccountInfo { res, msg_id } => (msg_id, CoreEvent::GetAccountInfo(lift(res))),
        Response::GetIData { res, msg_id } => (msg_id, CoreEvent::GetIData(lift(res))),
        Response::GetMData { res, msg_id } => (msg_id, CoreEvent::GetMData(lift(res))),
        Response::GetMDataValue { res, msg_id } => (msg_id, CoreEvent::GetMDataValue(lift(res))),
        Response::GetMDataVersion { res, msg_id } => {
            (msg_id, CoreEvent::GetMDataVersion(lift(res)))
        }
        Response::GetMDataShell { res, msg_id } => (msg_id, CoreEvent::GetMDataShell(lift(res))),
        Response::ListMDataEntries { res, msg_id } => {
            (msg_id, CoreEvent::ListMDataEntries(lift(res)))
        }
        Response::ListMDataKeys { res, msg_id } => (msg_id, CoreEvent::ListMDataKeys(lift(res))),
        Response::ListMDataValues { res, msg_id } => {
            (msg_id, CoreEvent::ListMDataValues(lift(res)))
        }
        Response::ListMDataPermissions { res, msg_id } => {
            (msg_id, CoreEvent::ListMDataPermissions(lift(res)))
        }
        Response::ListMDataUserPermissions { res, msg_id } => {
            (msg_id, CoreEvent::ListMDataUserPermissions(lift(res)))
        }
        Response::ListAuthKeysAndVersion { res, msg_id } => {
            (msg_id, CoreEvent::ListAuthKeysAndVersion(lift(res)))
        }
    };

    Ok(converted)
}
/// Queues a message on `core_tx` that, when run, calls `fire_hook` on the
/// client with `msg_id` and `event`.
///
/// Returns `true` if the message was accepted by `core_tx`, `false` if the
/// send failed.
fn fire<T: 'static>(core_tx: &mut CoreMsgTx<T>, msg_id: MessageId, event: CoreEvent) -> bool {
    // The closure is built inline so that its argument types are inferred
    // from `CoreMsg::new`; it takes ownership of both `msg_id` and `event`.
    let hook_msg = CoreMsg::new(move |client, _| {
        client.fire_hook(&msg_id, event);
        None
    });

    match core_tx.unbounded_send(hook_msg) {
        Ok(_) => true,
        Err(_) => false,
    }
}