1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use anyhow::*;
use aqueue::Actor;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;

pub struct TCPPeer {
    pub addr: SocketAddr,
    pub sender: Option<OwnedWriteHalf>,
}

impl TCPPeer {
    /// 创建一个TCP PEER
    #[inline]
    pub fn new(addr: SocketAddr, sender: OwnedWriteHalf) -> Arc<Actor<TCPPeer>> {
        Arc::new(Actor::new(TCPPeer {
            addr,
            sender: Some(sender),
        }))
    }
    /// 是否断线
    #[inline]
    pub fn is_disconnect(&self) -> bool {
        self.sender.is_none()
    }

    /// 发送
    #[inline]
    pub async fn send<T: Deref<Target = [u8]> + Send + Sync + 'static>(
        &mut self,
        buff: T,
    ) -> Result<usize> {
        if let Some(ref mut sender) = self.sender {
            Ok(sender.write(&buff).await?)
        } else {
            bail!("ConnectionReset")
        }
    }

    /// 掐线
    #[inline]
    pub async fn disconnect(&mut self) -> Result<()> {
        if let Some(mut sender) = self.sender.take() {
            Ok(sender.shutdown().await?)
        } else {
            Ok(())
        }
    }
}

#[async_trait::async_trait]
pub trait IPeer {
    fn addr(&self) -> SocketAddr;
    async fn is_disconnect(&self) -> Result<bool>;
    async fn send<T: Deref<Target = [u8]> + Send + Sync + 'static>(&self, buff: T)
        -> Result<usize>;
    async fn disconnect(&self) -> Result<()>;
}

#[async_trait::async_trait]
impl IPeer for Actor<TCPPeer> {
    #[inline]
    fn addr(&self) -> SocketAddr {
        unsafe { self.deref_inner().addr }
    }

    #[inline]
    async fn is_disconnect(&self) -> Result<bool> {
        self.inner_call(async move |inner| Ok(inner.get().is_disconnect()))
            .await
    }

    #[inline]
    async fn send<T: Deref<Target = [u8]> + Send + Sync + 'static>(
        &self,
        buff: T,
    ) -> Result<usize> {
        ensure!(!buff.is_empty(), "send buff is null");
        self.inner_call(async move |inner| inner.get_mut().send(buff).await)
            .await
    }

    #[inline]
    async fn disconnect(&self) -> Result<()> {
        self.inner_call(async move |inner| inner.get_mut().disconnect().await)
            .await
    }
}