extern crate capnp;
#[allow(unused_imports)]
use std::io::Read;
use result;
use result::Result;
use std::mem;
use std::sync::mpsc::{Sender, Receiver, SyncSender};
use std::sync::mpsc::sync_channel;
use scheduler::CompMsg;
pub struct Msg {
pub vec: Vec<u8>,
pub action: String,
reader: Option<capnp::message::Reader<capnp::serialize::OwnedSegments>>,
builder: Option<capnp::message::Builder<capnp::message::HeapAllocator>>,
}
impl Msg {
pub fn new() -> Self {
Msg { vec: vec![],
action: String::new(),
reader: None,
builder: None,
}
}
pub fn read_schema<'a, T: capnp::traits::FromPointerReader<'a>>(&'a mut self) -> Result<T> {
let msg = try!(capnp::serialize::read_message(&mut &self.vec[..], capnp::message::ReaderOptions::new()));
self.reader = Some(msg);
Ok(try!(self.reader.as_ref().unwrap().get_root()))
}
pub fn build_schema<'a, T: capnp::traits::FromPointerBuilder<'a>>(&'a mut self) -> T {
let msg = capnp::message::Builder::new_default();
self.builder = Some(msg);
self.builder.as_mut().unwrap().init_root()
}
pub fn edit_schema<'a, T: capnp::traits::FromPointerBuilder<'a>,
U: capnp::traits::FromPointerReader<'a> + capnp::traits::SetPointerBuilder<T>>
(&'a mut self) -> Result<T> {
let reader = try!(capnp::serialize::read_message(&mut &self.vec[..], capnp::message::ReaderOptions::new()));
self.reader = Some(reader);
let reader: U = try!(self.reader.as_ref().unwrap().get_root());
let mut msg = capnp::message::Builder::new_default();
try!(msg.set_root(reader));
self.builder = Some(msg);
Ok(try!(self.builder.as_mut().unwrap().get_root()))
}
pub fn before_send(&mut self) -> Result<()> {
let mut build = mem::replace(&mut self.builder, None);
if let Some(ref mut b) = build {
self.vec.clear();
try!(capnp::serialize::write_message(&mut self.vec, b))
}
Ok(())
}
}
impl Clone for Msg {
fn clone(&self) -> Self {
Msg {
vec: self.vec.clone(),
action: self.action.clone(),
reader: None,
builder: None,
}
}
}
#[derive(Clone)]
pub struct MsgSender {
pub sender: SyncSender<Msg>,
pub dest: usize,
pub sched: Sender<CompMsg>,
must_sched: bool,
}
impl MsgSender {
pub fn send(&self, mut msg: Msg) -> Result<()> {
try!(msg.before_send());
try!(self.sender.send(msg));
if self.must_sched {
try!(self.sched.send(CompMsg::Inc(self.dest)));
}
Ok(())
}
}
pub trait OutputSend {
fn send(&self, msg:Msg) -> Result<()>;
}
impl OutputSend for Option<MsgSender> {
fn send(&self, msg: Msg) -> Result<()> {
if let &Some(ref sender) = self {
sender.send(msg)?;
Ok(())
} else {
Err(result::Error::OutputNotConnected)
}
}
}
pub struct MsgReceiver {
id: usize,
recv: Receiver<Msg>,
sender: MsgSender,
sched: Sender<CompMsg>,
must_sched: bool,
}
impl MsgReceiver {
pub fn new(id: usize, sched: Sender<CompMsg>, must_sched: bool) -> (MsgReceiver, MsgSender) {
let (s, r) = sync_channel(25);
let s = MsgSender {
sender: s,
dest: id,
must_sched: must_sched,
sched: sched.clone(),
};
let r = MsgReceiver {
recv: r,
id: id,
sender: s.clone(),
sched: sched,
must_sched: must_sched,
};
(r, s)
}
pub fn recv(&self) -> Result<Msg> {
let msg = try!(self.recv.recv());
if self.must_sched {
try!(self.sched.send(CompMsg::Dec(self.id)));
}
Ok(msg)
}
pub fn try_recv(&self) -> Result<Msg> {
let msg = self.recv.try_recv()?;
if self.must_sched {
try!(self.sched.send(CompMsg::Dec(self.id)));
}
Ok(msg)
}
pub fn get_sender(&self) -> MsgSender {
self.sender.clone()
}
}