#![warn(missing_debug_implementations, missing_docs)]
#[macro_use]
extern crate bitflags;
extern crate byteorder;
extern crate futures;
extern crate libc;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate rmp;
extern crate rmp_serde as rmps;
extern crate rmpv;
extern crate tokio_core;
extern crate tokio_io;
use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::error;
use std::fmt::{self, Debug, Display, Formatter};
use std::io::{self, Cursor, ErrorKind, Read, Write};
use std::net::SocketAddr;
use std::ptr;
use std::sync::{Arc, Mutex};
use futures::{Async, Future, Poll, Stream};
use futures::stream::Fuse;
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::sync::oneshot;
use tokio_core::net::TcpStream;
pub use tokio_core::reactor::Core;
use tokio_core::reactor::Handle;
use tokio_io::io::Window;
use rmpv::decode::read_value_ref;
use Async::*;
pub mod dispatch;
mod frame;
pub mod hpack;
pub mod logging;
mod net;
pub mod protocol;
mod resolve;
mod response;
mod request;
pub mod service;
mod sys;
use net::connect;
use self::frame::Frame;
use self::hpack::RawHeader;
pub use self::response::Response;
pub use self::resolve::{EventGraph, FixedResolver, GraphNode, Resolve, ResolveInfo, Resolver};
pub use self::request::Request;
pub use self::service::ServiceBuilder;
use self::sys::{PollWrite, SendAll};
const FRAME_LENGTH: u32 = 4;
const EMPTY_SLICE: &[u8] = &[0; 0];
pub trait Dispatch: Send {
fn process(self: Box<Self>, response: &Response) -> Option<Box<Dispatch>>;
fn discard(self: Box<Self>, err: &Error);
}
fn flatten_err<T, E>(result: Result<Result<T, Error>, E>) -> Result<T, Error>
where E: Into<Error>
{
match result {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(e) => Err(e.into()),
}
}
struct Call {
request: Request,
dispatch: Box<Dispatch>,
complete: oneshot::Sender<Result<u64, Error>>,
}
impl Debug for Call {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_struct("Call")
.field("request", &self.request)
.finish()
}
}
impl Into<MultiplexEvent> for Call {
fn into(self) -> MultiplexEvent {
MultiplexEvent::Call(self)
}
}
struct Mute {
request: Request,
complete: oneshot::Sender<Result<u64, Error>>,
}
impl Debug for Mute {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_struct("Mute")
.field("request", &self.request)
.finish()
}
}
impl Into<MultiplexEvent> for Mute {
fn into(self) -> MultiplexEvent {
MultiplexEvent::Mute(self)
}
}
struct Push {
id: u64,
request: Request,
complete: oneshot::Sender<Result<(), Error>>,
}
impl Debug for Push {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_struct("Push")
.field("id", &self.id)
.field("request", &self.request)
.finish()
}
}
impl Into<MultiplexEvent> for Push {
fn into(self) -> MultiplexEvent {
MultiplexEvent::Push(self)
}
}
#[derive(Debug)]
enum MultiplexEvent {
Call(Call),
Mute(Mute),
Push(Push),
}
struct MessageBuf {
head: Window<[u8; 32]>,
data: Window<Vec<u8>>,
}
impl MessageBuf {
fn new(id: u64, request: Request) -> Result<Self, io::Error> {
let mut head = [0; 32];
let head_len = MessageBuf::encode_head(&mut head[..], id, request.ty())?;
let mut head = Window::new(head);
head.set_end(head_len);
let (mut data, headers) = request.into_components();
MessageBuf::encode_headers(&mut data, headers)?;
let mbuf = MessageBuf {
head: head,
data: Window::new(data),
};
Ok(mbuf)
}
fn encode_head(head: &mut [u8], id: u64, ty: u64) -> Result<usize, io::Error> {
let mut cur = Cursor::new(&mut head[..]);
rmp::encode::write_array_len(&mut cur, FRAME_LENGTH)?;
rmp::encode::write_uint(&mut cur, id)?;
rmp::encode::write_uint(&mut cur, ty)?;
Ok(cur.position() as usize)
}
fn encode_headers<W>(wr: &mut W, headers: Vec<RawHeader>) -> Result<(), io::Error>
where W: Write
{
rmp::encode::write_array_len(wr, headers.len() as u32)?;
for header in headers {
rmp::encode::write_array_len(wr, 3)?;
rmp::encode::write_bool(wr, false)?;
rmp::encode::write_bin(wr, &header.name[..])?;
rmp::encode::write_bin(wr, &header.data[..])?;
}
Ok(())
}
fn remaining(&self) -> usize {
self.head.as_ref().len() + self.data.as_ref().len()
}
fn advance(&mut self, num: usize) {
let mut num = num;
if num < self.head.as_ref().len() {
let from = self.head.start();
self.head.set_start(from + num);
} else {
num -= self.head.as_ref().len();
self.head.set_start(0);
self.head.set_end(0);
let from = self.data.start();
self.data.set_start(from + num);
}
}
}
enum Notify {
Call(u64, oneshot::Sender<Result<u64, Error>>),
Push(oneshot::Sender<Result<(), Error>>),
}
impl Notify {
fn complete(self, val: Result<(), io::Error>) {
match self {
Notify::Call(id, tx) => drop(tx.send(val.and(Ok(id)).map_err(Error::Io))),
Notify::Push(tx) => drop(tx.send(val.map_err(Error::Io))),
}
}
}
struct Message {
mbuf: MessageBuf,
notify: Notify,
}
impl Message {
fn remaining(&self) -> usize {
self.mbuf.remaining()
}
fn advance(&mut self, n: usize) {
self.mbuf.advance(n)
}
fn complete(self, val: Result<(), io::Error>) {
self.notify.complete(val)
}
}
bitflags! {
flags Shutdown: u8 {
const CLOSE_SEND = 0b0001,
const CLOSE_RECV = 0b0010,
const CLOSE_USER = 0b0100,
}
}
#[derive(Debug)]
enum MultiplexError {
Io(io::Error),
InvalidProtocol(io::Error),
InvalidFraming(frame::Error),
InvalidDataFraming(String),
}
impl MultiplexError {
fn clone(&self) -> Self {
match *self {
MultiplexError::Io(ref err) => {
MultiplexError::Io(io::Error::new(err.kind(), error::Error::description(err)))
}
MultiplexError::InvalidProtocol(ref err) => {
MultiplexError::InvalidProtocol(io::Error::new(err.kind(), error::Error::description(err)))
}
MultiplexError::InvalidFraming(ref err) => {
MultiplexError::InvalidFraming(*err)
}
MultiplexError::InvalidDataFraming(ref err) => {
MultiplexError::InvalidDataFraming(err.clone())
}
}
}
}
impl From<io::Error> for MultiplexError {
fn from(err: io::Error) -> Self {
MultiplexError::Io(err)
}
}
impl From<rmpv::decode::Error> for MultiplexError {
fn from(err: rmpv::decode::Error) -> Self {
MultiplexError::InvalidProtocol(err.into())
}
}
impl From<frame::Error> for MultiplexError {
fn from(err: frame::Error) -> Self {
MultiplexError::InvalidFraming(err)
}
}
#[must_use = "futures do nothing unless polled"]
struct Multiplex<T> {
id: u64,
sock: T,
peer: SocketAddr,
state: Shutdown,
pending: VecDeque<Message>,
dispatches: HashMap<u64, Box<Dispatch>>,
ring: Vec<u8>,
rd_offset: usize,
rx_offset: usize,
}
impl<T> Drop for Multiplex<T> {
fn drop(&mut self) {
info!("dropped multiplex with connection to {}", self.peer);
}
}
const IOVEC_MAX: usize = 64;
fn unexpected_eof() -> io::Error {
ErrorKind::UnexpectedEof.into()
}
impl<T: Read + Write + SendAll + PollWrite> Multiplex<T> {
pub fn new(sock: T, peer: SocketAddr) -> Self {
Multiplex {
id: 0,
sock: sock,
peer: peer,
state: Shutdown::empty(),
pending: VecDeque::new(),
dispatches: HashMap::new(),
ring: vec![0; 4096],
rd_offset: 0,
rx_offset: 0,
}
}
fn add_event(&mut self, event: MultiplexEvent) {
match event {
MultiplexEvent::Call(Call { request, dispatch, complete }) => {
self.invoke(request, complete);
self.dispatches.insert(self.id, dispatch);
}
MultiplexEvent::Mute(Mute { request, complete }) => {
self.invoke(request, complete);
}
MultiplexEvent::Push(Push { id, request, complete }) => {
self.push(id, request, || Notify::Push(complete))
}
}
}
fn invoke(&mut self, request: Request, complete: oneshot::Sender<Result<u64, Error>>) {
self.id += 1;
let id = self.id;
self.push(id, request, || Notify::Call(id, complete));
}
fn push<F>(&mut self, id: u64, request: Request, f: F)
where F: FnOnce() -> Notify
{
let mbuf = MessageBuf::new(id, request).expect("failed to pack frame header");
let message = Message {
mbuf: mbuf,
notify: f(),
};
self.pending.push_back(message);
}
fn send_all(&mut self) -> Result<usize, io::Error> {
let mut size = 0;
let mut bufs = [EMPTY_SLICE; IOVEC_MAX];
for (idx, message) in self.pending.iter().enumerate().take(IOVEC_MAX / 2) {
size += 2;
bufs[idx * 2] = &message.mbuf.head.as_ref()[..];
bufs[idx * 2 + 1] = &message.mbuf.data.as_ref()[..];
}
self.sock.send_all(&bufs[..size])
}
fn poll_send(&mut self) -> Poll<(), MultiplexError> {
if self.pending.is_empty() && self.state.contains(CLOSE_RECV) {
return Ok(Ready(()));
}
loop {
if self.pending.is_empty() {
break;
}
debug!("sending {} pending buffer(s) of total {} byte(s) ...",
self.pending.len(),
self.pending.iter().fold(0, |s, x| s + x.remaining()));
match self.send_all() {
Ok(mut nlen) => {
debug!("sent {} bytes", nlen);
while nlen > 0 {
let bytes_left = self.pending.front().unwrap().remaining();
if bytes_left > nlen {
self.pending.front_mut().unwrap().advance(nlen);
break;
}
nlen -= bytes_left;
self.pending.pop_front().unwrap().complete(Ok(()));
}
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
break;
}
Err(err) => {
error!("failed to send bytes: {}", err);
for message in self.pending.drain(..) {
message.complete(Err(io::Error::last_os_error()));
}
self.state |= CLOSE_SEND;
return Err(err.into());
}
}
}
Ok(NotReady)
}
fn poll_recv(&mut self) -> Poll<(), MultiplexError> {
loop {
match self.sock.read(&mut self.ring[self.rd_offset..]) {
Ok(0) => {
self.state |= CLOSE_RECV;
if self.dispatches.is_empty() {
debug!("EOF");
} else {
warn!("EOF while there are {} pending dispatch(es)", self.dispatches.len());
for (.., dispatch) in self.dispatches.drain() {
dispatch.discard(&Error::Io(unexpected_eof()));
}
return Err(unexpected_eof().into());
}
return Ok(Ready(()));
}
Ok(nread) => {
self.rd_offset += nread;
debug!("read {} bytes; Ring {{ rx: {}, rd: {}, len: {} }}", nread, self.rx_offset, self.rd_offset, self.ring.len());
loop {
let mut rdbuf = Cursor::new(&self.ring[self.rx_offset..self.rd_offset]);
match read_value_ref(&mut rdbuf) {
Ok(raw) => {
let frame = Frame::new(&raw)?;
debug!("-> {}", frame);
let id = frame.id();
let ty = frame.ty();
let args = frame.args();
let response = Response::new(ty, args, frame.meta())
.map_err(|err| MultiplexError::InvalidDataFraming(format!("{}", err)))?;
match self.dispatches.remove(&id) {
Some(dispatch) => {
match dispatch.process(&response) {
Some(dispatch) => {
self.dispatches.insert(id, dispatch);
}
None => {
debug!("revoked channel {}", id);
}
}
}
None => {
warn!("dropped unexpected value");
}
}
self.rx_offset += rdbuf.position() as usize;
}
Err(ref err) if err.kind() == ErrorKind::UnexpectedEof => {
debug!("failed to decode frame - insufficient bytes");
break;
}
Err(err) => {
error!("failed to decode value from the read buffer: {:?}", err);
return Err(err.into());
}
}
}
let pending = self.rd_offset - self.rx_offset;
if self.rx_offset != 0 {
unsafe {
ptr::copy(
self.ring.as_ptr().offset(self.rx_offset as isize),
self.ring.as_mut_ptr(),
pending
);
}
self.rd_offset = pending;
self.rx_offset = 0;
debug!("compactified the ring");
}
let len = self.ring.len();
if pending * 2 >= len {
self.ring.resize(len * 2, 0);
debug!("resized rdbuf to {}", self.ring.len());
}
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
break;
}
Err(err) => {
error!("failed to read from the socket: {:?}", err);
self.state |= CLOSE_RECV;
let e = io::Error::last_os_error().into();
for (.., dispatch) in self.dispatches.drain() {
dispatch.discard(&e);
}
return Err(err.into());
}
}
}
Ok(NotReady)
}
}
impl<T: Read + Write + SendAll + PollWrite> Future for Multiplex<T> {
type Item = ();
type Error = MultiplexError;
fn poll(&mut self) -> Poll<(), Self::Error> {
if !self.state.contains(CLOSE_SEND) {
match self.poll_send() {
Ok(Ready(())) => return Ok(Ready(())),
Ok(NotReady) => {}
Err(err) => return Err(err), }
}
if !self.state.contains(CLOSE_RECV) {
match self.poll_recv() {
Ok(Ready(())) => return Ok(Ready(())),
Ok(NotReady) => {}
Err(err) => {
for (.., dispatch) in self.dispatches.drain() {
dispatch.discard(&err.clone().into());
}
return Err(err);
}
}
}
if self.state.contains(CLOSE_USER) && self.pending.is_empty() && self.dispatches.is_empty() {
Ok(Ready(()))
} else {
Ok(NotReady)
}
}
}
pub struct Sender {
id: u64,
tx: UnboundedSender<Event>,
}
impl Sender {
fn new(id: u64, tx: UnboundedSender<Event>) -> Self {
Self { id, tx }
}
pub fn send(&self, request: Request) -> impl Future<Item = (), Error = Error> {
let (tx, rx) = oneshot::channel();
let event = Push {
id: self.id,
request: request,
complete: tx,
};
self.tx.unbounded_send(Event::Push(event)).unwrap();
rx.then(flatten_err)
}
}
impl Debug for Sender {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_struct("Sender").field("id", &self.id).finish()
}
}
#[derive(Debug)]
pub enum Error {
Io(io::Error),
InvalidProtocol(io::Error),
InvalidFraming(frame::Error),
InvalidDataFraming(String),
Service(protocol::Error),
Canceled,
}
impl Error {
fn clone(&self) -> Self {
match *self {
Error::Io(ref err) => {
Error::Io(io::Error::new(err.kind(), error::Error::description(err)))
}
Error::InvalidProtocol(ref err) => {
Error::InvalidProtocol(io::Error::new(err.kind(), error::Error::description(err)))
}
Error::InvalidFraming(ref err) => {
Error::InvalidFraming(*err)
}
Error::InvalidDataFraming(ref err) => Error::InvalidDataFraming(err.clone()),
Error::Service(ref err) => Error::Service(err.clone()),
Error::Canceled => Error::Canceled,
}
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl From<frame::Error> for Error {
fn from(err: frame::Error) -> Error {
Error::InvalidFraming(err)
}
}
impl From<MultiplexError> for Error {
fn from(err: MultiplexError) -> Error {
match err {
MultiplexError::Io(err) => Error::Io(err),
MultiplexError::InvalidProtocol(err) => Error::InvalidProtocol(err),
MultiplexError::InvalidFraming(err) => Error::InvalidFraming(err),
MultiplexError::InvalidDataFraming(err) => Error::InvalidDataFraming(err),
}
}
}
impl From<oneshot::Canceled> for Error {
fn from(err: oneshot::Canceled) -> Self {
match err {
oneshot::Canceled => Error::Canceled
}
}
}
impl Display for Error {
fn fmt(&self, fmt: &mut Formatter) -> Result<(), fmt::Error> {
match *self {
Error::Io(ref err) |
Error::InvalidProtocol(ref err) => Display::fmt(&err, fmt),
Error::InvalidFraming(ref err) => Display::fmt(&err, fmt),
Error::InvalidDataFraming(ref err) => Display::fmt(&err, fmt),
Error::Service(ref err) => Display::fmt(&err, fmt),
Error::Canceled => write!(fmt, "canceled"),
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::Io(..) => "operation has been aborted due to I/O error",
Error::InvalidProtocol(..) => "transport protocol error",
Error::InvalidFraming(..) => "invalid framing",
Error::InvalidDataFraming(..) =>
"failed to unpack data frame into the expected type",
Error::Service(..) => "service error",
Error::Canceled => "operation has been canceled",
}
}
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Io(ref err) |
Error::InvalidProtocol(ref err) => Some(err),
Error::InvalidFraming(ref err) => Some(err),
Error::InvalidDataFraming(..) |
Error::Service(..) |
Error::Canceled => None,
}
}
}
impl serde::de::Error for Error {
fn custom<T: Display>(msg: T) -> Self {
Error::InvalidDataFraming(format!("{}", msg))
}
}
enum State<R: Resolve> {
Disconnected,
Resolving(R::Future),
Connecting(Box<Future<Item=TcpStream, Error=io::Error>>),
Running(Multiplex<TcpStream>),
}
impl<R: Resolve> Debug for State<R> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
State::Disconnected => write!(fmt, "State::Disconnected"),
State::Resolving(..) => write!(fmt, "State::Resolving"),
State::Connecting(..) => write!(fmt, "State::Connecting"),
State::Running(..) => write!(fmt, "State::Running"),
}
}
}
impl<R: Resolve> Display for State<R> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
State::Disconnected => write!(fmt, "disconnected"),
State::Resolving(..) => write!(fmt, "resolving"),
State::Connecting(..) => write!(fmt, "connecting"),
State::Running(..) => write!(fmt, "running"),
}
}
}
enum Event {
Connect(oneshot::Sender<Result<(), Error>>),
Disconnect,
Call(Call),
Mute(Mute),
Push(Push),
}
impl Debug for Event {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Event::Connect(..) => {
fmt.debug_struct("Event::Connect")
.finish()
}
Event::Disconnect => fmt.debug_struct("Event::Disconnect").finish(),
Event::Call(Call { ref request, .. }) => {
fmt.debug_struct("Event::Call")
.field("request", &request)
.finish()
}
Event::Mute(Mute { ref request, .. }) => {
fmt.debug_struct("Event::Mute")
.field("request", &request)
.finish()
}
Event::Push(Push { id, ref request, .. }) => {
fmt.debug_struct("Event::Push")
.field("id", &id)
.field("request", &request)
.finish()
}
}
}
}
#[must_use = "futures do nothing unless polled"]
struct Supervisor<R: Resolve> {
name: Cow<'static, str>,
shared: Arc<Mutex<SharedState>>,
resolver: R,
state: Option<State<R>>,
rx: Fuse<UnboundedReceiver<Event>>,
handle: Handle,
concerns: VecDeque<oneshot::Sender<Result<(), Error>>>,
events: VecDeque<MultiplexEvent>,
}
impl<R: Resolve> Supervisor<R> {
fn spawn(name: Cow<'static, str>, shared: Arc<Mutex<SharedState>>, resolver: R, handle: &Handle) -> UnboundedSender<Event>
where R: 'static
{
let (tx, rx) = mpsc::unbounded();
let v = Supervisor {
name: name,
shared: shared,
resolver: resolver,
state: Some(State::Disconnected),
rx: rx.fuse(),
handle: handle.clone(),
concerns: VecDeque::new(),
events: VecDeque::new(),
};
handle.spawn(v.map_err(|err| warn!("stopped supervisor task: {:?}", err)));
tx
}
#[inline]
fn push_event<E: Into<MultiplexEvent>>(&mut self, event: E) {
self.events.push_back(event.into());
debug!("pushed event into the queue, pending: {}", self.events.len());
}
fn disconnect(&mut self) {
*self.shared.lock().unwrap() = Default::default();
self.state = Some(State::Disconnected);
}
}
impl<R: Resolve> Future for Supervisor<R> {
type Item = ();
type Error = MultiplexError;
fn poll(&mut self) -> Poll<(), Self::Error> {
debug!("poll supervisor, state: {} [id={:p}]", self.state.as_ref().unwrap(), self);
match self.state.take().expect("failed to extract internal state") {
State::Disconnected => {
match self.rx.poll() {
Ok(Ready(Some(event))) => {
match event {
Event::Connect(tx) => {
self.concerns.push_back(tx);
}
Event::Disconnect => {
return self.poll();
}
Event::Call(event) => {
self.push_event(event);
}
Event::Mute(event) => {
self.push_event(event);
}
Event::Push(event) => {
self.push_event(event);
}
}
self.state = Some(State::Resolving(self.resolver.resolve(&self.name)));
debug!("switched state from `disconnected` to `resolving`");
return self.poll();
}
Ok(Ready(None)) | Err(()) => {
info!("service state machine has been terminated");
return Ok(Ready(()));
}
Ok(NotReady) => {
self.state = Some(State::Disconnected);
}
}
}
State::Resolving(mut future) => {
loop {
match self.rx.poll() {
Ok(Ready(Some(event))) => {
match event {
Event::Connect(tx) => {
self.concerns.push_back(tx);
}
Event::Disconnect => {}
Event::Call(event) => {
self.push_event(event);
}
Event::Mute(event) => {
self.push_event(event);
}
Event::Push(event) => {
self.push_event(event);
}
}
}
Ok(..) | Err(()) => {
break;
}
}
}
match future.poll() {
Ok(Ready(info)) => {
info!("successfully resolved `{}` service", self.name);
let ResolveInfo { addrs, methods, .. } = info;
self.shared.lock().unwrap().methods = Some(methods);
self.state = Some(State::Connecting(Box::new(connect(addrs, &self.handle))));
return self.poll();
}
Ok(NotReady) => {
self.state = Some(State::Resolving(future));
}
Err(err) => {
error!("failed to resolve `{}` service: {}", self.name, err);
for concern in self.concerns.drain(..) {
drop(concern.send(Err(err.clone())));
}
for event in self.events.drain(..) {
match event {
MultiplexEvent::Call(Call { dispatch, complete, .. }) => {
dispatch.discard(&err);
drop(complete.send(Err(err.clone())));
}
MultiplexEvent::Mute(..) |
MultiplexEvent::Push(..) => {}
}
}
self.state = Some(State::Disconnected);
}
}
}
State::Connecting(mut future) => {
loop {
match self.rx.poll() {
Ok(Ready(Some(event))) => {
match event {
Event::Connect(tx) => {
self.concerns.push_back(tx);
}
Event::Disconnect => {}
Event::Call(event) => {
self.push_event(event);
}
Event::Mute(event) => {
self.push_event(event);
}
Event::Push(event) => {
self.push_event(event);
}
}
}
Ok(..) | Err(()) => {
break;
}
}
}
match future.poll() {
Ok(Ready(sock)) => {
let peer = sock.peer_addr()?;
sock.set_nodelay(true)?;
let local_addr = sock.local_addr()?;
info!("successfully connected to {}", peer);
for concern in self.concerns.drain(..) {
drop(concern.send(Ok(())));
}
let mut mx = Multiplex::new(sock, peer);
for event in self.events.drain(..) {
mx.add_event(event);
}
self.shared.lock().unwrap().peer_addr = Some(peer);
self.shared.lock().unwrap().local_addr = Some(local_addr);
self.state = Some(State::Running(mx));
return self.poll();
}
Ok(NotReady) => {
debug!("connection - in progress");
self.state = Some(State::Connecting(future));
}
Err(err) => {
error!("failed to connect to `{}` service: {}", self.name, err);
let err = Error::Io(err);
for concern in self.concerns.drain(..) {
drop(concern.send(Err(err.clone())));
}
for event in self.events.drain(..) {
match event {
MultiplexEvent::Call(Call { dispatch, .. }) => {
dispatch.discard(&err);
}
MultiplexEvent::Mute(..) |
MultiplexEvent::Push(..) => {}
}
}
self.state = Some(State::Disconnected);
}
}
}
State::Running(mut future) => {
loop {
match self.rx.poll() {
Ok(Ready(Some(event))) => {
match event {
Event::Connect(tx) => {
drop(tx.send(Ok(())));
}
Event::Disconnect => {
self.disconnect();
break;
}
Event::Call(event) => {
future.add_event(event.into());
}
Event::Mute(event) => {
future.add_event(event.into());
}
Event::Push(event) => {
future.add_event(event.into());
}
}
}
Ok(NotReady) => {
break;
}
Ok(Ready(None)) | Err(()) => {
if future.pending.len() > 0 || future.dispatches.len() > 0 {
debug!("detached supervisor with {} messages and {} dispatches",
future.pending.len(), future.dispatches.len());
future.state |= CLOSE_USER;
self.handle.spawn(future.map_err(|_err| ()));
}
return Ok(Ready(()));
}
}
}
match future.poll() {
Ok(Ready(())) => {
self.disconnect();
}
Ok(NotReady) => {
self.state = Some(State::Running(future));
debug!("running - not ready");
}
Err(..) => {
self.disconnect();
}
}
}
}
Ok(NotReady)
}
}
#[derive(Default)]
struct SharedState {
peer_addr: Option<SocketAddr>,
local_addr: Option<SocketAddr>,
methods: Option<HashMap<u64, EventGraph>>,
}
#[derive(Clone)]
pub struct Service {
name: Cow<'static, str>,
shared: Arc<Mutex<SharedState>>,
tx: UnboundedSender<Event>,
}
impl Service {
pub fn new<N>(name: N, handle: &Handle) -> Self
where
N: Into<Cow<'static, str>>
{
ServiceBuilder::new(name).build(handle)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn connect(&self) -> impl Future<Item=(), Error=Error> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send(Event::Connect(tx)).unwrap();
rx.then(flatten_err)
}
pub fn methods(&self) -> Option<HashMap<u64, EventGraph>> {
self.shared.lock().unwrap().methods.clone()
}
pub fn disconnect(&self) {
self.tx.unbounded_send(Event::Disconnect).expect("communication channel must live");
}
pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> {
self.shared.lock().unwrap().peer_addr.ok_or_else(|| ErrorKind::NotConnected.into())
}
pub fn local_addr(&self) -> Result<SocketAddr, io::Error> {
self.shared.lock().unwrap().local_addr.ok_or_else(|| ErrorKind::NotConnected.into())
}
pub fn call<D>(&self, request: Request, dispatch: D) -> impl Future<Item=Sender, Error=Error>
where
D: Dispatch + 'static
{
let (tx, rx) = oneshot::channel();
let event = Call {
request: request,
dispatch: Box::new(dispatch),
complete: tx,
};
self.tx.unbounded_send(Event::Call(event)).unwrap();
let tx = self.tx.clone();
rx.then(flatten_err).and_then(|id| Ok(Sender::new(id, tx)))
}
pub fn call_mute(&self, request: Request) -> impl Future<Item=Sender, Error=Error> {
let (tx, rx) = oneshot::channel();
let event = Mute {
request: request,
complete: tx,
};
self.tx.unbounded_send(Event::Mute(event)).unwrap();
let tx = self.tx.clone();
rx.then(flatten_err).and_then(|id| Ok(Sender::new(id, tx)))
}
}
impl Debug for Service {
fn fmt(&self, fmt: &mut Formatter) -> Result<(), fmt::Error> {
fmt.debug_struct("Service")
.field("name", &self.name)
.finish()
}
}
fn _assert_kinds() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
fn _assert_clone<T: Clone>() {}
_assert_send::<Service>();
_assert_sync::<Service>();
_assert_clone::<Service>();
}