use crate::{Stream, error::ConnectionError};
use futures::{ready, channel::{mpsc, oneshot}, prelude::*};
use std::{pin::Pin, task::{Context, Poll}};
use super::ControlCommand;
type Result<T> = std::result::Result<T, ConnectionError>;
#[derive(Debug)]
pub struct Control {
sender: mpsc::Sender<ControlCommand>,
pending_open: Option<oneshot::Receiver<Result<Stream>>>,
pending_close: Option<oneshot::Receiver<()>>
}
impl Clone for Control {
fn clone(&self) -> Self {
Control {
sender: self.sender.clone(),
pending_open: None,
pending_close: None
}
}
}
impl Control {
pub(crate) fn new(sender: mpsc::Sender<ControlCommand>) -> Self {
Control {
sender,
pending_open: None,
pending_close: None
}
}
pub async fn open_stream(&mut self) -> Result<Stream> {
let (tx, rx) = oneshot::channel();
self.sender.send(ControlCommand::OpenStream(tx)).await?;
rx.await?
}
pub async fn close(&mut self) -> Result<()> {
let (tx, rx) = oneshot::channel();
if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
return Ok(())
}
let _ = rx.await;
Ok(())
}
pub fn poll_open_stream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Stream>> {
loop {
match self.pending_open.take() {
None => {
ready!(self.sender.poll_ready(cx)?);
let (tx, rx) = oneshot::channel();
self.sender.start_send(ControlCommand::OpenStream(tx))?;
self.pending_open = Some(rx)
}
Some(mut rx) => match rx.poll_unpin(cx)? {
Poll::Ready(result) => {
return Poll::Ready(result)
}
Poll::Pending => {
self.pending_open = Some(rx);
return Poll::Pending
}
}
}
}
}
pub fn abort_open_stream(&mut self) {
self.pending_open = None
}
pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
loop {
match self.pending_close.take() {
None => {
if ready!(self.sender.poll_ready(cx)).is_err() {
return Poll::Ready(Ok(()))
}
let (tx, rx) = oneshot::channel();
if let Err(e) = self.sender.start_send(ControlCommand::CloseConnection(tx)) {
if e.is_full() {
continue
}
debug_assert!(e.is_disconnected());
return Poll::Ready(Ok(()))
}
self.pending_close = Some(rx)
}
Some(mut rx) => match rx.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
return Poll::Ready(Ok(()))
}
Poll::Ready(Err(oneshot::Canceled)) => {
return Poll::Ready(Ok(()))
}
Poll::Pending => {
self.pending_close = Some(rx);
return Poll::Pending
}
}
}
}
}
}