use super::KCPPeer;
use crate::kcp::kcp_module::prelude::Kcp;
use crate::prelude::kcp_module::KcpResult;
use async_lock::MutexGuard;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
/// Asynchronous writer over a borrowed [`KCPPeer`].
///
/// Implements both `futures::AsyncWrite` and `tokio::io::AsyncWrite`; every
/// poll method drives a boxed future that is kept in `state` between polls.
pub struct KcpWriter<'a> {
/// Peer whose KCP lock, `flush()` and `close()` the writer forwards to.
peer: &'a KCPPeer,
/// Operation currently in flight; `Begin` when nothing is pending.
state: KcpWriterState<'a>,
}
/// In-flight operation of a [`KcpWriter`], stored so a poll that returned
/// `Pending` can resume the same future on the next poll.
enum KcpWriterState<'a> {
/// No operation is in flight; the next poll creates a fresh future.
Begin,
/// Waiting for the future returned by `peer.kcp.lock()` before sending.
Write(BoxFuture<'a, MutexGuard<'a, Kcp>>),
/// Waiting for the future returned by `peer.flush()`.
FlushAsync(BoxFuture<'a, KcpResult<()>>),
/// Waiting for the future returned by `peer.close()`.
Close(BoxFuture<'a, ()>),
}
impl<'a> From<&'a KCPPeer> for KcpWriter<'a> {
fn from(peer: &'a KCPPeer) -> Self {
Self {
peer,
state: KcpWriterState::Begin,
}
}
}
impl KcpWriter<'_> {
    /// Sends `buf` through the peer's KCP instance.
    ///
    /// Waits for the `peer.kcp` lock, then calls `send(buf)` on the guarded
    /// value and returns its result with the error converted into
    /// `std::io::Error`. The state is reset to `Begin` once the lock has been
    /// acquired, so the next call starts a fresh lock acquisition.
    ///
    /// If the state still holds a flush or close future from an operation that
    /// never ran to completion (for example its caller was dropped while the
    /// poll was `Pending`), that stale future is discarded and the send
    /// proceeds instead of panicking.
    #[inline]
    fn poll_send(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
        loop {
            match self.state {
                KcpWriterState::Write(ref mut write_lock) => {
                    let mut kcp = ready!(write_lock.as_mut().poll(cx));
                    self.state = KcpWriterState::Begin;
                    return Poll::Ready(kcp.send(buf).map_err(Into::into));
                }
                // Nothing pending, or a leftover future from an abandoned
                // flush/close: replace it with a new lock acquisition.
                KcpWriterState::Begin
                | KcpWriterState::FlushAsync(_)
                | KcpWriterState::Close(_) => {
                    self.state = KcpWriterState::Write(self.peer.kcp.lock().boxed());
                }
            }
        }
    }

    /// Drives `peer.flush()` to completion.
    ///
    /// Returns the flush result with the error converted into
    /// `std::io::Error`, and resets the state to `Begin` when the future
    /// completes.
    ///
    /// A leftover write or close future from an abandoned operation is
    /// discarded and replaced by a new flush instead of panicking.
    #[inline]
    fn poll_flush_async(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        loop {
            match self.state {
                KcpWriterState::FlushAsync(ref mut flush_future) => {
                    let res = ready!(flush_future.as_mut().poll(cx));
                    self.state = KcpWriterState::Begin;
                    return Poll::Ready(res.map_err(Into::into));
                }
                // Nothing pending, or a leftover future from an abandoned
                // write/close: replace it with a new flush.
                KcpWriterState::Begin | KcpWriterState::Write(_) | KcpWriterState::Close(_) => {
                    self.state = KcpWriterState::FlushAsync(self.peer.flush().boxed());
                }
            }
        }
    }

    /// Drives `peer.close()` to completion and then reports `Ok(())`.
    ///
    /// The state is reset to `Begin` when the close future completes.
    ///
    /// A leftover write or flush future from an abandoned operation is
    /// discarded and replaced by the close instead of panicking.
    #[inline]
    fn poll_close_async(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        loop {
            match self.state {
                KcpWriterState::Close(ref mut close_future) => {
                    ready!(close_future.as_mut().poll(cx));
                    self.state = KcpWriterState::Begin;
                    return Poll::Ready(Ok(()));
                }
                // Nothing pending, or a leftover future from an abandoned
                // write/flush: replace it with the close.
                KcpWriterState::Begin
                | KcpWriterState::Write(_)
                | KcpWriterState::FlushAsync(_) => {
                    self.state = KcpWriterState::Close(self.peer.close().boxed());
                }
            }
        }
    }
}
/// `futures` flavour of the writer: each method unpins the writer and forwards
/// to the matching inherent poll helper.
impl futures::AsyncWrite for KcpWriter<'_> {
    /// Forwards to `poll_send`.
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let writer = self.get_mut();
        writer.poll_send(cx, buf)
    }

    /// Forwards to `poll_flush_async`.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let writer = self.get_mut();
        writer.poll_flush_async(cx)
    }

    /// Forwards to `poll_close_async`.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let writer = self.get_mut();
        writer.poll_close_async(cx)
    }
}
/// `tokio` flavour of the writer: each method unpins the writer and forwards
/// to the matching inherent poll helper.
impl tokio::io::AsyncWrite for KcpWriter<'_> {
    /// Forwards to `poll_send`.
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let writer = self.get_mut();
        writer.poll_send(cx, buf)
    }

    /// Forwards to `poll_flush_async`.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let writer = self.get_mut();
        writer.poll_flush_async(cx)
    }

    /// Forwards to `poll_close_async`.
    #[inline]
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let writer = self.get_mut();
        writer.poll_close_async(cx)
    }
}