1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
#![feature(async_closure)] use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::io::AsyncWriteExt; use aqueue::Actor; use std::ops::Deref; use std::sync::Arc; use log::*; use std::future::Future; use anyhow::*; pub struct TcpClient { disconnect:bool, sender:OwnedWriteHalf } impl TcpClient { #[inline] pub async fn connect<T:ToSocketAddrs,F:Future<Output=Result<bool>>+Send+'static,A:Send+'static>(addr:T, f:impl FnOnce(A,Arc<Actor<TcpClient>>,OwnedReadHalf)->F+Send+'static,token:A) ->Result<Arc<Actor<TcpClient>>>{ let stream= TcpStream::connect(addr).await?; let target= stream.peer_addr()?; let(reader,sender)= stream.into_split(); let client=Arc::new(Actor::new(TcpClient { disconnect:false, sender })); let read_client=client.clone(); tokio::spawn(async move{ let disconnect_client=read_client.clone(); let need_disconnect= match f(token,read_client,reader).await{ Ok(disconnect)=>{ disconnect }, Err(err)=>{ error!("reader error:{}",err); true } }; if need_disconnect { if let Err(er)= disconnect_client.disconnect().await{ error!("disconnect to{} err:{}",target,er); } else{ debug!("disconnect to {}",target) } } else{ debug!("{} reader is close",target); } }); Ok(client) } #[inline] pub async fn disconnect(&mut self)->Result<()>{ if !self.disconnect { self.sender.shutdown().await?; self.disconnect = true; } Ok(()) } #[inline] pub async fn send(&mut self, buff:&[u8])->Result<usize>{ if !self.disconnect { Ok(self.sender.write(buff).await?) }else{ bail!("Send Error,Disconnect") } } } #[async_trait::async_trait] pub trait SocketClientTrait{ async fn send<T:Deref<Target=[u8]>+Send+Sync+'static>(&self,buff:T)->Result<usize>; async fn disconnect(&self)->Result<()>; } #[async_trait::async_trait] impl SocketClientTrait for Actor<TcpClient>{ #[inline] async fn send<T:Deref<Target=[u8]>+Send+Sync+'static>(&self, buff:T)->Result<usize>{ self.inner_call(async move |inner|{ inner.get_mut().send(&buff).await }).await } #[inline] async fn disconnect(&self) ->Result<()> { self.inner_call(async move |inner| { inner.get_mut().disconnect().await }).await } }