use super::desync::*;
use futures::*;
use futures::future::{BoxFuture};
use futures::stream::{Stream};
use futures::task;
use futures::task::{Poll, Context};
use std::sync::*;
use std::pin::{Pin};
use std::collections::VecDeque;
// Shared `Desync<()>` that carries no data of its own. In this file it is only used as a place
// to run clean-up closures: dropping the poll function of a pipe whose target has gone away, and
// running the on-drop callback of a `PipeStream`.
// NOTE(review): presumably this moves those drops off the thread that triggered them - confirm
// against the documentation of `Desync::desync`.
lazy_static! {
static ref REFERENCE_CHUTE: Desync<()> = Desync::new(());
}
/// Initial `max_pipe_depth` of a newly created `PipeStream`: the number of processed items that
/// may be waiting in the stream before the pipe stops reading from its input.
const PIPE_BACKPRESSURE_COUNT: usize = 5;
/// Waker handed to the streams and futures polled on behalf of a pipe.
///
/// The pipe context is stored as an `Option` so that whoever wakes first can take it; a waker
/// whose context has already been taken has nothing left to poll.
struct PipeWaker<Core: Send + Unpin, PollFn> {
    /// The pipe to poll when this waker fires, or `None` once it has been claimed.
    context: Mutex<Option<Arc<PipeContext<Core, PollFn>>>>,
}
/// State shared between every poll of a single pipe.
struct PipeContext<Core: Send + Unpin, PollFn> {
    /// The `Desync` whose data is handed to the poll function. Held weakly, so the pipe does
    /// not keep its target alive.
    target: Weak<Desync<Core>>,

    /// The function that advances the pipe. Becomes `None` once the pipe has finished or its
    /// target has been dropped.
    poll_fn: Arc<Mutex<Option<PollFn>>>,
}
impl<Core, PollFn> PipeContext<Core, PollFn>
where
    Core: 'static + Send + Unpin,
    PollFn: 'static + Send + for<'a> FnMut(&'a mut Core, task::Waker) -> BoxFuture<'a, bool>,
{
    /// Creates the context for a pipe that feeds `target`.
    ///
    /// `poll_fn` receives the target's data and a waker, and returns a future that resolves to
    /// `true` while the pipe should keep being polled and `false` once it is finished.
    fn new(target: &Arc<Desync<Core>>, poll_fn: PollFn) -> Arc<PipeContext<Core, PollFn>> {
        Arc::new(PipeContext {
            target: Arc::downgrade(target),
            poll_fn: Arc::new(Mutex::new(Some(poll_fn))),
        })
    }

    /// Runs one poll of the pipe against its target.
    ///
    /// If the target no longer exists, the poll function is discarded instead.
    fn poll(arc_self: Arc<Self>) {
        let target = match arc_self.target.upgrade() {
            Some(target) => target,
            None => {
                // The target is gone: release the poll function (and whatever it owns) inside
                // a REFERENCE_CHUTE job rather than inline.
                let orphaned_poll_fn = arc_self.poll_fn.lock().unwrap().take();
                REFERENCE_CHUTE.desync(move |_| {
                    drop(orphaned_poll_fn);
                });
                return;
            }
        };

        let poll_fn_slot = Arc::clone(&arc_self.poll_fn);

        target.future_desync(move |core| {
            async move {
                // Waking this waker polls the same context again.
                let waker = task::waker(Arc::new(PipeWaker {
                    context: Mutex::new(Some(Arc::clone(&arc_self))),
                }));

                // Start the poll. The lock is confined to this block so that it is released
                // before the returned future is awaited.
                let in_flight = {
                    let mut slot = poll_fn_slot.lock().unwrap();
                    let in_flight = slot.as_mut().map(move |poll_fn| (poll_fn)(core, waker.clone()));
                    in_flight
                };

                if let Some(in_flight) = in_flight {
                    let wants_more = in_flight.await;

                    if !wants_more {
                        // The pipe has finished: discard the poll function.
                        *arc_self.poll_fn.lock().unwrap() = None;
                    }
                }
            }
            .boxed()
        })
        .detach();
    }
}
impl<Core, PollFn> task::ArcWake for PipeWaker<Core, PollFn>
where
    Core: 'static + Send + Unpin,
    PollFn: 'static + Send + for<'a> FnMut(&'a mut Core, task::Waker) -> BoxFuture<'a, bool>,
{
    /// Polls the pipe this waker belongs to, at most once per waker.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Claim the context in a statement of its own, so the lock is released before the
        // poll is started.
        let claimed = arc_self.context.lock().unwrap().take();

        match claimed {
            Some(context) => PipeContext::poll(context),
            // Already claimed by an earlier wake-up: nothing to do.
            None => (),
        }
    }
}
/// Feeds every item produced by `stream` into `process`, together with mutable access to the
/// data owned by `desync`.
///
/// Items are handled one at a time: the future returned by `process` is awaited before the next
/// item is read. Piping stops when the stream ends. The pipe only holds a weak reference to
/// `desync`, so it does not keep the target alive.
pub fn pipe_in<Core, S, ProcessFn>(desync: Arc<Desync<Core>>, stream: S, process: ProcessFn)
where
    Core: 'static + Send + Unpin,
    S: 'static + Send + Unpin + Stream,
    S::Item: Send,
    ProcessFn: 'static + Send + for<'a> FnMut(&'a mut Core, S::Item) -> BoxFuture<'a, ()>,
{
    // Both are shared with the future created for each poll, hence the Arc<Mutex<_>> wrappers.
    let shared_stream = Arc::new(Mutex::new(stream));
    let shared_process = Arc::new(Mutex::new(process));

    let context = PipeContext::new(&desync, move |core, desync_waker| {
        let stream = Arc::clone(&shared_stream);
        let process = Arc::clone(&shared_process);

        async move {
            loop {
                // Poll the input using the waker that re-polls this pipe.
                let polled = {
                    let mut poll_context = Context::from_waker(&desync_waker);
                    stream.lock().unwrap().poll_next_unpin(&mut poll_context)
                };

                let item = match polled {
                    Poll::Ready(Some(item)) => item,
                    // End of input: the pipe is finished.
                    Poll::Ready(None) => return false,
                    // Nothing available yet: ask to be polled again once woken.
                    Poll::Pending => return true,
                };

                // Create the future in its own statement so the lock on `process` is released
                // before awaiting it.
                let process_future = (&mut *process.lock().unwrap())(core, item);
                process_future.await;
            }
        }
        .boxed()
    });

    // Start the first poll.
    PipeContext::poll(context);

    // NOTE(review): presumably this blocks until the poll queued above has run - confirm
    // against `Desync::sync`.
    desync.sync(|_| {});
}
/// Pipes `stream` through `process`, returning the results as a new stream.
///
/// Each item read from `stream` is passed to `process` together with mutable access to the data
/// owned by `desync`; the value the returned future resolves to is queued on the returned
/// `PipeStream`. Items are processed one at a time, in order.
///
/// The pipe stops reading its input while the number of queued results is at or above the
/// stream's back-pressure depth, and resumes once the `PipeStream` is polled. It finishes when
/// the input ends or when the `PipeStream` is dropped. The returned stream holds a strong
/// reference to `desync` until it is dropped.
pub fn pipe<Core, S, Output, ProcessFn>(desync: Arc<Desync<Core>>, stream: S, process: ProcessFn) -> PipeStream<Output>
where
    Core: 'static + Send + Unpin,
    S: 'static + Send + Unpin + Stream,
    S::Item: Send,
    Output: 'static + Send,
    ProcessFn: 'static + Send + for<'a> FnMut(&'a mut Core, S::Item) -> BoxFuture<'a, Output>,
{
    // Both are shared with the future created for each poll, hence the Arc<Mutex<_>> wrappers.
    let input_stream = Arc::new(Mutex::new(stream));
    let process = Arc::new(Mutex::new(process));

    // The output stream owns a reference to the target, released by its on-drop callback.
    let mut output_desync = Some(Arc::clone(&desync));
    let output_stream = PipeStream::<Output>::new(move || {
        output_desync.take();
    });

    // The pipe only refers to the output stream weakly: once the stream (and so its core) is
    // gone there is nowhere left to send results.
    let stream_core = Arc::downgrade(&output_stream.core);

    let context = PipeContext::new(&desync, move |core, desync_waker| {
        let stream_core = stream_core.upgrade();
        let input_stream = Arc::clone(&input_stream);
        let process = Arc::clone(&process);

        async move {
            if let Some(stream_core) = stream_core {
                loop {
                    // Check back-pressure and closure before reading each item, not just once
                    // per poll. Otherwise a burst of ready input would be queued without limit,
                    // and processing would continue after the output stream was dropped.
                    let is_closed = {
                        let mut stream_core = stream_core.lock().unwrap();

                        if stream_core.pending.len() >= stream_core.max_pipe_depth {
                            // Full: sleep until the output stream is polled again.
                            stream_core.backpressure_release_notify = Some(desync_waker.clone());
                            return true;
                        }

                        stream_core.closed
                    };

                    if is_closed {
                        // Let any waiting reader observe the closed state, then finish.
                        let notify = { stream_core.lock().unwrap().notify.take() };
                        if let Some(notify) = notify {
                            notify.wake();
                        }
                        return false;
                    }

                    // Clear any close notifier left over from an earlier pending poll.
                    stream_core.lock().unwrap().notify_stream_closed = None;

                    // Poll the input using the waker that re-polls this pipe.
                    let next = {
                        let mut desync_context = Context::from_waker(&desync_waker);
                        input_stream.lock().unwrap().poll_next_unpin(&mut desync_context)
                    };

                    match next {
                        Poll::Pending => {
                            // Waiting on input: also ask to be woken if the output stream is
                            // dropped in the meantime.
                            stream_core.lock().unwrap().notify_stream_closed = Some(desync_waker.clone());
                            return true;
                        }

                        Poll::Ready(None) => {
                            // Input exhausted: close the output and wake its reader.
                            let notify = {
                                let mut stream_core = stream_core.lock().unwrap();
                                stream_core.closed = true;
                                stream_core.notify.take()
                            };
                            if let Some(notify) = notify {
                                notify.wake();
                            }
                            return false;
                        }

                        Poll::Ready(Some(next)) => {
                            // Create the future in its own statement so the lock on `process`
                            // is released before awaiting it.
                            let next_item = (&mut *process.lock().unwrap())(core, next);
                            let next_item = next_item.await;

                            // Queue the result, then wake the reader with the lock released.
                            let notify = {
                                let mut stream_core = stream_core.lock().unwrap();
                                stream_core.pending.push_back(next_item);
                                stream_core.notify.take()
                            };
                            if let Some(notify) = notify {
                                notify.wake();
                            }
                        }
                    }
                }
            } else {
                // The output stream no longer exists: nothing left to do.
                return false;
            }
        }
        .boxed()
    });

    // Start the first poll.
    PipeContext::poll(context);

    // NOTE(review): presumably this blocks until the poll queued above has run - confirm
    // against `Desync::sync`.
    desync.sync(|_| {});

    output_stream
}
/// State shared between a `PipeStream` and the pipe that fills it.
struct PipeStreamCore<Item> {
/// Once `pending` holds at least this many items, the pipe stops reading from its input.
max_pipe_depth: usize,
/// Processed items waiting to be returned from the stream, oldest first.
pending: VecDeque<Item>,
/// Set when the input stream has ended or the `PipeStream` has been dropped.
closed: bool,
/// Waker of the task that polled the `PipeStream` and found nothing ready; woken when an item
/// is queued or the stream closes.
notify: Option<task::Waker>,
/// Waker for the pipe, registered while it waits on its input; woken when the `PipeStream` is
/// dropped.
notify_stream_closed: Option<task::Waker>,
/// Waker for the pipe, registered when it stopped because `pending` was full; woken when the
/// `PipeStream` is next polled.
backpressure_release_notify: Option<task::Waker>
}
/// The output side of a pipe: a stream of the values produced by the pipe's processing function.
///
/// Dropping the stream closes the pipe.
pub struct PipeStream<Item> {
    /// State shared with the pipe that fills this stream.
    core: Arc<Mutex<PipeStreamCore<Item>>>,

    /// Callback run when the stream is dropped; `None` once it has been handed off.
    on_drop: Option<Box<dyn FnMut() + Send + Sync>>,
}
impl<Item> PipeStream<Item> {
    /// Creates an empty, open stream that runs `on_drop` when it is dropped.
    ///
    /// The back-pressure depth starts at `PIPE_BACKPRESSURE_COUNT`.
    fn new<FnOnDrop: 'static+Send+Sync+FnMut() -> ()>(on_drop: FnOnDrop) -> PipeStream<Item> {
        PipeStream {
            core: Arc::new(Mutex::new(PipeStreamCore {
                max_pipe_depth: PIPE_BACKPRESSURE_COUNT,
                pending: VecDeque::new(),
                closed: false,
                notify: None,
                notify_stream_closed: None,
                backpressure_release_notify: None,
            })),
            on_drop: Some(Box::new(on_drop)),
        }
    }

    /// Sets the number of items this stream buffers before the pipe stops reading its input.
    ///
    /// A depth of 0 is treated as 1. With a depth of 0 the queue would always count as full,
    /// so the pipe would never read from its input again.
    pub fn set_backpressure_depth(&mut self, max_depth: usize) {
        self.core.lock().unwrap().max_pipe_depth = max_depth.max(1);
    }
}
impl<Item> Drop for PipeStream<Item> {
    /// Closes the pipe: discards any buffered items, marks the shared state as closed, wakes
    /// the pipe if it is waiting on its input, and hands the on-drop callback to
    /// `REFERENCE_CHUTE` to be run.
    fn drop(&mut self) {
        // Update the shared state under the lock, but take the waker out so it can be woken
        // after the lock is released, matching how `poll_next` wakes the pipe.
        let notify_stream_closed = {
            // A poisoned lock must not turn into a panic inside `drop`; the state is only
            // being reset here, so carrying on with the recovered guard is safe.
            let mut core = self.core.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

            core.pending = VecDeque::new();
            core.closed = true;
            core.notify_stream_closed.take()
        };

        if let Some(notify_stream_closed) = notify_stream_closed {
            notify_stream_closed.wake();
        }

        if let Some(mut on_drop) = self.on_drop.take() {
            REFERENCE_CHUTE.desync(move |_| {
                (on_drop)()
            });
        }
    }
}
impl<Item> Stream for PipeStream<Item> {
    type Item = Item;

    /// Returns the oldest buffered item, the end of the stream once it is closed and drained,
    /// or `Pending` after registering the caller's waker.
    ///
    /// Unless the stream has ended, any pipe that stopped because of back-pressure is woken.
    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Item>> {
        let mut core = self.core.lock().unwrap();

        let (outcome, stalled_pipe) = match core.pending.pop_front() {
            // An item was removed, so there is room in the queue again.
            Some(item) => (Poll::Ready(Some(item)), core.backpressure_release_notify.take()),

            // Nothing buffered and nothing more will arrive.
            None if core.closed => (Poll::Ready(None), None),

            // Nothing buffered yet: remember who to wake when an item arrives.
            None => {
                let stalled_pipe = core.backpressure_release_notify.take();
                core.notify = Some(context.waker().clone());
                (Poll::Pending, stalled_pipe)
            }
        };

        // Release the lock before waking the pipe.
        drop(core);

        if let Some(stalled_pipe) = stalled_pipe {
            stalled_pipe.wake();
        }

        outcome
    }
}