1pub mod error;
2use aqueue::Actor;
3use error::Result;
4use log::*;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::Deref;
8use std::sync::Arc;
9use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
10use tokio::net::{TcpStream, ToSocketAddrs};
11
12pub struct TcpClient<T> {
13 disconnect: bool,
14 sender: WriteHalf<T>,
15}
16
17impl TcpClient<TcpStream> {
18 #[inline]
19 pub async fn connect<
20 T: ToSocketAddrs,
21 F: Future<Output = anyhow::Result<bool>> + Send + 'static,
22 A: Send + 'static,
23 >(
24 addr: T,
25 input: impl FnOnce(A, Arc<Actor<TcpClient<TcpStream>>>, ReadHalf<TcpStream>) -> F
26 + Send
27 + 'static,
28 token: A,
29 ) -> Result<Arc<Actor<TcpClient<TcpStream>>>> {
30 let stream = TcpStream::connect(addr).await?;
31 let target = stream.peer_addr()?;
32 Self::init(input, token, stream, target)
33 }
34}
35
36impl<T> TcpClient<T>
37where
38 T: AsyncRead + AsyncWrite + Send + 'static,
39{
40 #[inline]
41 pub async fn connect_stream_type<
42 H: ToSocketAddrs,
43 F: Future<Output = anyhow::Result<bool>> + Send + 'static,
44 S: Future<Output = anyhow::Result<T>> + Send + 'static,
45 A: Send + 'static,
46 >(
47 addr: H,
48 stream_init: impl FnOnce(TcpStream) -> S + Send + 'static,
49 input: impl FnOnce(A, Arc<Actor<TcpClient<T>>>, ReadHalf<T>) -> F + Send + 'static,
50 token: A,
51 ) -> Result<Arc<Actor<TcpClient<T>>>> {
52 let stream = TcpStream::connect(addr).await?;
53 let target = stream.peer_addr()?;
54 let stream = stream_init(stream).await?;
55 Self::init(input, token, stream, target)
56 }
57
58 #[inline]
59 fn init<F: Future<Output = anyhow::Result<bool>> + Send + 'static, A: Send + 'static>(
60 f: impl FnOnce(A, Arc<Actor<TcpClient<T>>>, ReadHalf<T>) -> F + Send + 'static,
61 token: A,
62 stream: T,
63 target: SocketAddr,
64 ) -> Result<Arc<Actor<TcpClient<T>>>> {
65 let (reader, sender) = tokio::io::split(stream);
66 let client = Arc::new(Actor::new(TcpClient {
67 disconnect: false,
68 sender,
69 }));
70 let read_client = client.clone();
71 tokio::spawn(async move {
72 let disconnect_client = read_client.clone();
73 let need_disconnect = f(token, read_client, reader).await.unwrap_or_else(|err| {
74 error!("reader error:{}", err);
75 true
76 });
77
78 if need_disconnect {
79 if let Err(er) = disconnect_client.disconnect().await {
80 error!("disconnect to{} err:{}", target, er);
81 } else {
82 debug!("disconnect to {}", target)
83 }
84 } else {
85 debug!("{} reader is close", target);
86 }
87 });
88 Ok(client)
89 }
90
91 #[inline]
92 pub async fn disconnect(&mut self) -> Result<()> {
93 if !self.disconnect {
94 self.sender.shutdown().await?;
95 self.disconnect = true;
96 }
97 Ok(())
98 }
99 #[inline]
100 pub async fn send(&mut self, buff: &[u8]) -> Result<usize> {
101 if !self.disconnect {
102 Ok(self.sender.write(buff).await?)
103 } else {
104 Err(error::Error::SendError("Disconnect".to_string()))
105 }
106 }
107
108 #[inline]
109 async fn send_all(&mut self, buff: &[u8]) -> Result<()> {
110 if !self.disconnect {
111 self.sender.write_all(buff).await?;
112 Ok(self.sender.flush().await?)
113 } else {
114 Err(error::Error::SendError("Disconnect".to_string()))
115 }
116 }
117
118 #[inline]
119 pub async fn flush(&mut self) -> Result<()> {
120 if !self.disconnect {
121 Ok(self.sender.flush().await?)
122 } else {
123 Err(error::Error::SendError("Disconnect".to_string()))
124 }
125 }
126}
127
128pub trait SocketClientTrait {
129 fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
130 &self,
131 buff: B,
132 ) -> impl Future<Output = Result<usize>>;
133 fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
134 &self,
135 buff: B,
136 ) -> impl Future<Output = Result<()>>;
137 fn send_ref(&self, buff: &[u8]) -> impl Future<Output = Result<usize>>;
138 fn send_all_ref(&self, buff: &[u8]) -> impl Future<Output = Result<()>>;
139 fn flush(&self) -> impl Future<Output = Result<()>>;
140 fn disconnect(&self) -> impl Future<Output = Result<()>>;
141}
142
143impl<T> SocketClientTrait for Actor<TcpClient<T>>
144where
145 T: AsyncRead + AsyncWrite + Send + 'static,
146{
147 #[inline]
148 async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
149 &self,
150 buff: B,
151 ) -> Result<usize> {
152 self.inner_call(|inner| async move { inner.get_mut().send(&buff).await })
153 .await
154 }
155 #[inline]
156 async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
157 &self,
158 buff: B,
159 ) -> Result<()> {
160 self.inner_call(|inner| async move { inner.get_mut().send_all(&buff).await })
161 .await
162 }
163 #[inline]
164 async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
165 if buff.is_empty() {
166 return Err(error::Error::SendError("send buff is none".to_string()));
167 }
168 self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
169 .await
170 }
171
172 #[inline]
173 async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
174 if buff.is_empty() {
175 return Err(error::Error::SendError("send buff is none".to_string()));
176 }
177 self.inner_call(|inner| async move { inner.get_mut().send_all(buff).await })
178 .await
179 }
180
181 #[inline]
182 async fn flush(&self) -> Result<()> {
183 self.inner_call(|inner| async move { inner.get_mut().flush().await })
184 .await
185 }
186
187 #[inline]
188 async fn disconnect(&self) -> Result<()> {
189 self.inner_call(|inner| async move { inner.get_mut().disconnect().await })
190 .await
191 }
192}