tcpclient/
lib.rs

1pub mod error;
2use aqueue::Actor;
3use error::Result;
4use log::*;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::Deref;
8use std::sync::Arc;
9use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
10use tokio::net::{TcpStream, ToSocketAddrs};
11
12pub struct TcpClient<T> {
13    disconnect: bool,
14    sender: WriteHalf<T>,
15}
16
17impl TcpClient<TcpStream> {
18    #[inline]
19    pub async fn connect<
20        T: ToSocketAddrs,
21        F: Future<Output = anyhow::Result<bool>> + Send + 'static,
22        A: Send + 'static,
23    >(
24        addr: T,
25        input: impl FnOnce(A, Arc<Actor<TcpClient<TcpStream>>>, ReadHalf<TcpStream>) -> F
26            + Send
27            + 'static,
28        token: A,
29    ) -> Result<Arc<Actor<TcpClient<TcpStream>>>> {
30        let stream = TcpStream::connect(addr).await?;
31        let target = stream.peer_addr()?;
32        Self::init(input, token, stream, target)
33    }
34}
35
36impl<T> TcpClient<T>
37where
38    T: AsyncRead + AsyncWrite + Send + 'static,
39{
40    #[inline]
41    pub async fn connect_stream_type<
42        H: ToSocketAddrs,
43        F: Future<Output = anyhow::Result<bool>> + Send + 'static,
44        S: Future<Output = anyhow::Result<T>> + Send + 'static,
45        A: Send + 'static,
46    >(
47        addr: H,
48        stream_init: impl FnOnce(TcpStream) -> S + Send + 'static,
49        input: impl FnOnce(A, Arc<Actor<TcpClient<T>>>, ReadHalf<T>) -> F + Send + 'static,
50        token: A,
51    ) -> Result<Arc<Actor<TcpClient<T>>>> {
52        let stream = TcpStream::connect(addr).await?;
53        let target = stream.peer_addr()?;
54        let stream = stream_init(stream).await?;
55        Self::init(input, token, stream, target)
56    }
57
58    #[inline]
59    fn init<F: Future<Output = anyhow::Result<bool>> + Send + 'static, A: Send + 'static>(
60        f: impl FnOnce(A, Arc<Actor<TcpClient<T>>>, ReadHalf<T>) -> F + Send + 'static,
61        token: A,
62        stream: T,
63        target: SocketAddr,
64    ) -> Result<Arc<Actor<TcpClient<T>>>> {
65        let (reader, sender) = tokio::io::split(stream);
66        let client = Arc::new(Actor::new(TcpClient {
67            disconnect: false,
68            sender,
69        }));
70        let read_client = client.clone();
71        tokio::spawn(async move {
72            let disconnect_client = read_client.clone();
73            let need_disconnect = f(token, read_client, reader).await.unwrap_or_else(|err| {
74                error!("reader error:{}", err);
75                true
76            });
77
78            if need_disconnect {
79                if let Err(er) = disconnect_client.disconnect().await {
80                    error!("disconnect to{} err:{}", target, er);
81                } else {
82                    debug!("disconnect to {}", target)
83                }
84            } else {
85                debug!("{} reader is close", target);
86            }
87        });
88        Ok(client)
89    }
90
91    #[inline]
92    pub async fn disconnect(&mut self) -> Result<()> {
93        if !self.disconnect {
94            self.sender.shutdown().await?;
95            self.disconnect = true;
96        }
97        Ok(())
98    }
99    #[inline]
100    pub async fn send(&mut self, buff: &[u8]) -> Result<usize> {
101        if !self.disconnect {
102            Ok(self.sender.write(buff).await?)
103        } else {
104            Err(error::Error::SendError("Disconnect".to_string()))
105        }
106    }
107
108    #[inline]
109    async fn send_all(&mut self, buff: &[u8]) -> Result<()> {
110        if !self.disconnect {
111            self.sender.write_all(buff).await?;
112            Ok(self.sender.flush().await?)
113        } else {
114            Err(error::Error::SendError("Disconnect".to_string()))
115        }
116    }
117
118    #[inline]
119    pub async fn flush(&mut self) -> Result<()> {
120        if !self.disconnect {
121            Ok(self.sender.flush().await?)
122        } else {
123            Err(error::Error::SendError("Disconnect".to_string()))
124        }
125    }
126}
127
128pub trait SocketClientTrait {
129    fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
130        &self,
131        buff: B,
132    ) -> impl Future<Output = Result<usize>>;
133    fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
134        &self,
135        buff: B,
136    ) -> impl Future<Output = Result<()>>;
137    fn send_ref(&self, buff: &[u8]) -> impl Future<Output = Result<usize>>;
138    fn send_all_ref(&self, buff: &[u8]) -> impl Future<Output = Result<()>>;
139    fn flush(&self) -> impl Future<Output = Result<()>>;
140    fn disconnect(&self) -> impl Future<Output = Result<()>>;
141}
142
143impl<T> SocketClientTrait for Actor<TcpClient<T>>
144where
145    T: AsyncRead + AsyncWrite + Send + 'static,
146{
147    #[inline]
148    async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
149        &self,
150        buff: B,
151    ) -> Result<usize> {
152        self.inner_call(|inner| async move { inner.get_mut().send(&buff).await })
153            .await
154    }
155    #[inline]
156    async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
157        &self,
158        buff: B,
159    ) -> Result<()> {
160        self.inner_call(|inner| async move { inner.get_mut().send_all(&buff).await })
161            .await
162    }
163    #[inline]
164    async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
165        if buff.is_empty() {
166            return Err(error::Error::SendError("send buff is none".to_string()));
167        }
168        self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
169            .await
170    }
171
172    #[inline]
173    async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
174        if buff.is_empty() {
175            return Err(error::Error::SendError("send buff is none".to_string()));
176        }
177        self.inner_call(|inner| async move { inner.get_mut().send_all(buff).await })
178            .await
179    }
180
181    #[inline]
182    async fn flush(&self) -> Result<()> {
183        self.inner_call(|inner| async move { inner.get_mut().flush().await })
184            .await
185    }
186
187    #[inline]
188    async fn disconnect(&self) -> Result<()> {
189        self.inner_call(|inner| async move { inner.get_mut().disconnect().await })
190            .await
191    }
192}