use std::{self, mem};
use futures::prelude::*;
use futures::unsync::{mpsc, oneshot};
use organelle;
use organelle::{Axon, Constraint, Impulse, Organelle, Soma};
use tokio_core::reactor;
use url::Url;
use super::{Error, Result};
use ctrlc_breaker::CtrlcBreakerSoma;
use data::{GameSetup, PlayerSetup};
use launcher::{GamePorts, Launcher, LauncherSoma, LauncherTerminal};
use synapses::{Dendrite, Synapse, Terminal};
/// How a running game is advanced.
///
/// The value chosen on the builder is forwarded unchanged to the agents
/// through `MeleeTerminal::run_game`.
#[derive(Debug, Copy, Clone)]
pub enum UpdateScheme {
/// Let the game run in real time.
/// NOTE(review): exact semantics are decided by the agent implementation — confirm there.
Realtime,
/// Advance the game in discrete updates.
/// NOTE(review): the `u32` is presumably the number of game steps per update — confirm.
Interval(u32),
}
/// A participant that can be turned into a soma and plugged into a melee.
///
/// `MeleeBuilder::create` calls `into_soma` on both competitors and adds the
/// resulting somas to the organelle it builds.
pub trait MeleeCompetitor {
/// Concrete soma type this competitor becomes.
type Soma: Soma + 'static;
/// Consumes the competitor and returns the soma that represents it.
fn into_soma(self) -> Self::Soma;
}
/// Builder that collects everything needed to assemble a `Melee`.
///
/// The launcher settings, the suite and the reactor handle start out unset
/// and must all be provided before `create` succeeds.
pub struct MeleeBuilder<
P1: MeleeCompetitor + 'static,
P2: MeleeCompetitor + 'static,
> {
// The two competitors, in the order they were passed to `new`.
players: (P1, P2),
// Launcher settings; required by `create`.
launcher: Option<Launcher>,
// Which suite of games to play; required by `create`.
suite: Option<MeleeSuite>,
// Update scheme handed to the melee soma; defaults to `Realtime`.
update_scheme: UpdateScheme,
// When true, `create` also adds a `CtrlcBreakerSoma` to the organelle.
break_on_ctrlc: bool,
// Reactor handle used to build and later run the organelle; required by `create`.
handle: Option<reactor::Handle>,
}
impl<P1, P2> MeleeBuilder<P1, P2>
where
P1: MeleeCompetitor + 'static,
P2: MeleeCompetitor + 'static,
{
pub fn new(player1: P1, player2: P2) -> Self {
Self {
players: (player1, player2),
launcher: None,
suite: None,
update_scheme: UpdateScheme::Realtime,
break_on_ctrlc: false,
handle: None,
}
}
pub fn launcher_settings(self, settings: Launcher) -> Self {
Self {
launcher: Some(settings),
..self
}
}
pub fn one_and_done(self, game: GameSetup) -> Self {
Self {
suite: Some(MeleeSuite::OneAndDone(game)),
..self
}
}
pub fn repeat_forever(self, game: GameSetup) -> Self {
Self {
suite: Some(MeleeSuite::EndlessRepeat(game)),
..self
}
}
pub fn update_scheme(self, scheme: UpdateScheme) -> Self {
Self {
update_scheme: scheme,
..self
}
}
pub fn break_on_ctrlc(self, flag: bool) -> Self {
Self {
break_on_ctrlc: flag,
..self
}
}
pub fn handle(self, handle: reactor::Handle) -> Self {
Self {
handle: Some(handle),
..self
}
}
pub fn create(self) -> Result<Melee>
where
P1::Soma: Soma,
P2::Soma: Soma,
<P1::Soma as Soma>::Synapse: organelle::Synapse,
<P2::Soma as Soma>::Synapse: organelle::Synapse,
<P1::Soma as Soma>::Synapse: From<Synapse> + Into<Synapse>,
<P2::Soma as Soma>::Synapse: From<Synapse> + Into<Synapse>,
<<P1::Soma as Soma>::Synapse as organelle::Synapse>::Terminal: From<Terminal>
+ Into<Terminal>,
<<P2::Soma as Soma>::Synapse as organelle::Synapse>::Terminal: From<Terminal>
+ Into<Terminal>,
<<P1::Soma as Soma>::Synapse as organelle::Synapse>::Dendrite: From<Dendrite>
+ Into<Dendrite>,
<<P2::Soma as Soma>::Synapse as organelle::Synapse>::Dendrite: From<Dendrite>
+ Into<Dendrite>,
{
if self.launcher.is_none() {
bail!("missing launcher settings")
} else if self.suite.is_none() {
bail!("missing melee suite")
} else if self.handle.is_none() {
bail!("missing reactor handle")
}
let handle = self.handle.unwrap();
let mut organelle = Organelle::new(
MeleeSoma::axon(self.suite.unwrap(), self.update_scheme)?,
handle.clone(),
);
let launcher =
organelle.add_soma(LauncherSoma::axon(self.launcher.unwrap())?);
let melee = organelle.nucleus();
let player1 = organelle.add_soma(self.players.0.into_soma());
let player2 = organelle.add_soma(self.players.1.into_soma());
if self.break_on_ctrlc {
organelle.add_soma(CtrlcBreakerSoma::axon());
}
organelle.connect(melee, launcher, Synapse::Launcher)?;
organelle.connect(melee, player1, Synapse::Melee)?;
organelle.connect(melee, player2, Synapse::Melee)?;
Ok(Melee {
handle: handle,
organelle: organelle,
})
}
}
/// A fully wired melee, produced by `MeleeBuilder::create`.
///
/// Convert it into a future (`IntoFuture`) to actually run it.
pub struct Melee {
// Reactor handle the organelle is run on.
handle: reactor::Handle,
// Organelle whose nucleus is the melee soma.
organelle: Organelle<Axon<MeleeSoma>>,
}
impl IntoFuture for Melee {
type Item = ();
type Error = Error;
type Future = Box<Future<Item = Self::Item, Error = Self::Error>>;
#[async(boxed)]
fn into_future(self) -> Result<()> {
await!(self.organelle.run(self.handle))?;
Ok(())
}
}
/// Which series of games a melee should play.
///
/// NOTE(review): `run_melee` in this file plays a single game for both
/// variants — repetition for `EndlessRepeat` is not implemented there.
enum MeleeSuite {
/// Play the given game once.
OneAndDone(GameSetup),
/// Intended to replay the given game indefinitely (see the note above).
EndlessRepeat(GameSetup),
}
/// Creates a connected melee terminal/dendrite pair.
///
/// The two ends share an unsync mpsc channel with a buffer size of 1:
/// requests issued on the terminal are received by the dendrite.
pub fn synapse() -> (MeleeTerminal, MeleeDendrite) {
    let (request_tx, request_rx) = mpsc::channel(1);

    let terminal = MeleeTerminal::new(request_tx);
    let dendrite = MeleeDendrite::new(request_rx);

    (terminal, dendrite)
}
/// Soma that coordinates a melee: it gathers the launcher and agent
/// terminals and, on start, spawns the task that runs the match.
pub struct MeleeSoma {
// Suite to play; taken (left as `None`) when the match task is spawned.
suite: Option<MeleeSuite>,
// Launcher terminal; set by `AddTerminal`, taken when the match task is spawned.
launcher: Option<LauncherTerminal>,
// Agent terminals in the order they were added; exactly two are expected at start.
agents: Vec<Option<MeleeTerminal>>,
// Update scheme passed on to the match task.
update_scheme: UpdateScheme,
}
impl MeleeSoma {
fn axon(
suite: MeleeSuite,
update_scheme: UpdateScheme,
) -> Result<Axon<Self>> {
Ok(Axon::new(
Self {
suite: Some(suite),
launcher: None,
agents: vec![],
update_scheme: update_scheme,
},
vec![],
vec![
Constraint::One(Synapse::Launcher),
Constraint::Variadic(Synapse::Melee),
],
))
}
}
impl Soma for MeleeSoma {
    type Synapse = Synapse;
    type Error = Error;

    /// Reacts to a single impulse and returns the updated soma.
    ///
    /// * `AddTerminal` for `Synapse::Launcher` stores the launcher terminal.
    /// * `AddTerminal` for `Synapse::Melee` appends an agent terminal.
    /// * `Start` checks that a launcher, a suite and exactly two agents are
    ///   present, then spawns `run_melee` on the given handle; a failure of
    ///   that task is reported back as `Impulse::Error`.
    ///
    /// # Errors
    ///
    /// Fails on `Start` when the number of agents is not 2, and on any other
    /// impulse.
    ///
    /// # Panics
    ///
    /// Panics on `Start` if the launcher, the suite or one of the two agents
    /// is missing.
    #[async(boxed)]
    fn update(mut self, msg: Impulse<Self::Synapse>) -> Result<Self> {
        match msg {
            Impulse::AddTerminal(
                _,
                Synapse::Launcher,
                Terminal::Launcher(tx),
            ) => {
                self.launcher = Some(tx);
            },
            Impulse::AddTerminal(_, Synapse::Melee, Terminal::Melee(tx)) => {
                self.agents.push(Some(tx));
            },
            Impulse::Start(_, main_tx, handle) => {
                assert!(self.launcher.is_some());
                assert!(self.suite.is_some());

                if self.agents.len() != 2 {
                    bail!("expected 2 agents, got {}", self.agents.len())
                }

                assert!(self.agents[0].is_some() && self.agents[1].is_some());

                // The match task gets its own sender; the original one is
                // kept for reporting a failure of that task.
                let task_tx = main_tx.clone();

                // Hand ownership of everything the match needs to the task,
                // leaving `None` behind in the soma.
                let suite = self.suite.take().unwrap();
                let launcher = self.launcher.take().unwrap();
                let first_agent = self.agents[0].take().unwrap();
                let second_agent = self.agents[1].take().unwrap();

                let task = run_melee(
                    suite,
                    self.update_scheme,
                    launcher,
                    (first_agent, second_agent),
                    task_tx,
                ).or_else(move |e| {
                    main_tx
                        .send(Impulse::Error(e.into()))
                        .map(|_| ())
                        .map_err(|_| ())
                });

                handle.spawn(task);
            },
            _ => bail!("unexpected impulse"),
        }

        Ok(self)
    }
}
/// Plays one game between the two agents and then asks the organelle to stop.
///
/// Both agents are first asked for their player setup. Two supported cases:
///
/// * both are players: two game instances are launched, each agent connects
///   to its own instance, the first agent creates the game, and both agents
///   join with the shared game ports and run the game concurrently;
/// * exactly one is a player and the other a computer: a single instance is
///   launched and only the player agent connects, creates, joins and runs.
///
/// After the game has run, `Impulse::Stop` is sent on `main_tx`.
///
/// # Errors
///
/// Fails with "invalid player setups" for any other combination, with
/// "unable to stop" when the stop impulse cannot be sent, and propagates any
/// error from the launcher or agent requests.
#[async]
fn run_melee(
    suite: MeleeSuite,
    update_scheme: UpdateScheme,
    launcher: LauncherTerminal,
    agents: (MeleeTerminal, MeleeTerminal),
    main_tx: mpsc::Sender<Impulse<Synapse>>,
) -> Result<()> {
    // NOTE(review): both suites currently play exactly one game; repeating
    // for `EndlessRepeat` is not implemented here.
    let game = match suite {
        MeleeSuite::OneAndDone(game) | MeleeSuite::EndlessRepeat(game) => game,
    };

    let player1 = await!(agents.0.clone().get_player_setup(game.clone()))?;
    let player2 = await!(agents.1.clone().get_player_setup(game.clone()))?;

    // One player against one computer is checked first, then player against
    // player; every other combination is rejected.
    let is_pvp = if (player1.is_player() && player2.is_computer())
        || (player1.is_computer() && player2.is_player())
    {
        false
    } else if player1.is_player() && player2.is_player() {
        true
    } else {
        bail!("invalid player setups")
    };

    if is_pvp {
        // Launch one game instance per agent, concurrently.
        let instances = {
            let launch1 = launcher.clone().launch();
            let launch2 = launcher.clone().launch();

            await!(launch1.join(launch2))?
        };

        {
            let connect1 = agents.0.clone().connect(instances.0.get_url()?);
            let connect2 = agents.1.clone().connect(instances.1.get_url()?);

            await!(connect1.join(connect2))?;
        }

        // Only the first agent creates the game; both join it afterwards.
        await!(
            agents
                .0
                .clone()
                .create_game(game.clone(), vec![player1, player2])
        )?;

        let mut ports = await!(launcher.clone().get_game_ports())?;
        ports.client_ports.push(instances.0.ports);
        ports.client_ports.push(instances.1.ports);

        {
            let join1 =
                agents.0.clone().join_game(player1, Some(ports.clone()));
            let join2 = agents.1.clone().join_game(player2, Some(ports));

            await!(join1.join(join2))?;
        }

        {
            let run1 = agents.0.clone().run_game(update_scheme);
            let run2 = agents.1.clone().run_game(update_scheme);

            await!(run1.join(run2))?;
        }

        // Sent while `instances` is still in scope, as before.
        await!(
            main_tx
                .send(Impulse::Stop)
                .map(|_| ())
                .map_err(|_| Error::from("unable to stop"))
        )?;
    } else {
        // Sort the two agents into (player, computer).
        let (player, computer) = if player1.is_computer() {
            ((agents.1, player2), (agents.0, player1))
        } else if player2.is_computer() {
            ((agents.0, player1), (agents.1, player2))
        } else {
            unreachable!()
        };

        assert!(player.1.is_player() && computer.1.is_computer());

        let instance = await!(launcher.clone().launch())?;

        // Only the player agent talks to the instance; the computer side
        // takes part solely through its setup in `create_game`.
        await!(player.0.clone().connect(instance.get_url()?))?;
        await!(
            player
                .0
                .clone()
                .create_game(game.clone(), vec![player.1, computer.1])
        )?;
        await!(player.0.clone().join_game(player.1, None))?;
        await!(player.0.clone().run_game(update_scheme))?;

        // Sent while `instance` is still in scope, as before.
        await!(
            main_tx
                .send(Impulse::Stop)
                .map(|_| ())
                .map_err(|_| Error::from("unable to stop"))
        )?;
    }

    Ok(())
}
/// Requests sent from a `MeleeTerminal` to the `MeleeDendrite` on the other
/// end of the channel.
///
/// Every variant carries a oneshot sender on which the reply is delivered.
#[derive(Debug)]
enum MeleeRequest {
/// Ask for the player setup to use for the given game; replied to with that setup.
PlayerSetup(GameSetup, oneshot::Sender<PlayerSetup>),
/// Ask the agent to connect to the given URL.
Connect(Url, oneshot::Sender<()>),
/// Ask the agent to create a game with the given setup and player setups.
CreateGame(GameSetup, Vec<PlayerSetup>, oneshot::Sender<()>),
/// Ask the agent to join a game with the given setup and optional game ports.
JoinGame(PlayerSetup, Option<GamePorts>, oneshot::Sender<()>),
/// Ask the agent to run the game with the given update scheme.
RunGame(UpdateScheme, oneshot::Sender<()>),
}
/// Requesting end of a melee synapse; its methods send a `MeleeRequest`
/// and wait for the reply.
#[derive(Debug, Clone)]
pub struct MeleeTerminal {
// Channel on which requests are sent to the matching `MeleeDendrite`.
tx: mpsc::Sender<MeleeRequest>,
}
/// Operations an agent must provide so that `MeleeDendrite::wrap` can serve
/// melee requests with it.
///
/// Every method consumes the implementor and returns a boxed future that
/// yields it back, so the same value is threaded from one request to the next.
pub trait MeleeContract: Sized {
/// Error produced by the operations; must be convertible into this crate's `Error`.
type Error: std::error::Error + Send + Into<Error>;
/// Decides the player setup to use for `game`; yields the implementor together with that setup.
fn get_player_setup(
self,
game: GameSetup,
) -> Box<Future<Item = (Self, PlayerSetup), Error = Self::Error>>;
/// Connects to the game instance at `url`.
fn connect(self, url: Url)
-> Box<Future<Item = Self, Error = Self::Error>>;
/// Creates a game from `game` with the given player setups.
fn create_game(
self,
game: GameSetup,
players: Vec<PlayerSetup>,
) -> Box<Future<Item = Self, Error = Self::Error>>;
/// Joins a game as `setup`; `run_melee` passes `Some(ports)` when two players
/// face each other and `None` when playing against a computer.
fn join_game(
self,
setup: PlayerSetup,
ports: Option<GamePorts>,
) -> Box<Future<Item = Self, Error = Self::Error>>;
/// Runs the joined game using `update_scheme`.
fn run_game(
self,
update_scheme: UpdateScheme,
) -> Box<Future<Item = Self, Error = Self::Error>>;
}
/// Serving end of a melee synapse; receives the requests issued by the
/// matching `MeleeTerminal`.
#[derive(Debug)]
pub struct MeleeDendrite {
// Channel on which incoming requests arrive.
rx: mpsc::Receiver<MeleeRequest>,
}
impl MeleeDendrite {
fn new(rx: mpsc::Receiver<MeleeRequest>) -> Self {
Self { rx: rx }
}
#[async]
pub fn wrap<T>(self, mut dendrite: T) -> Result<()>
where
T: MeleeContract + 'static,
{
#[async]
for req in self.rx.map_err(|_| -> Error { unreachable!() }) {
match req {
MeleeRequest::PlayerSetup(game, tx) => {
let result = await!(dendrite.get_player_setup(game))
.map_err(|e| e.into())?;
tx.send(result.1).unwrap();
dendrite = result.0;
},
MeleeRequest::Connect(url, tx) => {
dendrite =
await!(dendrite.connect(url)).map_err(|e| e.into())?;
tx.send(()).unwrap();
},
MeleeRequest::CreateGame(game, players, tx) => {
dendrite = await!(
dendrite
.create_game(game, players)
.map_err(|e| e.into())
)?;
tx.send(()).unwrap();
},
MeleeRequest::JoinGame(setup, ports, tx) => {
dendrite = await!(
dendrite.join_game(setup, ports).map_err(|e| e.into())
)?;
tx.send(()).unwrap();
},
MeleeRequest::RunGame(update_scheme, tx) => {
dendrite = await!(
dendrite.run_game(update_scheme).map_err(|e| e.into())
)?;
tx.send(()).unwrap();
},
}
}
Ok(())
}
}
impl MeleeTerminal {
fn new(tx: mpsc::Sender<MeleeRequest>) -> Self {
Self { tx: tx }
}
#[async]
pub fn get_player_setup(self, game: GameSetup) -> Result<PlayerSetup> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(MeleeRequest::PlayerSetup(game, tx))
.map_err(|_| Error::from("unable to request player setup"))
)?;
await!(rx.map_err(|_| Error::from("unable to receive player setup")))
}
#[async]
pub fn connect(self, url: Url) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(MeleeRequest::Connect(url, tx))
.map_err(|_| Error::from("unable to request player setup"))
)?;
await!(rx.map_err(|_| Error::from("unable to receive player setup")))
}
#[async]
pub fn create_game(
self,
game: GameSetup,
players: Vec<PlayerSetup>,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(MeleeRequest::CreateGame(game, players, tx))
.map_err(|_| Error::from("unable to create game"))
)?;
await!(rx.map_err(|_| Error::from("unable to create game")))
}
#[async]
pub fn join_game(
self,
setup: PlayerSetup,
ports: Option<GamePorts>,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(MeleeRequest::JoinGame(setup, ports, tx))
.map_err(|_| Error::from("unable to join game"))
)?;
await!(rx.map_err(|_| Error::from("unable to join game")))
}
#[async]
pub fn run_game(self, update_scheme: UpdateScheme) -> Result<()> {
let (tx, rx) = oneshot::channel();
await!(
self.tx
.send(MeleeRequest::RunGame(update_scheme, tx))
.map_err(|_| Error::from("unable to run game"))
)?;
await!(rx.map_err(|_| Error::from("unable to run game")))
}
}