use std::cell::UnsafeCell;
use std::io;
use std::marker::PhantomData;
use std::sync::Arc;
use bytes::BytesMut;
use futures::{Async, Poll};
use tokio_io::codec::Encoder;
use tokio_io::AsyncWrite;
use actor::{Actor, ActorContext, AsyncContext, Running, SpawnHandle};
use fut::ActorFuture;
/// Callbacks through which a writer future reports back to the actor that
/// owns it.
///
/// `E` is the error type produced by the writer.
// The default method bodies deliberately ignore some of their arguments.
#[allow(unused_variables)]
pub trait WriteHandler<E>: Actor
where
    Self::Context: ActorContext,
{
    /// Called for every error the writer encounters.
    ///
    /// Returning `Running::Stop` (the default) makes the writer future call
    /// [`finished`](WriteHandler::finished) and complete; any other value
    /// lets it keep going.
    fn error(&mut self, err: E, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Called when the writer future is about to complete.
    ///
    /// The default implementation stops the actor's context.
    fn finished(&mut self, ctx: &mut Self::Context) {
        ctx.stop();
    }
}
bitflags! {
// Shutdown state of a writer, shared between the user-facing handle and the
// writer future.
struct Flags: u8 {
// Set by `close()` on the handle: shutdown has been requested.
const CLOSING = 0b0000_0001;
// Set by `WriterFut::poll` once `CLOSING` was requested, the buffer is empty
// and the flush succeeded; reported by `closed()`.
const CLOSED = 0b0000_0010;
}
}
// Default low watermark in bytes: on a blocked write, `WriterDrain` completes
// once fewer than this many bytes remain buffered.
const LOW_WATERMARK: usize = 4 * 1024;
// Default high watermark in bytes: when a write would block and more than this
// many bytes are buffered, `WriterFut` hands a `WriterDrain` to `ctx.wait`.
// NOTE(review): `wait` presumably pauses the actor until the drain finishes —
// confirm against the `AsyncContext` documentation.
const HIGH_WATERMARK: usize = 4 * LOW_WATERMARK;
/// Handle for queueing raw bytes that a background `WriterFut` writes to the
/// transport `T`.
///
/// `E` is the error type delivered to the owning actor's `WriteHandler`.
pub struct Writer<T, E>
where
    T: AsyncWrite,
    E: From<io::Error>,
{
    /// State shared with the futures spawned for this writer.
    inner: UnsafeWriter<T, E>,
}
/// Reference-counted, unsynchronised pointer to the writer state.
///
/// Every holder obtains `&`/`&mut` access through `UnsafeCell::get`; nothing
/// here checks that those accesses do not overlap.
struct UnsafeWriter<T, E>(Arc<UnsafeCell<InnerWriter<T, E>>>)
where
    T: AsyncWrite,
    E: From<io::Error>;
impl<T: AsyncWrite, E: From<io::Error>> Clone for UnsafeWriter<T, E> {
fn clone(&self) -> Self {
UnsafeWriter(self.0.clone())
}
}
/// State shared between a writer handle and its spawned futures.
struct InnerWriter<T, E>
where
    T: AsyncWrite,
    E: From<io::Error>,
{
    /// Shutdown progress (`CLOSING` / `CLOSED`).
    flags: Flags,
    /// Transport the buffered bytes are written to.
    io: T,
    /// Bytes queued but not yet accepted by `io`.
    buffer: BytesMut,
    /// Error recorded outside `WriterFut::poll`, handed to the actor on the
    /// next poll.
    error: Option<E>,
    /// Low watermark in bytes, consulted by `WriterDrain`.
    low: usize,
    /// High watermark in bytes, consulted by `WriterFut`.
    high: usize,
    /// Handle of the spawned `WriterFut`.
    handle: SpawnHandle,
}
impl<T, E> Writer<T, E>
where
    T: AsyncWrite,
    E: From<io::Error> + 'static,
{
    /// Creates a writer over `io` and spawns the future that performs the
    /// actual writes on `ctx`.
    ///
    /// The buffer starts empty and the watermarks start at `LOW_WATERMARK` /
    /// `HIGH_WATERMARK`.
    pub fn new<A, C>(io: T, ctx: &mut C) -> Writer<T, E>
    where
        A: Actor<Context = C> + WriteHandler<E>,
        C: AsyncContext<A>,
        T: 'static,
    {
        let state = InnerWriter {
            flags: Flags::empty(),
            io,
            buffer: BytesMut::new(),
            error: None,
            low: LOW_WATERMARK,
            high: HIGH_WATERMARK,
            handle: SpawnHandle::default(),
        };
        let mut writer = Writer {
            inner: UnsafeWriter(Arc::new(UnsafeCell::new(state))),
        };

        let fut = WriterFut {
            inner: writer.inner.clone(),
            act: PhantomData,
        };
        // The spawn handle is only known after spawning, so it is stored
        // after the shared state has been created.
        let spawned = ctx.spawn(fut);
        writer.as_mut().handle = spawned;
        writer
    }

    /// Shared view of the writer state.
    #[inline]
    fn as_ref(&self) -> &InnerWriter<T, E> {
        let state = self.inner.0.get();
        unsafe { &*state }
    }

    /// Exclusive view of the writer state.
    #[inline]
    fn as_mut(&mut self) -> &mut InnerWriter<T, E> {
        let state = self.inner.0.get();
        unsafe { &mut *state }
    }

    /// Requests shutdown: the writer future finishes once everything already
    /// buffered has been written and flushed.
    pub fn close(&mut self) {
        self.as_mut().flags |= Flags::CLOSING;
    }

    /// Returns `true` once the writer future has completed a requested
    /// shutdown.
    pub fn closed(&self) -> bool {
        let state = self.as_ref();
        state.flags.contains(Flags::CLOSED)
    }

    /// Replaces the low and high buffer watermarks (in bytes).
    pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) {
        let state = self.as_mut();
        state.low = low_watermark;
        state.high = high_watermark;
    }

    /// Appends `msg` to the outgoing buffer; the bytes are written by the
    /// spawned future, not by this call.
    pub fn write(&mut self, msg: &[u8]) {
        self.as_mut().buffer.extend_from_slice(msg);
    }

    /// Returns the handle of the spawned writer future.
    pub fn handle(&self) -> SpawnHandle {
        let state = self.as_ref();
        state.handle
    }
}
struct WriterFut<T, E, A>
where
T: AsyncWrite,
E: From<io::Error>,
{
act: PhantomData<A>,
inner: UnsafeWriter<T, E>,
}
impl<T: 'static, E: 'static, A> ActorFuture for WriterFut<T, E, A>
where
T: AsyncWrite,
E: From<io::Error>,
A: Actor + WriteHandler<E>,
A::Context: AsyncContext<A>,
{
type Item = ();
type Error = ();
type Actor = A;
fn poll(
&mut self, act: &mut A, ctx: &mut A::Context,
) -> Poll<Self::Item, Self::Error> {
let inner = unsafe { &mut *self.inner.0.get() };
if let Some(err) = inner.error.take() {
if act.error(err, ctx) == Running::Stop {
act.finished(ctx);
return Ok(Async::Ready(()));
}
}
while !inner.buffer.is_empty() {
match inner.io.write(&inner.buffer) {
Ok(n) => {
if n == 0
&& act.error(
io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
).into(),
ctx,
) == Running::Stop
{
act.finished(ctx);
return Ok(Async::Ready(()));
}
let _ = inner.buffer.split_to(n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if inner.buffer.len() > inner.high {
ctx.wait(WriterDrain {
inner: self.inner.clone(),
act: PhantomData,
});
}
return Ok(Async::NotReady);
}
Err(e) => if act.error(e.into(), ctx) == Running::Stop {
act.finished(ctx);
return Ok(Async::Ready(()));
},
}
}
match inner.io.flush() {
Ok(_) => (),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(Async::NotReady)
}
Err(e) => if act.error(e.into(), ctx) == Running::Stop {
act.finished(ctx);
return Ok(Async::Ready(()));
},
}
if inner.flags.contains(Flags::CLOSING) {
inner.flags |= Flags::CLOSED;
act.finished(ctx);
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
struct WriterDrain<T, E, A>
where
T: AsyncWrite,
E: From<io::Error>,
{
act: PhantomData<A>,
inner: UnsafeWriter<T, E>,
}
impl<T, E, A> ActorFuture for WriterDrain<T, E, A>
where
    T: AsyncWrite,
    E: From<io::Error>,
    A: Actor,
    A::Context: AsyncContext<A>,
{
    type Item = ();
    type Error = ();
    type Actor = A;

    /// Writes buffered bytes until the buffer is empty or the transport
    /// blocks.
    ///
    /// Resolves when the buffer is empty, when a blocked write leaves fewer
    /// than `low` bytes queued, or immediately if an error is already
    /// pending. Write failures are stored in the shared state and make this
    /// future fail with `Err(())`.
    fn poll(&mut self, _: &mut A, _: &mut A::Context) -> Poll<Self::Item, Self::Error> {
        let state = unsafe { &mut *self.inner.0.get() };

        // An error is already waiting to be reported; nothing to drain.
        if state.error.is_some() {
            return Ok(Async::Ready(()));
        }

        loop {
            if state.buffer.is_empty() {
                return Ok(Async::Ready(()));
            }

            let written = match state.io.write(&state.buffer) {
                Ok(0) => {
                    state.error = Some(E::from(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to write frame to transport",
                    )));
                    return Err(());
                }
                Ok(n) => n,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    let drained = state.buffer.len() < state.low;
                    return Ok(if drained {
                        Async::Ready(())
                    } else {
                        Async::NotReady
                    });
                }
                Err(e) => {
                    state.error = Some(E::from(e));
                    return Err(());
                }
            };

            // Drop the bytes the transport accepted.
            let _ = state.buffer.split_to(written);
        }
    }
}
/// Handle that encodes items with `U` and queues the resulting bytes for a
/// background `WriterFut` to write to the transport `T`.
pub struct FramedWrite<T, U>
where
    T: AsyncWrite,
    U: Encoder,
{
    /// Encoder turning items into bytes.
    enc: U,
    /// State shared with the futures spawned for this writer.
    inner: UnsafeWriter<T, U::Error>,
}
impl<T: AsyncWrite, U: Encoder> FramedWrite<T, U> {
    /// Creates a framed writer over `io` with an empty output buffer and
    /// spawns the future that performs the actual writes on `ctx`.
    pub fn new<A, C>(io: T, enc: U, ctx: &mut C) -> FramedWrite<T, U>
    where
        A: Actor<Context = C> + WriteHandler<U::Error>,
        C: AsyncContext<A>,
        U::Error: 'static,
        T: 'static,
    {
        // Same construction as `from_buffer`, just starting from nothing.
        Self::from_buffer::<A, C>(io, enc, BytesMut::new(), ctx)
    }

    /// Creates a framed writer whose output buffer starts out as `buffer`,
    /// and spawns the future that performs the actual writes on `ctx`.
    ///
    /// Bytes already present in `buffer` are queued ahead of anything encoded
    /// later. Watermarks start at `LOW_WATERMARK` / `HIGH_WATERMARK`.
    pub fn from_buffer<A, C>(
        io: T, enc: U, buffer: BytesMut, ctx: &mut C,
    ) -> FramedWrite<T, U>
    where
        A: Actor<Context = C> + WriteHandler<U::Error>,
        C: AsyncContext<A>,
        U::Error: 'static,
        T: 'static,
    {
        let inner = UnsafeWriter(Arc::new(UnsafeCell::new(InnerWriter {
            io,
            buffer,
            flags: Flags::empty(),
            error: None,
            low: LOW_WATERMARK,
            high: HIGH_WATERMARK,
            handle: SpawnHandle::default(),
        })));
        let h = ctx.spawn(WriterFut {
            inner: inner.clone(),
            act: PhantomData,
        });
        let mut writer = FramedWrite { enc, inner };
        // The spawn handle is only known after spawning.
        writer.as_mut().handle = h;
        writer
    }

    /// Shared view of the writer state.
    #[inline]
    fn as_ref(&self) -> &InnerWriter<T, U::Error> {
        // SAFETY: relies on no `&mut` to the state being live elsewhere while
        // the returned reference is used. NOTE(review): the state is shared
        // with the spawned futures and nothing enforces this — confirm that
        // handle and futures are never active re-entrantly.
        unsafe { &*self.inner.0.get() }
    }

    /// Exclusive view of the writer state.
    #[inline]
    fn as_mut(&mut self) -> &mut InnerWriter<T, U::Error> {
        // SAFETY: relies on no other reference to the state being live while
        // the returned reference is used (see the note on `as_ref`).
        unsafe { &mut *self.inner.0.get() }
    }

    /// Requests shutdown: the writer future finishes once everything already
    /// buffered has been written and flushed.
    pub fn close(&mut self) {
        self.as_mut().flags.insert(Flags::CLOSING);
    }

    /// Returns `true` once the writer future has completed a requested
    /// shutdown.
    pub fn closed(&self) -> bool {
        self.as_ref().flags.contains(Flags::CLOSED)
    }

    /// Replaces the low and high buffer watermarks (in bytes).
    pub fn set_buffer_capacity(&mut self, low: usize, high: usize) {
        let inner = self.as_mut();
        inner.low = low;
        inner.high = high;
    }

    /// Encodes `item` into the outgoing buffer.
    ///
    /// An encoder failure is not returned; it is stored in the shared state
    /// and handed to the actor's `WriteHandler::error` the next time the
    /// writer future is polled.
    pub fn write(&mut self, item: U::Item) {
        // The reference comes straight from the cell, so it does not borrow
        // `self` and the encoder can be borrowed mutably alongside it.
        // SAFETY: same requirement as `as_mut` — no other reference to the
        // state may be live while `inner` is in use.
        let inner = unsafe { &mut *self.inner.0.get() };
        if let Err(e) = self.enc.encode(item, &mut inner.buffer) {
            inner.error = Some(e);
        }
    }

    /// Returns the handle of the spawned writer future.
    pub fn handle(&self) -> SpawnHandle {
        self.as_ref().handle
    }
}