extern crate libloading;
extern crate threadpool;
use self::threadpool::ThreadPool;
use result;
use result::Result;
use ports::{MsgSender, MsgReceiver, Msg};
use agent::Agent;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc::channel;
use std::thread;
use std::thread::JoinHandle;
use std::mem;
/// A heap-allocated agent trait object that can be sent to another thread.
pub type BoxedComp = Box<Agent + Send>;
/// Messages consumed by the scheduler state thread spawned in `Scheduler::new`.
///
/// Wherever a `usize` comes first, it is the id of the targeted agent.
pub enum CompMsg {
/// Register a new agent: (id, name, boxed agent).
NewAgent(usize, String, BoxedComp),
/// Stop the state thread's message loop.
Halt,
/// Request a halt; `Halt` is emitted once no agent is counted as running.
HaltState,
/// Mark the agent as startable and run it if its box is available.
Start(usize),
/// Connect an output port: (id, output port, sender to the destination).
ConnectOutputPort(usize, String, MsgSender),
/// Connect an output array element: (id, output port, element, sender).
ConnectOutputArrayPort(usize, String, String, MsgSender),
/// Disconnect an output port: (id, port).
Disconnect(usize, String),
/// Disconnect an output array element: (id, port, element).
DisconnectArray(usize, String, String),
/// Add an element to an input array port: (id, port, element, receiver).
AddInputArrayElement(usize, String, String, MsgReceiver),
/// Remove an element from an input array port: (id, port, element).
RemoveInputArrayElement(usize, String, String),
/// Add an element to an output array port: (id, port, element).
AddOutputArrayElement(usize, String, String),
/// Sent by a pool worker when an agent's `run` returns: (id, agent, result).
RunEnd(usize, BoxedComp, Result<Signal>),
/// Set the receiver of an input port: (id, port, receiver).
SetReceiver(usize, String, Receiver<Msg>),
/// Increment the agent's `ips` counter.
Inc(usize),
/// Decrement the agent's `ips` counter.
Dec(usize),
/// Remove the agent; the outcome is reported on the given `SyncMsg` sender.
Remove(usize, Sender<SyncMsg>),
}
/// Value carried by a successful agent run, inspected by `SchedState::run_end`.
pub enum Signal {
/// The agent has finished; it stops being counted as running.
End,
/// The agent has not finished; it stays counted as running.
Continue,
}
/// Bookkeeping kept by `Scheduler` for one registered agent.
pub struct Comp {
/// Id used to address the agent in `CompMsg` messages.
pub id: usize,
/// Senders for the agent's simple input ports, keyed by port name.
pub inputs: HashMap<String, MsgSender>,
/// Senders for the agent's input array ports: port name -> element name -> sender.
pub inputs_array: HashMap<String, HashMap<String, MsgSender>>,
/// Key passed to `AgentCache` to create the agent and look up its schemas.
pub sort: String,
/// `true` when the agent reported no input ports at creation; such agents
/// are the ones started by `Scheduler::start`.
pub start: bool,
}
/// Front-end of the scheduler: owns the agent registry and talks to the
/// background state thread through `sender`.
pub struct Scheduler {
/// Loaders of agent libraries, keyed by the agent's `sort`.
pub cache: AgentCache,
/// Registered agents, keyed by name.
pub agents: HashMap<String, Comp>,
/// Channel to the scheduler state thread.
pub sender: Sender<CompMsg>,
/// Receives the errors produced by the state thread while handling messages.
pub error_receiver: Receiver<result::Error>,
// Id that will be given to the next agent added with `add_node`.
id: usize,
// Handle of the state thread, joined in `join`.
th: JoinHandle<()>,
}
impl Scheduler {
/// Creates a scheduler and spawns the thread that owns the `SchedState`.
///
/// The thread loops on the message channel, dispatches each `CompMsg` to
/// the state, and forwards any resulting error on the error channel
/// exposed as `error_receiver`. The loop stops on `CompMsg::Halt`.
///
/// # Panics
/// The spawned thread panics if receiving a message or forwarding an error
/// fails.
pub fn new() -> Self {
    let (msg_tx, msg_rx) = channel();
    let (err_tx, err_rx) = channel();
    let mut state = SchedState::new(msg_tx.clone());
    let handle = thread::spawn(move || {
        loop {
            let outcome: Result<()> = match msg_rx.recv().expect("no message received") {
                // Life-cycle messages.
                CompMsg::Halt => break,
                CompMsg::HaltState => state.halt(),
                CompMsg::NewAgent(id, name, comp) => state.new_agent(id, name, comp),
                CompMsg::Start(id) => state.start(id),
                CompMsg::RunEnd(id, boxed_comp, res) => state.run_end(id, boxed_comp, res),
                CompMsg::Inc(id) => state.inc(id),
                CompMsg::Dec(id) => state.dec(id),
                CompMsg::Remove(id, sync_sender) => state.remove(id, sync_sender),
                // Edition messages, all routed through `edit_agent`.
                CompMsg::AddInputArrayElement(id, port, element, recv) => {
                    state.edit_agent(id, EditCmp::AddInputArrayElement(port, element, recv))
                }
                CompMsg::RemoveInputArrayElement(id, port, element) => {
                    state.edit_agent(id, EditCmp::RemoveInputArrayElement(port, element))
                }
                CompMsg::AddOutputArrayElement(id, port, element) => {
                    state.edit_agent(id, EditCmp::AddOutputArrayElement(port, element))
                }
                CompMsg::ConnectOutputPort(id, port, sender) => {
                    state.edit_agent(id, EditCmp::ConnectOutputPort(port, sender))
                }
                CompMsg::ConnectOutputArrayPort(id, port, element, sender) => {
                    state.edit_agent(id, EditCmp::ConnectOutputArrayPort(port, element, sender))
                }
                CompMsg::SetReceiver(id, port, receiver) => {
                    state.edit_agent(id, EditCmp::SetReceiver(port, receiver))
                }
                CompMsg::Disconnect(id, port) => {
                    state.edit_agent(id, EditCmp::Disconnect(port))
                }
                CompMsg::DisconnectArray(id, port, element) => {
                    state.edit_agent(id, EditCmp::DisconnectArray(port, element))
                }
            };
            if let Err(e) = outcome {
                err_tx.send(e).expect("cannot send the error");
            }
        }
    });
    Scheduler {
        cache: AgentCache::new(),
        agents: HashMap::new(),
        sender: msg_tx,
        error_receiver: err_rx,
        th: handle,
        id: 0,
    }
}
pub fn add_node<'a, A, B>(&mut self, name: A, sort: B) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>
{
let name = name.into().into_owned();
let sort = sort.into().into_owned();
let (comp, senders) = self.cache.create_comp(&sort, self.id, self.sender.clone()).expect("cannot create comp");
let start = !comp.is_input_ports();
self.sender.send(CompMsg::NewAgent(self.id, name.clone(), comp)).expect("Cannot send to sched state");
let s_acc = try!(senders.get("accumulator").ok_or(result::Error::PortNotFound(name.clone(), "accumulator".into()))).clone();
self.agents.insert(name.clone(),
Comp {
id: self.id,
inputs: senders,
inputs_array: HashMap::new(),
sort: sort,
start: start,
});
self.sender.send(CompMsg::ConnectOutputPort(self.id, "accumulator".into(), s_acc)).expect("Cannot send to sched state");
self.id += 1;
Ok(())
}
/// Sends a `Start` message for every registered agent flagged as startable.
///
/// # Panics
/// Panics if the message cannot be sent to the scheduler state thread.
pub fn start(&self) {
    let startable = self.agents.values().filter(|comp| comp.start);
    for comp in startable {
        self.sender.send(CompMsg::Start(comp.id)).expect("start: unable to send to sched state");
    }
}
pub fn start_if_needed<'a, A: Into<Cow<'a, str>>>(&self, name: A) -> Result<()> {
let name = name.into().into_owned();
self.agents.get(&name).ok_or(result::Error::AgentNotFound(name.clone()))
.and_then(|comp| {
if comp.start {
self.sender.send(CompMsg::Start(comp.id)).expect("start_if_needed");
}
Ok(())
})
}
pub fn start_agent<'a, A>(&self, name: A) -> Result<()> where
A: Into<Cow<'a, str>>
{
let name = name.into();
let comp = self.agents.get(&name as &str).ok_or(result::Error::AgentNotFound(name.into_owned()))?;
self.sender.send(CompMsg::Start(comp.id)).expect("start: unable to send to sched state");
Ok(())
}
pub fn remove_agent<'a, A: Into<Cow<'a, str>>>(&mut self, name: A) -> Result<(BoxedComp, Comp)>{
let name = name.into().into_owned();
let (s, r) = channel();
{
let comp = self.agents.get(&name).ok_or(result::Error::AgentNotFound(name.clone()))?;
self.sender.send(CompMsg::Remove(comp.id, s)).expect("Scheduler remove_agent: cannot send to the state");
}
let response = try!(r.recv());
match response {
SyncMsg::Remove(boxed_comp) => {
Ok((boxed_comp, try!(self.agents.remove(&name).ok_or(result::Error::AgentNotFound(name.into())))))
},
SyncMsg::CannotRemove => {
Err(result::Error::CannotRemove(name))
},
}
}
pub fn connect<'a, A, B, C, D>(&self, comp_out: A, port_out: B, comp_in: C, port_in: D) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
D: Into<Cow<'a, str>>
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let comp_in = &*(comp_in.into());
let port_in = &*(port_in.into());
let sort_in = self.agents.get(comp_in).ok_or(result::Error::AgentNotFound(comp_in.into()))?;
let sort_out = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
let in_schema = self.cache.get_schema_input(&sort_in.sort, port_in)?;
let out_schema = self.cache.get_schema_output(&sort_out.sort, &port_out)?;
if in_schema != "any" && out_schema != "any" && in_schema != out_schema {
return Err(result::Error::BadSchema(comp_out.clone(), port_out.clone(), out_schema, comp_in.into(), port_in.into(), in_schema));
}
let sender = try!(self.get_sender(comp_in, port_in));
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::ConnectOutputPort(comp.id, port_out, sender)).ok().expect("Scheduler connect: unable to send to sched state");
Ok(())
}
pub fn connect_array<'a, A, B, C, D, E>(&self, comp_out: A, port_out: B, element_out: C, comp_in: D, port_in: E) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
D: Into<Cow<'a, str>>,
E: Into<Cow<'a, str>>
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let element_out = element_out.into().into_owned();
let comp_in = &*(comp_in.into());
let port_in = &*(port_in.into());
let sort_in = self.agents.get(comp_in).ok_or(result::Error::AgentNotFound(comp_in.into()))?;
let sort_out = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
let in_schema = self.cache.get_schema_input(&sort_in.sort, port_in)?;
let out_schema = self.cache.get_schema_output_array(&sort_out.sort, &port_out)?;
if in_schema != "any" && out_schema != "any" && in_schema != out_schema {
return Err(result::Error::BadSchema(comp_out.clone(), port_out.clone(), out_schema, comp_in.into(), port_in.into(), in_schema));
}
let sender = try!(self.get_sender(comp_in, port_in));
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::ConnectOutputArrayPort(comp.id, port_out, element_out, sender)).ok().expect("Scheduler connect: unable to send to scheduler state");
Ok(())
}
pub fn connect_to_array<'a, A, B, C, D, E>(&self, comp_out: A, port_out: B, comp_in: C, port_in: D, element_in: E) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
D: Into<Cow<'a, str>>,
E: Into<Cow<'a, str>>
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let comp_in = &*(comp_in.into());
let port_in = &*(port_in.into());
let element_in = &*(element_in.into());
let sort_in = self.agents.get(comp_in).ok_or(result::Error::AgentNotFound(comp_in.into()))?;
let sort_out = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
let in_schema = self.cache.get_schema_input_array(&sort_in.sort, port_in)?;
let out_schema = self.cache.get_schema_output(&sort_out.sort, &port_out)?;
if in_schema != "any" && out_schema != "any" && in_schema != out_schema {
return Err(result::Error::BadSchema(comp_out.clone(), port_out.clone(), out_schema, comp_in.into(), port_in.into(), in_schema));
}
let sender = try!(self.get_array_sender(comp_in, port_in, element_in));
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::ConnectOutputPort(comp.id, port_out, sender)).ok().expect("Scheduler connect: unable to send to scheduler state");
Ok(())
}
pub fn connect_array_to_array<'a, A, B, C, D, E, F>(&self, comp_out: A, port_out: B, element_out: C, comp_in: D, port_in: E, element_in: F) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
D: Into<Cow<'a, str>>,
E: Into<Cow<'a, str>>,
F: Into<Cow<'a, str>>
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let element_out = element_out.into().into_owned();
let comp_in = &*(comp_in.into());
let port_in = &*(port_in.into());
let element_in = &*(element_in.into());
let sort_in = self.agents.get(comp_in).ok_or(result::Error::AgentNotFound(comp_in.into()))?;
let sort_out = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
let in_schema = self.cache.get_schema_input_array(&sort_in.sort, port_in)?;
let out_schema = self.cache.get_schema_output_array(&sort_out.sort, &port_out)?;
if in_schema != "any" && out_schema != "any" && in_schema != out_schema {
return Err(result::Error::BadSchema(comp_out.clone(), port_out.clone(), out_schema, comp_in.into(), port_in.into(), in_schema));
}
let sender = try!(self.get_array_sender(comp_in, port_in, element_in));
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::ConnectOutputArrayPort(comp.id, port_out, element_out, sender)).ok().expect("Scheduler connect: unable to send to scheduler state");
Ok(())
}
pub fn disconnect<'a, A, B>(&self, comp_out: A, port_out: B) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::Disconnect(comp.id, port_out)).ok().expect("Scheduler disconnect: unable to send to scheduler state");
Ok(())
}
pub fn disconnect_array<'a, A, B, C>(&self, comp_out: A, port_out: B, element: C) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
{
let comp_out = comp_out.into().into_owned();
let port_out = port_out.into().into_owned();
let element = element.into().into_owned();
let comp = self.agents.get(&comp_out).ok_or(result::Error::AgentNotFound(comp_out.clone()))?;
self.sender.send(CompMsg::DisconnectArray(comp.id, port_out, element)).ok().expect("Scheduler disconnect_array: unable to send to scheduler state");
Ok(())
}
pub fn add_input_array_element<'a, A, B, C>(&mut self, comp_name: A, port: B, element: C) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
{
let comp_name = comp_name.into().into_owned();
let port = port.into().into_owned();
let element = element.into().into_owned();
let comp_id = self.agents.get(&comp_name).ok_or(result::Error::AgentNotFound(comp_name.clone()))?.id;
let (r, s) = MsgReceiver::new(
comp_id,
self.sender.clone(),
true
);
try!(self.agents.get_mut(&comp_name).ok_or(result::Error::AgentNotFound(comp_name.clone()))
.and_then(|mut comp| {
if !comp.inputs_array.contains_key(&port) {
comp.inputs_array.insert(port.clone(), HashMap::new());
}
comp.inputs_array.get_mut(&port).ok_or(result::Error::ElementNotFound(comp_name.clone(), port.clone(), element.clone()))
.and_then(|mut port| {
port.insert(element.clone(), s);
Ok(())
})
}));
self.sender.send(CompMsg::AddInputArrayElement(comp_id, port, element, r)).ok().expect("Scheduler add_input_array_element : Unable to send to scheduler state");
Ok(())
}
/// Adds the element `element` to the input array port `port` of agent
/// `comp`, unless the registry already holds a sender for that element.
///
/// When the element is already present nothing is done and `Ok(())` is
/// returned; otherwise the call is delegated to `add_input_array_element`
/// (which also reports an unknown agent).
pub fn soft_add_input_array_element<'a, A, B, C>(&mut self, comp: A, port: B, element: C) -> Result<()> where
    A: Into<Cow<'a, str>>,
    B: Into<Cow<'a, str>>,
    C: Into<Cow<'a, str>>
{
    let comp = comp.into();
    let port = port.into();
    let element = element.into();
    let already_present = self.agents.get(&*comp)
        .and_then(|agent| agent.inputs_array.get(&*port))
        .map_or(false, |elements| elements.contains_key(&*element));
    if already_present {
        return Ok(());
    }
    self.add_input_array_element(comp, port, element)
}
pub fn add_output_array_element<'a, A, B, C>(&self, comp: A, port: B, element: C) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>
{
let comp = comp.into();
let port = port.into().into_owned();
let element = element.into().into_owned();
let comp = self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.into_owned()))?;
self.sender.send(CompMsg::AddOutputArrayElement(comp.id, port, element)).ok().expect("Scheduler add_output_array_element : Unable to send to scheduler state");
Ok(())
}
pub fn set_receiver<'a, A, B>(&self, comp: A, port: B, receiver: Receiver<Msg>) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into().into_owned();
let comp = self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.into_owned()))?;
self.sender.send(CompMsg::SetReceiver(comp.id, port, receiver)).expect("scheduler cannot send");
Ok(())
}
pub fn set_array_receiver<'a, A, B, C>(&self, comp: A, port: B, element: C, receiver: MsgReceiver) -> Result<()> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>
{
let comp = comp.into();
let port = port.into().into_owned();
let element = element.into().into_owned();
let comp = self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.into_owned()))?;
self.sender.send(CompMsg::AddInputArrayElement(comp.id, port, element, receiver)).expect("scheduler cannot send");
Ok(())
}
pub fn get_sender<'a, A, B>(&self, comp: A, port: B) -> Result<MsgSender> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
c.inputs.get(&port as &str).ok_or(result::Error::PortNotFound(comp.to_string(), port.to_string()))
.map(|s| { s.clone() })
})
}
pub fn get_array_sender<'a, A, B, C>(&self, comp: A, port: B, element: C) -> Result<MsgSender> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
C: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
let element = element.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
c.inputs_array.get(&port as &str).ok_or(result::Error::PortNotFound(comp.to_string(), port.to_string()))
.and_then(|p| {
p.get(&element as &str).ok_or(result::Error::ElementNotFound(comp.to_string(), port.to_string(), element.to_string()))
.map(|s| { s.clone() })
})
})
}
pub fn get_schema_input<'a, A, B>(&self, comp: A, port: B) -> Result<String> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
self.cache.get_schema_input(&c.sort, &port as &str)
})
}
pub fn get_schema_input_array<'a, A, B>(&self, comp: A, port: B) -> Result<String> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
self.cache.get_schema_input_array(&c.sort, &port as &str)
})
}
pub fn get_schema_output<'a, A, B>(&self, comp: A, port: B) -> Result<String> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
self.cache.get_schema_output(&c.sort, &port as &str)
})
}
pub fn get_schema_output_array<'a, A, B>(&self, comp: A, port: B) -> Result<String> where
A: Into<Cow<'a, str>>,
B: Into<Cow<'a, str>>,
{
let comp = comp.into();
let port = port.into();
self.agents.get(&comp as &str).ok_or(result::Error::AgentNotFound(comp.to_string()))
.and_then(|c| {
self.cache.get_schema_output_array(&c.sort, &port as &str)
})
}
/// Consumes the scheduler: requests a halt of the state thread and blocks
/// until that thread has terminated.
///
/// # Panics
/// Panics if the halt request cannot be sent or if the thread cannot be
/// joined.
pub fn join(self) {
    let halt_request = self.sender.send(CompMsg::HaltState);
    halt_request.ok().expect("Scheduler join : Cannot send HaltState");
    let thread_outcome = self.th.join();
    thread_outcome.ok().expect("Scheduelr join : Cannot join the thread");
}
}
// Edition requests applied to one agent by `SchedState::edit_one_comp`.
// They mirror the corresponding `CompMsg` variants without the agent id.
enum EditCmp {
// (port, element, receiver)
AddInputArrayElement(String, String, MsgReceiver),
// (port, element) -- not implemented by `edit_one_comp`
RemoveInputArrayElement(String, String),
// (port, element) -- not implemented by `edit_one_comp`
AddOutputArrayElement(String, String),
// (output port, sender to the destination)
ConnectOutputPort(String, MsgSender),
// (output port, element, sender to the destination)
ConnectOutputArrayPort(String, String, MsgSender),
// (port, receiver) -- not implemented by `edit_one_comp`
SetReceiver(String, Receiver<Msg>),
// (port) -- not implemented by `edit_one_comp`
Disconnect(String),
// (port, element) -- not implemented by `edit_one_comp`
DisconnectArray(String, String),
}
/// Answer sent by the state thread to a `CompMsg::Remove` request.
pub enum SyncMsg {
/// The agent was removed; carries its box.
Remove(BoxedComp),
/// The agent's box was not available in the state, so nothing was removed.
CannotRemove,
}
// State kept by `SchedState` for one agent.
struct CompState {
// The agent itself; `None` while it has been handed to the thread pool by
// `run`, restored by `run_end`.
comp: Option<BoxedComp>,
// Name used when reporting a failed run.
name: String,
// `true` while the agent is counted in `SchedState::running`.
is_run: bool,
// Set by `SchedState::start`. NOTE(review): never read in the visible code.
can_run: bool,
// Editions received while the agent was out; applied in `run_end`.
edit_msgs: Vec<EditCmp>,
// Counter adjusted by `inc`/`dec`; the agent is (re)run while it is > 0.
// Presumably the number of pending IPs -- confirm against the ports module.
ips: isize,
}
// State owned by the scheduler thread spawned in `Scheduler::new`.
struct SchedState {
// Sender to the scheduler's own message channel (used for `RunEnd`/`Halt`).
sched_sender: Sender<CompMsg>,
// Agents by id.
agents: HashMap<usize, CompState>,
// Number of agents currently flagged `is_run`.
running: usize,
// Set once a halt has been requested through `halt`.
can_halt: bool,
// Worker pool on which agents are executed.
pool: ThreadPool,
}
impl SchedState {
// Builds an empty state with a pool of 8 worker threads; `s` is the sender
// used to post `RunEnd` and `Halt` back to the scheduler loop.
fn new(s: Sender<CompMsg>) -> Self {
    let pool = ThreadPool::new(8);
    let agents = HashMap::new();
    SchedState {
        sched_sender: s,
        agents: agents,
        running: 0,
        can_halt: false,
        pool: pool,
    }
}
fn inc(&mut self, id: usize) -> Result<()> {
let mut start = false;
if let Some(ref mut comp) = self.agents.get_mut(&id) {
comp.ips += 1;
start = comp.ips > 0 && comp.comp.is_some();
}
if start { self.run(id); }
Ok(())
}
fn dec(&mut self, id: usize) -> Result<()> {
if let Some(ref mut comp) = self.agents.get_mut(&id) {
comp.ips -= 1;
}
Ok(())
}
// Registers agent `id` in its initial state: box available, not running,
// not startable, no queued edition, `ips` at zero. An existing entry with
// the same id is replaced.
fn new_agent(&mut self, id: usize, name: String, comp: BoxedComp) -> Result<()> {
    let fresh = CompState {
        comp: Some(comp),
        name: name,
        is_run: false,
        can_run: false,
        edit_msgs: Vec::new(),
        ips: 0,
    };
    self.agents.insert(id, fresh);
    Ok(())
}
// Removes agent `id` when its box is currently held by the state.
//
// On success the box is sent as `SyncMsg::Remove` and the entry is dropped;
// when the box is out (the agent is executing) `SyncMsg::CannotRemove` is
// sent and the entry is kept.
//
// Panics if the agent does not exist or if the answer cannot be sent.
//
// NOTE(review): `running` is not adjusted when the removed agent had
// `is_run` set -- confirm this cannot keep a requested halt from firing.
fn remove(&mut self, id: usize, sync_sender: Sender<SyncMsg>) -> Result<()>{
    let taken = self.agents
        .get_mut(&id)
        .expect("SchedState remove : agent doesn't exist")
        .comp
        .take();
    match taken {
        Some(boxed_comp) => {
            sync_sender.send(SyncMsg::Remove(boxed_comp)).expect("SchedState remove : cannot send to the channel");
            self.agents.remove(&id);
        }
        None => {
            sync_sender.send(SyncMsg::CannotRemove).expect("SchedState remove : cannot send to the channel");
        }
    }
    Ok(())
}
// Flags agent `id` as startable and runs it when its box is available.
//
// Panics if the agent does not exist.
fn start(&mut self, id: usize) -> Result<()> {
    let has_box = self.agents
        .get_mut(&id)
        .map(|state| {
            state.can_run = true;
            state.comp.is_some()
        })
        .expect("SchedState start : agent not found");
    if has_box {
        self.run(id);
    }
    Ok(())
}
// Records that a halt was requested; when no agent is counted as running,
// posts `Halt` to the scheduler loop right away (otherwise `run_end` will).
//
// Panics if `Halt` cannot be sent.
fn halt(&mut self) -> Result<()> {
    self.can_halt = true;
    let idle = self.running == 0;
    if idle {
        self.sched_sender.send(CompMsg::Halt).ok().expect("SchedState RunEnd : Cannot send Halt");
    }
    Ok(())
}
fn run_end(&mut self, id: usize, mut box_comp: BoxedComp, res: Result<Signal>) -> Result<()>{
let must_restart = {
let mut comp = self.agents.get_mut(&id).expect("SchedState RunEnd : agent doesn't exist");
for msg in comp.edit_msgs.drain(..) {
try!(Self::edit_one_comp(&mut box_comp, msg));
}
let must_restart = comp.ips > 0;
comp.comp = Some(box_comp);
if let Ok(Signal::End) = res {
if comp.is_run {
self.running -= 1;
comp.is_run = false;
}
} else if let Err(e) = res {
println!("{} fails : {}", comp.name, e);
}
must_restart
};
if must_restart {
self.run(id);
} else {
if self.running <= 0 && self.can_halt {
self.sched_sender.send(CompMsg::Halt).ok().expect("SchedState RunEnd : Cannot send Halt");
}
}
Ok(())
}
// Hands agent `id` to the thread pool, if its box is currently available.
//
// The agent is counted as running from its first execution; the worker
// posts `RunEnd` with the box and the run result when `run` returns.
//
// Panics if the agent does not exist; the worker panics if `RunEnd` cannot
// be sent.
#[allow(unused_must_use)]
fn run(&mut self, id: usize) {
    let state = self.agents.get_mut(&id).expect("SchedSate run : agent doesn't exist");
    let mut agent = match state.comp.take() {
        Some(agent) => agent,
        // The agent is already out on the pool: nothing to do.
        None => return,
    };
    if !state.is_run {
        self.running += 1;
        state.is_run = true;
    }
    let reply_to = self.sched_sender.clone();
    self.pool.execute(move || {
        let outcome = agent.run();
        reply_to.send(CompMsg::RunEnd(id, agent, outcome)).expect("SchedState run : unable to send RunEnd");
    });
}
fn edit_agent(&mut self, id: usize, msg: EditCmp) -> Result<()> {
let mut comp = self.agents.get_mut(&id).expect("SchedState edit_agent : agent doesn't exist");
if let Some(ref mut c) = comp.comp {
let mut c = c;
try!(Self::edit_one_comp(&mut c, msg));
} else {
comp.edit_msgs.push(msg);
}
Ok(())
}
// Applies one edition to the agent `c`.
//
// Only adding an input array element and connecting an output (simple or
// array) port are implemented; every other edition panics through
// `unimplemented!()`.
fn edit_one_comp(c: &mut BoxedComp, msg: EditCmp) -> Result<()> {
    match msg {
        EditCmp::AddInputArrayElement(port, element, receiver) => {
            c.add_inarr_element(&port, element, receiver)?;
        }
        EditCmp::ConnectOutputPort(port, sender) => {
            c.connect(&port, sender)?;
        }
        EditCmp::ConnectOutputArrayPort(port, element, sender) => {
            c.connect_array(&port, element, sender)?;
        }
        EditCmp::RemoveInputArrayElement(..) |
        EditCmp::AddOutputArrayElement(..) |
        EditCmp::SetReceiver(..) |
        EditCmp::Disconnect(..) |
        EditCmp::DisconnectArray(..) => {
            unimplemented!();
        }
    }
    Ok(())
}
}
/// A loaded agent library together with the entry points resolved from it
/// by `AgentCache::create_comp`.
#[allow(dead_code)]
pub struct AgentLoader {
// The library handle. NOTE(review): presumably kept so the function
// pointers below stay valid -- confirm against libloading's unload rules.
lib: libloading::Library,
// `create_agent` symbol: builds an agent from (id, scheduler sender) and
// returns it with the senders of its input ports.
create: extern "C" fn(usize, Sender<CompMsg>) -> Result<(Box<Agent + Send>, HashMap<String, MsgSender>)>,
// `get_schema_input` symbol: schema of an input port, by port name.
get_schema_input: extern "C" fn(&str) -> Result<String>,
// `get_schema_input_array` symbol: schema of an input array port.
get_schema_input_array: extern "C" fn(&str) -> Result<String>,
// `get_schema_output` symbol: schema of an output port.
get_schema_output: extern "C" fn(&str) -> Result<String>,
// `get_schema_output_array` symbol: schema of an output array port.
get_schema_output_array: extern "C" fn(&str) -> Result<String>,
}
/// Cache of loaded agent libraries.
pub struct AgentCache {
// Loaders keyed by the path given to `create_comp` (the agent's `sort`).
cache: HashMap<String, AgentLoader>,
}
impl AgentCache {
/// Creates an empty cache; libraries are loaded on demand by `create_comp`.
pub fn new() -> Self {
    let cache = HashMap::new();
    AgentCache { cache: cache }
}
/// Creates an agent from the library at `path`, loading and caching the
/// library on first use.
///
/// On the first call for a given `path`, the library is opened and the
/// symbols `create_agent`, `get_schema_input`, `get_schema_input_array`,
/// `get_schema_output` and `get_schema_output_array` are resolved, in that
/// order. The agent is then built by calling `create_agent(id, sender)`,
/// whose result is returned unchanged.
///
/// # Panics
/// Panics if the library cannot be loaded or if one of the symbols is
/// missing.
pub fn create_comp(&mut self, path: &str, id: usize, sender: Sender<CompMsg>) -> Result<(Box<Agent + Send>, HashMap<String, MsgSender>)> {
    type CreateFn = extern "C" fn(usize, Sender<CompMsg>) -> Result<(Box<Agent + Send>, HashMap<String, MsgSender>)>;
    type SchemaFn = extern "C" fn(&str) -> Result<String>;

    if !self.cache.contains_key(path) {
        let library = libloading::Library::new(path).expect("cannot load");
        // Each symbol is copied out as a plain function pointer; the library
        // handle is stored next to them in the loader.
        let create: CreateFn = unsafe {
            *(library.get(b"create_agent\0").expect("cannot find create method"))
        };
        let schema_in: SchemaFn = unsafe {
            *(library.get(b"get_schema_input\0").expect("cannot find get input method"))
        };
        let schema_in_array: SchemaFn = unsafe {
            *(library.get(b"get_schema_input_array\0").expect("cannot find get input method"))
        };
        let schema_out: SchemaFn = unsafe {
            *(library.get(b"get_schema_output\0").expect("cannot find get output method"))
        };
        let schema_out_array: SchemaFn = unsafe {
            *(library.get(b"get_schema_output_array\0").expect("cannot find get output method"))
        };
        let loader = AgentLoader {
            lib: library,
            create: create,
            get_schema_input: schema_in,
            get_schema_input_array: schema_in_array,
            get_schema_output: schema_out,
            get_schema_output_array: schema_out_array,
        };
        self.cache.insert(path.into(), loader);
    }
    match self.cache.get(path) {
        Some(loader) => (loader.create)(id, sender),
        // The loader was either already cached or inserted just above.
        None => unreachable!(),
    }
}
pub fn get_schema_input(&self, comp: &str, port: &str) -> Result<String> {
self.cache.get(comp).ok_or(result::Error::AgentNotFound(comp.into()))
.map(|comp| {
(comp.get_schema_input)(port).expect("cannot get")
})
}
pub fn get_schema_input_array(&self, comp: &str, port: &str) -> Result<String> {
self.cache.get(comp).ok_or(result::Error::AgentNotFound(comp.into()))
.map(|comp| {
(comp.get_schema_input_array)(port).expect("cannot get")
})
}
pub fn get_schema_output(&self, comp: &str, port: &str) -> Result<String> {
self.cache.get(comp).ok_or(result::Error::AgentNotFound(comp.into()))
.map(|comp| {
(comp.get_schema_output)(port).expect("cannot get")
})
}
pub fn get_schema_output_array(&self, comp: &str, port: &str) -> Result<String> {
self.cache.get(comp).ok_or(result::Error::AgentNotFound(comp.into()))
.map(|comp| {
(comp.get_schema_output_array)(port).expect("cannot get")
})
}
}
// Asserts that an `AgentCache` may be moved to another thread.
// NOTE(review): nothing in this file establishes why this is sound; it
// depends on `libloading::Library` and on the loaded function pointers
// being safe to use from any thread -- confirm and record the invariant.
unsafe impl Send for AgentCache {}