use std::collections::vec_deque::VecDeque;
use std::io;
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::Arc;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::task::Context;
use futures::task::Poll;
use futures_codec::Framed;
use crate::framing::control::TncControlFraming;
use crate::protocol::response::{CommandResult, Event, Response};
/// Splits a single bidirectional control transport into three cooperating
/// handles.
///
/// The transport is wrapped in a [`Framed`] codec using
/// [`TncControlFraming`] and placed behind a shared, reference-counted
/// mutex. Two unbounded channels are created so that decoded responses can
/// be routed to the appropriate consumer.
///
/// # Returns
///
/// A tuple of, in order:
///
/// 1. a [`ControlStreamEvents`] stream yielding `Event`s,
/// 2. a [`ControlStreamResults`] stream yielding `CommandResult`s,
/// 3. a [`ControlSink`] accepting outgoing `String` commands.
///
/// All three handles share ownership of the same underlying transport.
pub fn controlstream<I>(
    control: I,
) -> (
    ControlStreamEvents<I>,
    ControlStreamResults<I>,
    ControlSink<I>,
)
where
    I: AsyncRead + AsyncWrite + Unpin,
{
    // One channel per response category; the sending halves live inside the
    // shared state, the receiving halves go to the respective stream handle.
    let (dest_events, events) = mpsc::unbounded();
    let (dest_results, results) = mpsc::unbounded();

    let shared = Arc::new(Mutex::new(ControlState {
        io: Framed::new(control, TncControlFraming::new()),
        dest_events,
        dest_results,
        fused: false,
    }));

    let event_stream = ControlStreamEvents {
        state: Arc::clone(&shared),
        events,
    };
    let result_stream = ControlStreamResults {
        state: Arc::clone(&shared),
        results,
    };
    // The sink takes the last reference, so no extra clone is needed.
    let sink = ControlSink {
        state: shared,
        outqueue: VecDeque::with_capacity(32),
    };

    (event_stream, result_stream, sink)
}
/// State shared (behind an `Arc<Mutex<_>>`) by the event stream, the result
/// stream and the sink returned from `controlstream`.
///
/// Polling this type as a `Stream` reads one decoded response from `io` and
/// forwards it to the matching channel.
struct ControlState<I>
where
I: AsyncRead + AsyncWrite + Unpin,
{
/// The framed control transport; read by the streams, written by the sink.
io: Framed<I, TncControlFraming>,
/// Sending half of the channel that feeds `ControlStreamEvents`.
dest_events: mpsc::UnboundedSender<Event>,
/// Sending half of the channel that feeds `ControlStreamResults`.
dest_results: mpsc::UnboundedSender<CommandResult>,
/// Set once reading has terminated (transport ended or yielded something
/// other than a well-formed response, or a channel send failed). After
/// that, polling yields `None` without touching `io` again.
fused: bool,
}
/// Stream handle that yields the `Event`s decoded from the control
/// transport.
///
/// Created by `controlstream`. Shares the underlying transport with the
/// other handles returned from that call.
pub struct ControlStreamEvents<I>
where
I: AsyncRead + AsyncWrite + Unpin,
{
/// Shared transport state; polled to pull more responses off the wire.
state: Arc<Mutex<ControlState<I>>>,
/// Receiving half of the event channel filled by `ControlState`.
events: mpsc::UnboundedReceiver<Event>,
}
/// Stream handle that yields the `CommandResult`s decoded from the control
/// transport.
///
/// Created by `controlstream`. Shares the underlying transport with the
/// other handles returned from that call.
pub struct ControlStreamResults<I>
where
I: AsyncRead + AsyncWrite + Unpin,
{
/// Shared transport state; polled to pull more responses off the wire.
state: Arc<Mutex<ControlState<I>>>,
/// Receiving half of the result channel filled by `ControlState`.
results: mpsc::UnboundedReceiver<CommandResult>,
}
/// Sink handle that accepts outgoing commands as `String`s.
///
/// Created by `controlstream`. Shares the underlying transport with the
/// other handles returned from that call.
pub struct ControlSink<I>
where
I: AsyncRead + AsyncWrite + Unpin,
{
/// Shared transport state; locked whenever the sink touches the wire.
state: Arc<Mutex<ControlState<I>>>,
/// Commands accepted by `start_send` but not yet handed to the transport;
/// they are moved into the transport when the sink is flushed.
outqueue: VecDeque<String>,
}
// Explicitly mark the three public handles as `Unpin`. Their `Stream` /
// `Sink` implementations rely on this to obtain `&mut Self` from
// `Pin<&mut Self>` by plain dereference.
impl<I> Unpin for ControlStreamEvents<I> where I: AsyncRead + AsyncWrite + Unpin {}
impl<I> Unpin for ControlStreamResults<I> where I: AsyncRead + AsyncWrite + Unpin {}
impl<I> Unpin for ControlSink<I> where I: AsyncRead + AsyncWrite + Unpin {}
/// Yields the `Event`s decoded from the shared control transport.
///
/// Each poll first looks for an already-queued event. If there is none, it
/// drives the shared transport state one step (which may enqueue an event
/// here, or a command result for the sibling stream) and then looks again.
/// The stream ends once the shared state reports that the transport is
/// finished and no queued event remains.
impl<I> Stream for ControlStreamEvents<I>
where
    I: AsyncRead + AsyncWrite + Unpin,
{
    type Item = Event;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        loop {
            // Poll the receiver (rather than a non-registering `try_next`)
            // so that this task's waker is stored in the channel. Without
            // that, an event pushed while *another* task is driving the
            // transport would never wake this task.
            if let Poll::Ready(Some(evt)) = Pin::new(&mut this.events).poll_next(cx) {
                return Poll::Ready(Some(evt));
            }

            // A `lock()` future created here would be dropped on `Pending`,
            // and dropping it unregisters the waiter, losing the wakeup.
            // The lock is only ever held for the duration of one
            // synchronous poll, so on contention simply reschedule this
            // task and retry.
            let mut state = match this.state.try_lock() {
                Some(guard) => guard,
                None => {
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            };

            match ready!(Pin::new(&mut *state).poll_next(cx)) {
                // A response was routed to one of the channels; check ours.
                Some(()) => continue,
                // The transport is finished. Another task may have queued an
                // event between our channel check and taking the lock, so
                // look one last time before reporting end-of-stream.
                None => {
                    return match Pin::new(&mut this.events).poll_next(cx) {
                        Poll::Ready(Some(evt)) => Poll::Ready(Some(evt)),
                        _ => Poll::Ready(None),
                    };
                }
            }
        }
    }
}
/// Yields the `CommandResult`s decoded from the shared control transport.
///
/// Each poll first looks for an already-queued result. If there is none, it
/// drives the shared transport state one step (which may enqueue a result
/// here, or an event for the sibling stream) and then looks again. The
/// stream ends once the shared state reports that the transport is finished
/// and no queued result remains.
impl<I> Stream for ControlStreamResults<I>
where
    I: AsyncRead + AsyncWrite + Unpin,
{
    type Item = CommandResult;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        loop {
            // Poll the receiver (rather than a non-registering `try_next`)
            // so that this task's waker is stored in the channel. Without
            // that, a result pushed while *another* task is driving the
            // transport would never wake this task.
            if let Poll::Ready(Some(res)) = Pin::new(&mut this.results).poll_next(cx) {
                return Poll::Ready(Some(res));
            }

            // A `lock()` future created here would be dropped on `Pending`,
            // and dropping it unregisters the waiter, losing the wakeup.
            // The lock is only ever held for the duration of one
            // synchronous poll, so on contention simply reschedule this
            // task and retry.
            let mut state = match this.state.try_lock() {
                Some(guard) => guard,
                None => {
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            };

            match ready!(Pin::new(&mut *state).poll_next(cx)) {
                // A response was routed to one of the channels; check ours.
                Some(()) => continue,
                // The transport is finished. Another task may have queued a
                // result between our channel check and taking the lock, so
                // look one last time before reporting end-of-stream.
                None => {
                    return match Pin::new(&mut this.results).poll_next(cx) {
                        Poll::Ready(Some(res)) => Poll::Ready(Some(res)),
                        _ => Poll::Ready(None),
                    };
                }
            }
        }
    }
}
/// Drives the framed transport one response at a time.
///
/// Each `Some(())` means "one response was read and forwarded to the
/// matching channel". `None` means reading has terminated for good; every
/// later poll also returns `None` without touching the transport.
///
/// Reading terminates when the transport yields anything other than a
/// well-formed `CommandResult` or `Event` (end of input, a decode/IO error,
/// or any other response kind), or when the destination channel's receiver
/// has been dropped.
impl<I> Stream for ControlState<I>
where
    I: AsyncRead + AsyncWrite + Unpin,
{
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        if this.fused {
            return Poll::Ready(None);
        }

        let forwarded = match ready!(Pin::new(&mut this.io).poll_next(cx)) {
            Some(Ok(Response::CommandResult(res))) => this.dest_results.unbounded_send(res).is_ok(),
            Some(Ok(Response::Event(evt))) => this.dest_events.unbounded_send(evt).is_ok(),
            // End of input, an error, or an unhandled response kind.
            _ => false,
        };

        if forwarded {
            Poll::Ready(Some(()))
        } else {
            this.shut_down()
        }
    }
}

impl<I> ControlState<I>
where
    I: AsyncRead + AsyncWrite + Unpin,
{
    /// Permanently terminates reading and reports end-of-stream.
    ///
    /// Besides setting the `fused` flag, both channels are closed from the
    /// sending side. Messages already queued stay readable, but a consumer
    /// waiting on either channel is woken and observes end-of-stream instead
    /// of waiting forever for a message that will never arrive.
    fn shut_down(&mut self) -> Poll<Option<()>> {
        self.fused = true;
        self.dest_events.close_channel();
        self.dest_results.close_channel();
        Poll::Ready(None)
    }
}
impl<I> Sink<String> for ControlSink<I>
where
I: AsyncRead + AsyncWrite + Unpin,
{
type Error = io::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut state = ready!(Pin::new(&mut (*self).state.lock()).poll(cx));
Pin::new(&mut state.io).poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
Ok((*self).outqueue.push_back(item))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = &mut (*self);
let state_mutex = &mut this.state;
let outqueue = &mut this.outqueue;
let mut state = ready!(Pin::new(&mut state_mutex.lock()).poll(cx));
while !outqueue.is_empty() {
ready!(Pin::new(&mut state.io).poll_ready(cx))?;
Pin::new(&mut state.io).start_send(outqueue.pop_front().unwrap())?;
}
Pin::new(&mut state.io).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut state = ready!(Pin::new(&mut (*self).state.lock()).poll(cx));
Pin::new(&mut state.io).poll_close(cx)
}
}
// Unit tests driving the handles returned by `controlstream` over an
// in-memory `Cursor`.
#[cfg(test)]
mod test {
use super::*;
use futures::executor;
use futures::io::Cursor;
use crate::protocol::constants::CommandID;
// Feeds three `\r`-terminated lines and checks that the event is delivered
// on the event stream, the two command results are delivered (in order) on
// the result stream, and both streams end once the input is exhausted.
#[test]
fn test_streams() {
let curs = Cursor::new(b"MYCALL\rBUSY TRUE\rARQBW\r".to_vec());
let (mut evt, mut res, _out) = controlstream(curs);
executor::block_on(async {
// The event stream is polled first, so it has to read past the
// leading command result before it reaches the event.
let e1 = evt.next().await.unwrap();
assert_eq!(Event::BUSY(true), e1);
let r1 = res.next().await.unwrap();
assert_eq!(Ok((CommandID::MYCALL, None)), r1);
let r2 = res.next().await.unwrap();
assert_eq!(Ok((CommandID::ARQBW, None)), r2);
assert!(res.next().await.is_none());
assert!(evt.next().await.is_none());
});
}
// Sends one command through the sink. The result of `send` is discarded,
// so this only checks that sending completes without panicking.
#[test]
fn test_sink() {
let curs = Cursor::new(vec![0u8; 24]);
let (_evt, _res, mut sink) = controlstream(curs);
let _ = executor::block_on(sink.send("ABORT\r".to_owned()));
}
}