use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use futures::executor;
use futures::io::{AsyncRead, AsyncWrite};
use futures::lock::{Mutex, MutexLockFuture};
use futures::task::{Context, Poll};
use super::connectioninfo::ConnectionInfo;
use crate::tncio::arqstate::ArqState;
use crate::tncio::asynctnc::AsyncTncTcp;
pub struct ArqStream {
tnc: Arc<Mutex<AsyncTncTcp>>,
state: ArqState,
}
impl ArqStream {
pub fn is_open(&self) -> bool {
self.state.is_open()
}
pub fn is_disconnecting(&self) -> bool {
self.state.is_disconnecting()
}
pub fn info(&self) -> &ConnectionInfo {
self.state.info()
}
pub fn bytes_received(&self) -> u64 {
self.state.bytes_received()
}
pub fn bytes_transmitted(&self) -> u64 {
self.state.bytes_transmitted()
}
pub fn bytes_unacknowledged(&self) -> u64 {
self.state.bytes_unacknowledged()
}
pub fn bytes_staged(&self) -> u64 {
self.state.bytes_staged()
}
pub fn elapsed_time(&self) -> Duration {
self.state.elapsed_time()
}
pub(crate) fn new(tnc: Arc<Mutex<AsyncTncTcp>>, info: ConnectionInfo) -> Self {
ArqStream {
tnc,
state: ArqState::new(info),
}
}
}
impl AsyncRead for ArqStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
let mut lock_future: MutexLockFuture<AsyncTncTcp> = this.tnc.lock();
let mut tnc = ready!(Pin::new(&mut lock_future).poll(cx));
let data = tnc.data_stream_sink();
this.state.poll_read(data, cx, buf)
}
}
impl AsyncWrite for ArqStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
let mut lock_future: MutexLockFuture<AsyncTncTcp> = this.tnc.lock();
let mut tnc = ready!(Pin::new(&mut lock_future).poll(cx));
let data = tnc.data_stream_sink();
this.state.poll_write(data, cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let mut lock_future: MutexLockFuture<AsyncTncTcp> = this.tnc.lock();
let mut tnc = ready!(Pin::new(&mut lock_future).poll(cx));
let data = tnc.data_stream_sink();
this.state.poll_flush(data, cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
if !this.is_open() {
return Poll::Ready(Ok(()));
}
if !this.is_disconnecting() {
this.state.shutdown_write();
}
let mut lock_future: MutexLockFuture<AsyncTncTcp> = this.tnc.lock();
let mut tnc = ready!(Pin::new(&mut lock_future).poll(cx));
match ready!(tnc.poll_disconnect(cx)) {
Ok(k) => {
this.state.shutdown_read();
Poll::Ready(Ok(k))
}
Err(e) => {
error!(
"Unclean disconnect to {}: {}",
this.state.info().peer_call(),
&e
);
this.state.shutdown_read();
Poll::Ready(Err(e))
}
}
}
}
impl Drop for ArqStream {
fn drop(&mut self) {
if !self.is_open() {
return;
}
self.state.shutdown_write();
let tncref = self.tnc.clone();
executor::block_on(async move {
let mut tnc = tncref.lock().await;
let _ = tnc.disconnect().await;
});
self.state.shutdown_read();
}
}
impl fmt::Display for ArqStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.state.fmt(f)
}
}
impl Unpin for ArqStream {}