use std::mem;
use futures::prelude::*;
use futures::unsync::{mpsc, oneshot};
use organelle::{Axon, Constraint, Impulse, Soma};
use sc2_proto::sc2api;
use super::{Error, IntoProto, Result};
use client::ClientTerminal;
use data::{Action, DebugCommand};
use synapses::{Dendrite, Synapse, Terminal};
/// Soma that gathers the connections required to run an `ActionTask`.
///
/// The fields are filled in one impulse at a time by `Soma::update` and are
/// handed over to the spawned task when the `Start` impulse arrives.
pub struct ActionSoma {
/// Receiving half of the control synapse; set by the `ActionControl`
/// `AddDendrite` impulse.
control: Option<ActionControlDendrite>,
/// Terminal to the client soma; set by the `Client` `AddTerminal` impulse.
client: Option<ClientTerminal>,
/// One dendrite per `Action` `AddDendrite` impulse received so far.
users: Vec<ActionDendrite>,
}
impl ActionSoma {
    /// Wraps a fresh, unconnected `ActionSoma` in an `Axon`.
    ///
    /// The soma starts with no client, no control dendrite and no users.
    /// The axon is given two constraint lists: `Action` (variadic) and
    /// `ActionControl` (one) in the first, `Client` (one) in the second.
    /// `Soma::update` receives the former as dendrites and the latter as a
    /// terminal.
    pub fn axon() -> Result<Axon<Self>> {
        let soma = Self {
            control: None,
            client: None,
            users: Vec::new(),
        };

        let dendrite_constraints = vec![
            Constraint::Variadic(Synapse::Action),
            Constraint::One(Synapse::ActionControl),
        ];
        let terminal_constraints = vec![Constraint::One(Synapse::Client)];

        Ok(Axon::new(soma, dendrite_constraints, terminal_constraints))
    }
}
/// Creates a connected `(ActionTerminal, ActionDendrite)` pair.
///
/// Both halves share one unsync mpsc channel created with a buffer argument
/// of 10; the terminal holds the sender and the dendrite holds the receiver.
pub fn synapse() -> (ActionTerminal, ActionDendrite) {
    let (sender, receiver) = mpsc::channel(10);

    let terminal = ActionTerminal { tx: sender };
    let dendrite = ActionDendrite { rx: receiver };

    (terminal, dendrite)
}
/// Creates a connected `(ActionControlTerminal, ActionControlDendrite)` pair.
///
/// Both halves share one unsync mpsc channel created with a buffer argument
/// of 1; the terminal holds the sender and the dendrite holds the receiver.
pub fn control_synapse() -> (ActionControlTerminal, ActionControlDendrite) {
    let (sender, receiver) = mpsc::channel(1);

    let terminal = ActionControlTerminal { tx: sender };
    let dendrite = ActionControlDendrite { rx: receiver };

    (terminal, dendrite)
}
impl Soma for ActionSoma {
type Synapse = Synapse;
type Error = Error;
#[async(boxed)]
fn update(mut self, imp: Impulse<Self::Synapse>) -> Result<Self> {
match imp {
Impulse::AddTerminal(_, Synapse::Client, Terminal::Client(tx)) => {
Ok(Self {
client: Some(tx),
..self
})
},
Impulse::AddDendrite(_, Synapse::Action, Dendrite::Action(rx)) => {
self.users.push(rx);
Ok(self)
},
Impulse::AddDendrite(
_,
Synapse::ActionControl,
Dendrite::ActionControl(rx),
) => Ok(Self {
control: Some(rx),
..self
}),
Impulse::Start(_, main_tx, handle) => {
let (tx, rx) = mpsc::channel(10);
for user in self.users {
handle.spawn(
tx.clone()
.send_all(user.rx.map_err(|_| unreachable!()))
.map(|_| ())
.map_err(|_| ()),
);
}
let task = ActionTask::new(
self.client.unwrap(),
self.control.unwrap(),
rx,
);
handle.spawn(task.run().or_else(move |e| {
main_tx
.send(Impulse::Error(e.into()))
.map(|_| ())
.map_err(|_| ())
}));
Ok(Self {
client: None,
control: None,
users: vec![],
})
},
_ => bail!("unexpected impulse"),
}
}
}
/// Background task that batches actions and debug commands and sends them to
/// the client whenever a step is requested.
struct ActionTask {
/// Terminal used to issue requests to the client.
client: ClientTerminal,
/// Control dendrite; moved out (leaving `None`) when `run` starts.
control: Option<ActionControlDendrite>,
/// Stream of user requests; moved out (leaving `None`) when `run` starts.
queue: Option<mpsc::Receiver<ActionRequest>>,
/// Actions collected since the last step.
action_batch: Vec<Action>,
/// Debug commands collected since the last step.
debug_batch: Vec<DebugCommand>,
}
impl ActionTask {
fn new(
client: ClientTerminal,
control: ActionControlDendrite,
rx: mpsc::Receiver<ActionRequest>,
) -> Self {
Self {
client: client,
control: Some(control),
queue: Some(rx),
action_batch: vec![],
debug_batch: vec![],
}
}
#[async]
fn run(mut self) -> Result<()> {
let requests = mem::replace(&mut self.queue, None).unwrap();
let queue = mem::replace(&mut self.control, None)
.unwrap()
.rx
.map(|req| Either::Control(req))
.select(requests.map(|req| Either::Request(req)));
#[async]
for req in queue.map_err(|_| -> Error { unreachable!() }) {
match req {
Either::Control(ActionControlRequest::Step(tx)) => {
self = await!(self.send_actions())?;
self = await!(self.send_debug())?;
tx.send(()).map_err(|_| Error::from("unable to ack step"))?;
},
Either::Request(ActionRequest::SendAction(action, tx)) => {
self.action_batch.push(action);
tx.send(()).map_err(|_| {
Error::from("unable to ack send command")
})?;
},
Either::Request(ActionRequest::SendDebug(cmd, tx)) => {
self.debug_batch.push(cmd);
tx.send(())
.map_err(|_| Error::from("unable to ack send debug"))?;
},
}
}
Ok(())
}
#[async]
fn send_actions(self) -> Result<Self> {
let mut req = sc2api::Request::new();
req.mut_action().mut_actions();
for action in self.action_batch {
req.mut_action().mut_actions().push(action.into_proto()?);
}
await!(self.client.clone().request(req))?;
Ok(Self {
action_batch: vec![],
..self
})
}
#[async]
fn send_debug(self) -> Result<Self> {
let mut req = sc2api::Request::new();
req.mut_debug().mut_debug();
for cmd in self.debug_batch {
req.mut_debug().mut_debug().push(cmd.into_proto()?);
}
await!(self.client.clone().request(req))?;
Ok(Self {
debug_batch: vec![],
..self
})
}
}
/// Requests carried over the control synapse.
#[derive(Debug)]
enum ActionControlRequest {
/// Send the batched actions and debug commands to the client, then signal
/// the enclosed sender.
Step(oneshot::Sender<()>),
}
/// Requests carried over an action synapse.
#[derive(Debug)]
enum ActionRequest {
/// Add the action to the pending batch, then signal the enclosed sender.
SendAction(Action, oneshot::Sender<()>),
/// Add the debug command to the pending batch, then signal the enclosed
/// sender.
SendDebug(DebugCommand, oneshot::Sender<()>),
}
/// Item type of the merged stream consumed by `ActionTask::run`, recording
/// which of the two sources a request came from.
#[derive(Debug)]
enum Either {
/// A request received on the control dendrite.
Control(ActionControlRequest),
/// A request received from the user request queue.
Request(ActionRequest),
}
/// Sending half of the control synapse created by `control_synapse`.
#[derive(Debug, Clone)]
pub struct ActionControlTerminal {
/// Channel on which control requests are sent.
tx: mpsc::Sender<ActionControlRequest>,
}
impl ActionControlTerminal {
#[async]
pub fn step(self) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(ActionControlRequest::Step(tx))
.map(|_| ())
.map_err(|_| Error::from("unable to send debug command"))
)?;
await!(rx.map_err(|_| Error::from("unable to send debug ack")))
}
}
/// Receiving half of the control synapse created by `control_synapse`.
#[derive(Debug)]
pub struct ActionControlDendrite {
/// Channel on which control requests arrive.
rx: mpsc::Receiver<ActionControlRequest>,
}
/// Sending half of the action synapse created by `synapse`.
#[derive(Debug, Clone)]
pub struct ActionTerminal {
/// Channel on which action and debug requests are sent.
tx: mpsc::Sender<ActionRequest>,
}
impl ActionTerminal {
#[async]
pub fn send_action(self, action: Action) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(ActionRequest::SendAction(action, tx))
.map(|_| ())
.map_err(|_| Error::from("unable to send command"))
)?;
await!(rx.map_err(|_| Error::from("unable to recv send command ack")))
}
#[async]
pub fn send_debug<T>(self, cmd: T) -> Result<()>
where
T: Into<DebugCommand> + 'static,
{
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(ActionRequest::SendDebug(cmd.into(), tx))
.map(|_| ())
.map_err(|_| Error::from("unable to send debug command"))
)?;
await!(rx.map_err(|_| Error::from("unable to send debug ack")))
}
}
/// Receiving half of the action synapse created by `synapse`.
#[derive(Debug)]
pub struct ActionDendrite {
/// Channel on which action and debug requests arrive.
rx: mpsc::Receiver<ActionRequest>,
}