1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use anyhow::*;
use aqueue::Actor;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
/// State kept for one TCP peer: its socket address and the owned write half
/// used to send bytes to it.
pub struct TCPPeer {
    /// Socket address supplied when the peer was created.
    pub addr: SocketAddr,
    /// Write half of the connection; `None` once `disconnect` has taken it.
    pub sender: Option<OwnedWriteHalf>,
}
impl TCPPeer {
    /// Creates a peer for `addr` that writes through `sender`, wrapped in an
    /// `Arc<Actor<_>>` so it can be shared between tasks.
    #[inline]
    pub fn new(addr: SocketAddr, sender: OwnedWriteHalf) -> Arc<Actor<TCPPeer>> {
        Arc::new(Actor::new(TCPPeer {
            addr,
            sender: Some(sender),
        }))
    }

    /// Returns `true` when the write half is gone, i.e. after `disconnect`
    /// has taken it.
    #[inline]
    pub fn is_disconnect(&self) -> bool {
        self.sender.is_none()
    }

    /// Writes the whole of `buff` to the peer and returns the number of bytes
    /// written, which is always `buff.len()` on success.
    ///
    /// # Errors
    ///
    /// Fails with `ConnectionReset` when the peer has already been
    /// disconnected, or with the underlying I/O error when the write fails.
    #[inline]
    pub async fn send<T: Deref<Target = [u8]> + Send + Sync + 'static>(
        &mut self,
        buff: T,
    ) -> Result<usize> {
        match self.sender.as_mut() {
            Some(sender) => {
                // A single `write` may accept only part of the buffer; `write_all`
                // keeps writing until every byte has been handed to the socket, so
                // a message is never silently truncated.
                sender.write_all(&buff).await?;
                Ok(buff.len())
            }
            None => bail!("ConnectionReset"),
        }
    }

    /// Shuts down the write half of the connection.
    ///
    /// The write half is taken out of `self` before shutting down, so the peer
    /// reports itself as disconnected afterwards even if the shutdown fails.
    /// Calling this on an already disconnected peer is a no-op.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by the shutdown, if any.
    #[inline]
    pub async fn disconnect(&mut self) -> Result<()> {
        match self.sender.take() {
            Some(mut sender) => Ok(sender.shutdown().await?),
            None => Ok(()),
        }
    }
}
/// Asynchronous, shared-reference interface to a peer connection.
#[async_trait::async_trait]
pub trait IPeer {
    /// Returns the peer's socket address.
    fn addr(&self) -> SocketAddr;

    /// Reports whether the peer is disconnected.
    async fn is_disconnect(&self) -> Result<bool>;

    /// Sends `buff` to the peer, returning a byte count on success.
    async fn send<T>(&self, buff: T) -> Result<usize>
    where
        T: Deref<Target = [u8]> + Send + Sync + 'static;

    /// Disconnects the peer.
    async fn disconnect(&self) -> Result<()>;
}
#[async_trait::async_trait]
impl IPeer for Actor<TCPPeer> {
#[inline]
fn addr(&self) -> SocketAddr {
unsafe { self.deref_inner().addr }
}
#[inline]
async fn is_disconnect(&self) -> Result<bool> {
self.inner_call(async move |inner| Ok(inner.get().is_disconnect()))
.await
}
#[inline]
async fn send<T: Deref<Target = [u8]> + Send + Sync + 'static>(
&self,
buff: T,
) -> Result<usize> {
ensure!(!buff.is_empty(), "send buff is null");
self.inner_call(async move |inner| inner.get_mut().send(buff).await)
.await
}
#[inline]
async fn disconnect(&self) -> Result<()> {
self.inner_call(async move |inner| inner.get_mut().disconnect().await)
.await
}
}