use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{Receiver, Sender};
use std::time::Duration;
use crate::IResult;
use tokio::net::{TcpListener, TcpStream};
use bytes::{BufMut};
use tokio::io::{AsyncReadExt};
/// String-oriented read/write helpers for a stream.
///
/// The `TcpStream` implementation in this file frames every string as a
/// 4-byte little-endian `i32` byte count followed by the UTF-8 payload.
// `async fn` in a public trait triggers rustc's `async_fn_in_trait` lint
// (callers cannot name extra bounds such as `Send` on the returned future);
// the lint is silenced here rather than desugaring the methods by hand.
#[allow(async_fn_in_trait)]
pub trait TcpStreamAsyncExt {
/// Reads one string from the stream.
async fn read_str(&mut self) -> IResult<String>;
/// Writes `data` to the stream and returns a byte count on success.
async fn write_str(&mut self, data: String) -> IResult<usize>;
}
/// Length-prefixed string framing over a [`TcpStream`].
///
/// Each frame is a 4-byte little-endian `i32` holding the payload size in
/// bytes, immediately followed by that many bytes of UTF-8 text.
impl TcpStreamAsyncExt for TcpStream {
    /// Reads one complete frame and returns its payload.
    ///
    /// If reading the length prefix reports `WouldBlock`, the call pauses for
    /// 600 ms and returns an empty string; callers therefore cannot tell that
    /// case apart from a genuine zero-length frame.
    ///
    /// # Errors
    ///
    /// * any I/O error from the socket, including `UnexpectedEof` when the
    ///   peer closes the connection in the middle of a frame;
    /// * `InvalidData` when the length prefix is negative;
    /// * a UTF-8 conversion error when the payload is not valid UTF-8.
    async fn read_str(&mut self) -> IResult<String> {
        self.readable().await?;
        let len = match self.read_i32_le().await {
            Ok(v) => v,
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                tokio::time::sleep(Duration::from_millis(600)).await;
                return Ok(String::new());
            }
            Err(e) => return Err(e.into()),
        };

        // A negative prefix means the stream is corrupt or out of sync;
        // treating it as an empty frame would silently keep reading garbage.
        let len = usize::try_from(len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid frame length: {len}"),
            )
        })?;

        let mut data = vec![0u8; len];
        // `read` may return after filling only part of the buffer, which
        // would leave zero padding in the payload and the remainder of the
        // frame on the wire. `read_exact` keeps reading until it is full.
        self.read_exact(&mut data).await?;
        Ok(String::from_utf8(data)?)
    }

    /// Sends `data` as one frame and returns the total number of bytes
    /// written (4-byte length prefix plus payload).
    ///
    /// A failed flush is logged but not reported to the caller.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` when the payload is too large for an `i32` length
    ///   prefix;
    /// * any I/O error from the socket while writing.
    async fn write_str(&mut self, data: String) -> IResult<usize> {
        let payload = data.into_bytes();
        // An `as i32` cast would wrap for oversized payloads and emit a
        // prefix that does not match the bytes that follow.
        let len = i32::try_from(payload.len()).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "payload of {} bytes does not fit in an i32 length prefix",
                    payload.len()
                ),
            )
        })?;

        self.writable().await?;

        let mut frame = bytes::BytesMut::with_capacity(std::mem::size_of::<i32>() + payload.len());
        frame.put_i32_le(len);
        frame.put_slice(&payload);

        // `write` may accept only a prefix of the buffer; `write_all` loops
        // until the whole frame has been handed to the socket.
        self.write_all(&frame[..]).await?;
        if let Err(e) = self.flush().await {
            error!("socket flush: {e:?}");
        }
        Ok(frame.len())
    }
}
/// Receiving half of an mpsc channel whose items pair a byte buffer with an
/// optional `Sender<Vec<u8>>`.
// NOTE(review): the optional sender looks like a reply channel for the
// request carried alongside it — confirm against the callers.
pub type SendType = Receiver<(Vec<u8>, Option<Sender<Vec<u8>>>)>;
/// Sending half of an mpsc channel that carries raw byte buffers.
pub type RevType = Sender<Vec<u8>>;
/// Binds a TCP listener on `addrs` and passes every accepted connection to
/// `call`.
///
/// The accept loop has no exit: the function only returns when binding the
/// listener fails. A failed `accept` is logged and retried after a
/// 10-second pause. `call` runs inline on the accept loop, so it should hand
/// the stream off (for example to a spawned task) rather than block.
///
/// # Errors
///
/// Returns the bind error when the listener cannot be created.
pub async fn server_run<F>(addrs: String, call: F) -> IResult<()>
where
    F: Fn(TcpStream),
{
    let listener = TcpListener::bind(addrs).await?;
    debug!("accept start accept start");
    loop {
        let accepted = listener.accept().await;
        debug!("accept start accept done");
        match accepted {
            // The peer address is not needed by the callback.
            Ok((socket, _peer)) => call(socket),
            Err(e) => {
                error!("conn_c.accept: {:?}", e);
                // Back off before retrying so a persistent failure does not spin.
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    }
}
/// Opens a TCP connection to `addrs` and returns the connected stream.
///
/// # Errors
///
/// Returns the connect error when the connection cannot be established.
pub async fn client_run(addrs: String) -> IResult<TcpStream> {
    Ok(TcpStream::connect(addrs).await?)
}
/// Manual server-side check: listens on 127.0.0.1:8001 and, for each client,
/// ticks every 500 ms for 50 ticks, sending a string on every tenth tick
/// (ticks 10, 20, 30, 40).
///
/// `server_run` never returns while the listener is healthy, so this test
/// does not finish on its own.
#[tokio::test]
async fn test_server_str() -> IResult {
    crate::log4rs_mod::init().unwrap();
    let addrs = "127.0.0.1:8001";
    server_run(addrs.to_string(), |mut stream| {
        tokio::spawn(async move {
            for i in 0..50 {
                tokio::time::sleep(Duration::from_millis(500)).await;
                // Only every tenth tick (and never the first) sends data.
                if i == 0 || i % 10 != 0 {
                    continue;
                }
                let sent = stream.write_str(format!("server send data - {i}")).await;
                if let Err(e) = sent {
                    error!("stream.write_str: {e:?}");
                    break;
                }
            }
        });
    })
    .await?;
    Ok(())
}
/// Manual client-side check: connects to 127.0.0.1:8001 and logs every string
/// received on a background task until a read fails.
///
/// The test itself blocks until Ctrl-C is received.
#[tokio::test]
async fn test_client_str() -> IResult {
    crate::log4rs_mod::init().unwrap();
    let addrs = "127.0.0.1:8001";
    let mut stream = client_run(addrs.to_string()).await?;
    tokio::spawn(async move {
        loop {
            match stream.read_str().await {
                // Empty strings are reported and skipped.
                Ok(msg) if msg.is_empty() => {
                    warn!("数据长度为零");
                }
                Ok(msg) => {
                    info!("client收到的内容: {msg}");
                }
                // A read error ends the receive loop after a short pause.
                Err(e) => {
                    error!("stream.read_str(): {e:?}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    break;
                }
            }
        }
    });
    tokio::signal::ctrl_c().await?;
    Ok(())
}