mod reader;
mod writer;
use crate::kcp::kcp_module::prelude::{Kcp, KcpResult};
use async_lock::MutexGuard;
use futures::{
future::{poll_fn, BoxFuture},
FutureExt,
};
pub use reader::*;
use std::fmt::Formatter;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{ready, Context, Poll};
pub use writer::*;
pub type KCPPeer = Arc<KcpPeer>;
pub struct KcpPeer {
kcp: async_lock::Mutex<Kcp>,
wake: atomic_waker::AtomicWaker,
is_broken_pipe: AtomicBool,
pub conv: u32,
pub addr: SocketAddr,
pub next_update_time: AtomicU32,
}
impl std::fmt::Display for KcpPeer {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "({}-{:?})", self.conv, self.addr)
}
}
impl Drop for KcpPeer {
#[inline]
fn drop(&mut self) {
log::trace!("kcp_peer:{} is Drop", self);
}
}
impl KcpPeer {
pub fn new(kcp: Kcp, conv: u32, addr: SocketAddr) -> Arc<KcpPeer> {
Arc::new(Self {
kcp: async_lock::Mutex::new(kcp),
wake: Default::default(),
is_broken_pipe: Default::default(),
conv,
addr,
next_update_time: Default::default(),
})
}
#[inline]
pub(crate) async fn input(&self, buf: &mut [u8], decode: bool) -> KcpResult<usize> {
match self.kcp.lock().await.input(buf, decode) {
Ok(usize) => {
self.wake.wake();
self.next_update_time.store(0, Ordering::Release);
Ok(usize)
}
Err(err) => Err(err),
}
}
#[inline]
pub(crate) fn set_broken_pipe(&self) {
self.is_broken_pipe.store(true, Ordering::Release);
self.wake.wake();
}
#[inline]
pub(crate) async fn close(&self) {
if !self.is_broken_pipe.load(Ordering::Acquire) {
self.kcp.lock().await.output.peer.close();
}
}
#[inline]
pub(crate) async fn recv_buf(&self, buf: &mut [u8]) -> KcpResult<usize> {
self.kcp.lock().await.recv(buf)
}
#[inline]
pub(crate) fn need_update(&self, current: u32) -> bool {
self.next_update_time.load(Ordering::Acquire) < current
}
#[inline]
pub(crate) async fn update(&self, current: u32) -> KcpResult<()> {
if current >= self.next_update_time.load(Ordering::Acquire) {
let mut kcp = self.kcp.lock().await;
kcp.update(current).await?;
self.next_update_time
.store(kcp.check(current) + current, Ordering::Release);
Ok(())
} else {
Ok(())
}
}
#[inline]
pub fn get_addr(&self) -> SocketAddr {
self.addr
}
#[inline]
pub fn get_conv(&self) -> u32 {
self.conv
}
#[inline]
pub async fn recv(&self, buf: &mut [u8]) -> KcpResult<usize> {
struct WaitInput<'a> {
peer: &'a KcpPeer,
state: WaitInputState<'a>,
}
enum WaitInputState<'a> {
Check,
WaitLock(BoxFuture<'a, MutexGuard<'a, Kcp>>),
}
impl<'a> Future for WaitInput<'a> {
type Output = KcpResult<usize>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.state {
WaitInputState::Check => {
self.state = WaitInputState::WaitLock(self.peer.kcp.lock().boxed());
continue;
}
WaitInputState::WaitLock(ref mut lock_future) => {
let kcp = ready!(lock_future.as_mut().poll(cx));
let result_size = match kcp.peek_size() {
Ok(size) => Ok(size),
Err(err) => {
if self.peer.is_broken_pipe.load(Ordering::Acquire) {
Err(crate::prelude::kcp_module::Error::BrokenPipe)
} else {
Err(err)
}
}
};
match result_size {
Ok(size) => {
if size > 0 {
return Ok(size).into();
}
}
Err(crate::prelude::kcp_module::Error::BrokenPipe) => {
return Err(crate::prelude::kcp_module::Error::BrokenPipe)
.into()
}
Err(_) => {
self.peer.wake.register(cx.waker());
self.state = WaitInputState::Check;
return Poll::Pending;
}
}
}
}
}
}
}
let mut wait = WaitInput {
peer: self,
state: WaitInputState::Check,
};
let size = poll_fn(|cx| Pin::new(&mut wait).poll(cx)).await?;
if buf.len() < size {
Err(crate::prelude::kcp_module::Error::UserBufTooSmall(size))
} else {
self.recv_buf(buf).await
}
}
#[inline]
pub async fn send(&self, buf: &[u8]) -> KcpResult<usize> {
self.kcp.lock().await.send(buf)
}
#[inline]
pub async fn flush(&self) -> KcpResult<()> {
self.kcp.lock().await.flush_async().await
}
#[inline]
pub fn get_reader<'a>(self: &'a KCPPeer) -> KcpReader<'a> {
KcpReader::from(self)
}
#[inline]
pub fn get_writer<'a>(self: &'a KCPPeer) -> KcpWriter<'a> {
KcpWriter::from(self)
}
}