use std::collections::VecDeque;
use std::io;
use std::rc::Rc;
use std::time;
use bytes::{ Buf, BufMut };
use organelle;
use organelle::{ ResultExt, Handle, Protocol, Constraint, Nucleus };
use futures::prelude::*;
use futures::sync::{ oneshot, mpsc };
use protobuf;
use protobuf::{ Message as ProtobufMessage, parse_from_reader };
use sc2_proto::sc2api::{ Request, Response };
use tokio_timer::{ Timer };
use tokio_tungstenite::{ connect_async };
use tungstenite;
use url::Url;
use uuid::Uuid;
use super::{ Result, Error, ErrorKind, Message, Soma, Role, Eukaryote };
pub struct Transactor {
client: Handle,
transaction: Uuid,
kind: ClientMessageKind,
}
impl Transactor {
pub fn send(soma: &Soma, req: ClientRequest) -> Result<Self> {
let transaction = req.transaction;
let kind = req.kind;
let client = soma.req_output(Role::Client)?;
soma.effector()?.send(client, Message::ClientRequest(req));
Ok(
Self {
client: client,
transaction: transaction,
kind: kind,
}
)
}
pub fn expect(self, src: Handle, result: ClientResult)
-> Result<Response>
{
match result {
ClientResult::Success(rsp) => {
if self.client != src {
bail!("unexpected source for client response")
}
if self.transaction != rsp.transaction {
bail!("transaction id mismatch")
}
if self.kind != rsp.kind {
bail!("expected {:?} message, got {:?}", self.kind, rsp.kind)
}
if rsp.response.get_error().len() != 0 {
bail!(
ErrorKind::GameErrors(
rsp.response.get_error().iter()
.map(|e| e.clone())
.collect()
)
)
}
Ok(rsp.response)
},
ClientResult::Timeout(transaction) => {
if self.transaction != transaction {
bail!("transaction id mismatch")
}
else {
bail!("transaction timed out")
}
}
}
}
}
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientMessageKind {
Unknown,
CreateGame,
JoinGame,
RestartGame,
StartReplay,
LeaveGame,
QuickSave,
QuickLoad,
Quit,
GameInfo,
Observation,
Action,
Step,
Data,
Query,
SaveReplay,
ReplayInfo,
AvailableMaps,
SaveMap,
Ping,
Debug
}
#[derive(Debug)]
pub struct ClientRequest {
transaction: Uuid,
request: Request,
timeout: time::Duration,
kind: ClientMessageKind,
}
impl ClientRequest {
pub fn new(request: Request) -> Self {
Self::with_timeout(request, time::Duration::from_secs(5))
}
pub fn with_timeout(request: Request, timeout: time::Duration)
-> Self
{
let kind = Self::get_kind(&request);
Self {
transaction: Uuid::new_v4(),
request: request,
timeout: timeout,
kind: kind
}
}
fn get_kind(req: &Request) -> ClientMessageKind {
if req.has_create_game() {
ClientMessageKind::CreateGame
}
else if req.has_join_game() {
ClientMessageKind::JoinGame
}
else if req.has_restart_game() {
ClientMessageKind::RestartGame
}
else if req.has_start_replay() {
ClientMessageKind::StartReplay
}
else if req.has_leave_game() {
ClientMessageKind::LeaveGame
}
else if req.has_quick_save() {
ClientMessageKind::QuickSave
}
else if req.has_quick_load() {
ClientMessageKind::QuickLoad
}
else if req.has_quit() {
ClientMessageKind::Quit
}
else if req.has_game_info() {
ClientMessageKind::GameInfo
}
else if req.has_observation() {
ClientMessageKind::Observation
}
else if req.has_action() {
ClientMessageKind::Action
}
else if req.has_step() {
ClientMessageKind::Step
}
else if req.has_data() {
ClientMessageKind::Data
}
else if req.has_query() {
ClientMessageKind::Query
}
else if req.has_save_replay() {
ClientMessageKind::SaveReplay
}
else if req.has_replay_info() {
ClientMessageKind::ReplayInfo
}
else if req.has_available_maps() {
ClientMessageKind::AvailableMaps
}
else if req.has_save_map() {
ClientMessageKind::SaveMap
}
else if req.has_ping() {
ClientMessageKind::Ping
}
else if req.has_debug() {
ClientMessageKind::Debug
}
else {
ClientMessageKind::Unknown
}
}
}
#[derive(Debug)]
pub struct ClientResponse {
transaction: Uuid,
response: Response,
kind: ClientMessageKind,
}
#[derive(Debug)]
pub enum ClientResult {
Success(ClientResponse),
Timeout(Uuid),
}
impl ClientResult {
fn success(transaction: Uuid, response: Response) -> Self {
let kind = Self::get_kind(&response);
ClientResult::Success(
ClientResponse {
transaction: transaction,
response: response,
kind: kind,
}
)
}
fn get_kind(rsp: &Response) -> ClientMessageKind {
if rsp.has_create_game() {
ClientMessageKind::CreateGame
}
else if rsp.has_join_game() {
ClientMessageKind::JoinGame
}
else if rsp.has_restart_game() {
ClientMessageKind::RestartGame
}
else if rsp.has_start_replay() {
ClientMessageKind::StartReplay
}
else if rsp.has_leave_game() {
ClientMessageKind::LeaveGame
}
else if rsp.has_quick_save() {
ClientMessageKind::QuickSave
}
else if rsp.has_quick_load() {
ClientMessageKind::QuickLoad
}
else if rsp.has_quit() {
ClientMessageKind::Quit
}
else if rsp.has_game_info() {
ClientMessageKind::GameInfo
}
else if rsp.has_observation() {
ClientMessageKind::Observation
}
else if rsp.has_action() {
ClientMessageKind::Action
}
else if rsp.has_step() {
ClientMessageKind::Step
}
else if rsp.has_data() {
ClientMessageKind::Data
}
else if rsp.has_query() {
ClientMessageKind::Query
}
else if rsp.has_save_replay() {
ClientMessageKind::SaveReplay
}
else if rsp.has_replay_info() {
ClientMessageKind::ReplayInfo
}
else if rsp.has_available_maps() {
ClientMessageKind::AvailableMaps
}
else if rsp.has_save_map() {
ClientMessageKind::SaveMap
}
else if rsp.has_ping() {
ClientMessageKind::Ping
}
else if rsp.has_debug() {
ClientMessageKind::Debug
}
else {
ClientMessageKind::Unknown
}
}
}
const NUM_RETRIES: u32 = 10;
pub enum ClientCell {
Init(Init),
AwaitInstance(AwaitInstance),
Connect(Connect),
Open(Open),
Disconnect(Disconnect),
}
impl ClientCell {
pub fn new() -> Result<Eukaryote<ClientCell>> {
Ok(
Eukaryote::new(
ClientCell::Init(Init { }),
vec![
Constraint::RequireOne(Role::InstanceProvider),
Constraint::Variadic(Role::Client)
],
vec![ ],
)?
)
}
}
impl Nucleus for ClientCell {
type Message = Message;
type Role = Role;
fn update(self, soma: &Soma, msg: Protocol<Message, Role>)
-> organelle::Result<Self>
{
match self {
ClientCell::Init(state) => state.update(soma, msg),
ClientCell::AwaitInstance(state) => state.update(soma, msg),
ClientCell::Connect(state) => state.update(soma, msg),
ClientCell::Open(state) => state.update(soma, msg),
ClientCell::Disconnect(state) => state.update(soma, msg),
}.chain_err(
|| organelle::ErrorKind::CellError
)
}
}
pub struct Init { }
impl Init {
fn update(self, _: &Soma, msg: Protocol<Message, Role>)
-> Result<ClientCell>
{
match msg {
Protocol::Start => self.start(),
Protocol::Message(_, msg) => {
bail!("unexpected message {:#?}", msg)
},
_ => bail!("unexpected protocol message")
}
}
fn start(self) -> Result<ClientCell> {
AwaitInstance::await()
}
}
pub struct AwaitInstance { }
impl AwaitInstance {
fn await() -> Result<ClientCell> {
Ok(ClientCell::AwaitInstance(AwaitInstance { }))
}
fn reset(soma: &Soma) -> Result<ClientCell> {
for c in soma.var_input(Role::Client)? {
soma.effector()?.send(*c, Message::ClientClosed);
}
Self::await()
}
fn reset_error(soma: &Soma, e: Rc<Error>) -> Result<ClientCell> {
for c in soma.var_input(Role::Client)? {
soma.effector()?.send_in_order(
*c,
vec![
Message::ClientError(Rc::clone(&e)),
Message::ClientClosed
]
);
}
Self::await()
}
fn update(self, soma: &Soma, msg: Protocol<Message, Role>)
-> Result<ClientCell>
{
match msg {
Protocol::Message(
src, Message::ProvideInstance(instance, url)
) => {
self.assign_instance(soma, src, instance, url)
},
Protocol::Message(_, msg) => {
bail!("unexpected message {:#?}", msg)
},
_ => bail!("unexpected protocol message")
}
}
fn assign_instance(self, soma: &Soma, src: Handle, _: Uuid, url: Url)
-> Result<ClientCell>
{
assert_eq!(src, soma.req_input(Role::InstanceProvider)?);
Connect::connect(soma, url)
}
}
pub struct Connect {
timer: Timer,
retries: u32,
}
impl Connect {
fn connect(soma: &Soma, url: Url) -> Result<ClientCell> {
let this_cell = soma.effector()?.this_cell();
soma.effector()?.send(
this_cell, Message::ClientAttemptConnect(url)
);
Ok(
ClientCell::Connect(
Connect {
timer: Timer::default(),
retries: NUM_RETRIES,
}
)
)
}
fn update(self, soma: &Soma, msg: Protocol<Message, Role>)
-> Result<ClientCell>
{
match msg {
Protocol::Message(src, Message::ClientAttemptConnect(url)) => {
self.attempt_connect(soma, src, url)
},
Protocol::Message(src, Message::ClientConnected(sender)) => {
self.on_connected(soma, src, sender)
},
Protocol::Message(_, msg) => {
bail!("unexpected message {:#?}", msg)
},
_ => bail!("unexpected protocol message")
}
}
fn attempt_connect(mut self, soma: &Soma, src: Handle, url: Url)
-> Result<ClientCell>
{
assert_eq!(src, soma.effector()?.this_cell());
let connected_effector = soma.effector()?.clone();
let retry_effector = soma.effector()?.clone();
let timer_effector = soma.effector()?.clone();
let client_remote = soma.effector()?.remote();
if self.retries == 0 {
bail!("unable to connect to instance")
}
else {
println!(
"attempting to connect to instance {} - retries {}",
url,
self.retries
);
self.retries -= 1;
}
let retry_url = url.clone();
soma.effector()?.spawn(
self.timer.sleep(time::Duration::from_secs(5))
.and_then(move |_| connect_async(url, client_remote)
.and_then(move |(ws_stream, _)| {
let this_cell = connected_effector.this_cell();
let (send_tx, send_rx) = mpsc::channel(10);
let (sink, stream) = ws_stream.split();
connected_effector.spawn(
sink.send_all(
send_rx.map_err(
|_| tungstenite::Error::Io(
io::ErrorKind::BrokenPipe
.into()
)
)
)
.then(|_| Ok(()))
);
let recv_eff = connected_effector.clone();
let close_eff = connected_effector.clone();
let error_eff = connected_effector.clone();
connected_effector.spawn(
stream.for_each(move |msg| {
recv_eff.send(
this_cell, Message::ClientReceive(msg)
);
Ok(())
})
.and_then(move |_| {
close_eff.send(
this_cell, Message::ClientClosed
);
Ok(())
})
.or_else(move |e| {
error_eff.send(
this_cell,
Message::ClientError(
Rc::from(
Error::with_chain(
e,
ErrorKind::ClientRecvFailed
)
)
)
);
Ok(())
})
);
connected_effector.send(
this_cell,
Message::ClientConnected(send_tx)
);
Ok(())
})
.or_else(move |_| {
let this_cell = retry_effector.this_cell();
retry_effector.send(
this_cell,
Message::ClientAttemptConnect(retry_url)
);
Ok(())
})
)
.or_else(move |e| {
timer_effector.error(
organelle::Error::with_chain(
e, organelle::ErrorKind::CellError
)
);
Ok(())
})
);
Ok(ClientCell::Connect(self))
}
fn on_connected(
self,
soma: &Soma,
src: Handle,
sender: mpsc::Sender<tungstenite::Message>
)
-> Result<ClientCell>
{
assert_eq!(src, soma.effector()?.this_cell());
Open::open(soma, sender, self.timer)
}
}
pub struct Open {
sender: mpsc::Sender<tungstenite::Message>,
timer: Timer,
transactions: VecDeque<
(Uuid, Handle, oneshot::Sender<()>)
>,
}
impl Open {
fn open(
soma: &Soma, sender: mpsc::Sender<tungstenite::Message>, timer: Timer
)
-> Result<ClientCell>
{
for c in soma.var_input(Role::Client)? {
soma.effector()?.send(*c, Message::Ready);
}
Ok(
ClientCell::Open(
Open {
sender: sender,
timer: timer,
transactions: VecDeque::new(),
}
)
)
}
fn update(self, soma: &Soma, msg: Protocol<Message, Role>)
-> Result<ClientCell>
{
match msg {
Protocol::Message(src, Message::ClientRequest(req)) => {
self.send(soma, src, req)
},
Protocol::Message(src, Message::ClientReceive(msg)) => {
self.recv(soma, src, msg)
},
Protocol::Message(
src, Message::ClientTimeout(transaction)
) => {
self.on_timeout(soma, src, transaction)
},
Protocol::Message(_, Message::ClientDisconnect) => {
Disconnect::disconnect()
},
Protocol::Message(src, Message::ClientClosed) => {
self.on_close(soma, src)
},
Protocol::Message(src, Message::ClientError(e)) => {
self.on_error(soma, src, e)
},
Protocol::Message(_, msg) => {
bail!("unexpected message {:#?}", msg)
},
_ => bail!("unexpected protocol message")
}
}
fn send(mut self, soma: &Soma, src: Handle, req: ClientRequest)
-> Result<ClientCell>
{
let buf = Vec::new();
let mut writer = buf.writer();
let (tx, rx) = oneshot::channel();
let transaction = req.transaction;
self.transactions.push_back((transaction, src, tx));
{
let mut cos = protobuf::CodedOutputStream::new(&mut writer);
req.request.write_to(&mut cos)?;
cos.flush()?;
}
let timeout_effector = soma.effector()?.clone();
soma.effector()?.spawn(
self.timer.timeout(
self.sender.clone().send(
tungstenite::Message::Binary(writer.into_inner())
)
.map_err(|_| ())
.and_then(|_| rx.map_err(|_| ())),
req.timeout
)
.and_then(|_| Ok(()))
.or_else(move |_| {
let this_cell = timeout_effector.this_cell();
timeout_effector.send(
this_cell, Message::ClientTimeout(transaction)
);
Ok(())
})
);
Ok(ClientCell::Open(self))
}
fn recv(mut self, soma: &Soma, src: Handle, msg: tungstenite::Message)
-> Result<ClientCell>
{
assert_eq!(src, soma.effector()?.this_cell());
let rsp = match msg {
tungstenite::Message::Binary(buf) => {
let cursor = io::Cursor::new(buf);
parse_from_reader::<Response>(&mut cursor.reader())?
}
_ => bail!("unexpected non-binary message"),
};
let (transaction, dest, tx) = match self.transactions.pop_front() {
Some(transaction) => transaction,
None => bail!("no pending transactions for this response"),
};
if let Err(_) = tx.send(()) {
}
soma.effector()?.send(
dest,
Message::ClientResult(ClientResult::success(transaction, rsp))
);
Ok(ClientCell::Open(self))
}
fn on_timeout(mut self, soma: &Soma, src: Handle, transaction: Uuid)
-> Result<ClientCell>
{
assert_eq!(src, soma.effector()?.this_cell());
if let Some(i) = self.transactions.iter()
.position(|&(ref t, _, _)| *t == transaction)
{
let dest = self.transactions[i].1;
self.transactions.remove(i);
soma.effector()?.send(
dest, Message::ClientTimeout(transaction)
);
}
Ok(ClientCell::Open(self))
}
fn on_close(self, soma: &Soma, src: Handle) -> Result<ClientCell> {
assert_eq!(src, soma.effector()?.this_cell());
AwaitInstance::reset(soma)
}
fn on_error(self, soma: &Soma, src: Handle, e: Rc<Error>) -> Result<ClientCell> {
assert_eq!(src, soma.effector()?.this_cell());
AwaitInstance::reset_error(soma, e)
}
}
pub struct Disconnect { }
impl Disconnect {
fn disconnect() -> Result<ClientCell> {
Ok(ClientCell::Disconnect(Disconnect { }))
}
fn update(self, soma: &Soma, msg: Protocol<Message, Role>)
-> Result<ClientCell>
{
match msg {
Protocol::Message(_, Message::ClientClosed) => {
AwaitInstance::reset(soma)
},
Protocol::Message(_, Message::ClientError(e)) => {
AwaitInstance::reset_error(soma, e)
},
Protocol::Message(_, msg) => {
bail!("unexpected msg {:#?}", msg)
},
_ => bail!("unexpected protocol message")
}
}
}