use tokio::sync::mpsc::{Receiver, Sender};
use std::time::Duration;
use std::{
net::SocketAddr,
sync::Arc,
};
use crate::IResult;
use crate::tcp_mod::comm::TcpStreamExt;
use tokio::net::{TcpListener, TcpStream};
pub mod comm {
use bytes::{Buf, BufMut};
use super::*;
pub async fn read0(tcp_stream: &TcpStream) -> IResult<Vec<Vec<u8>>> {
tcp_stream.readable().await?;
let mut buf = [0; 1024 * 256];
match tcp_stream.try_read(&mut buf) {
Ok(0) => {
return Ok(vec![]);
}
Ok(n) => {
let buf = &buf[..n];
let mut buf = bytes::BytesMut::from(buf);
let mut r = vec![];
while !buf.is_empty() {
let len = buf.get_i32_le();
let mut data = vec![];
for _ in 0..len {
data.push(0);
}
let data_mut = data.as_mut_slice();
buf.copy_to_slice(data_mut);
r.push(data);
}
return Ok(r);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
return Ok(vec![]);
}
Err(e) => {
return Err(e)?;
}
}
}
pub async fn write(tcp_stream: &mut std::io::Result<(TcpStream, SocketAddr)>, data: Vec<u8>) -> IResult<usize> {
match tcp_stream {
Ok((stream, _)) => {
return write0(stream, data).await;
}
Err(e) => {
return Err(e.to_string())?;
}
}
}
pub async fn write2(tcp_stream: &mut std::io::Result<TcpStream>, data: Vec<u8>) -> IResult<usize> {
match tcp_stream {
Ok(stream) => {
return write0(stream, data).await;
}
Err(e) => {
return Err(e.to_string())?;
}
}
}
pub async fn write0(tcp_stream: &TcpStream, mut input: Vec<u8>) -> IResult<usize> {
tcp_stream.writable().await?;
let len = input.len() as i32;
let mut b = bytes::BytesMut::new();
b.put_i32_le(len);
let mut b = b.to_vec();
b.append(&mut input);
match tcp_stream.try_write(b.as_slice()) {
Ok(_n) => {
return Ok(_n);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
return Ok(0);
}
Err(e) => {
return Err(e)?;
}
};
}
#[allow(async_fn_in_trait)]
pub trait TcpStreamExt {
async fn read_vec(&self) -> IResult<Vec<Vec<u8>>>;
async fn write_vec(&self, data: Vec<u8>) -> IResult<usize>;
}
impl TcpStreamExt for TcpStream {
async fn read_vec(&self) -> IResult<Vec<Vec<u8>>> {
read0(self).await
}
async fn write_vec(&self, data: Vec<u8>) -> IResult<usize> {
write0(self, data).await
}
}
}
pub type SendType = Receiver<(Vec<u8>, Option<Sender<Vec<u8>>>)>;
pub type RevType = Sender<Vec<u8>>;
pub async fn server_run<F>(addrs: String, call: F) -> IResult<()>
where
F: Fn(Arc<TcpStream>),
{
let conn = TcpListener::bind(addrs.clone()).await?;
debug!("accept start accept start");
loop {
let steam = conn.accept().await;
debug!("accept start accept done");
let (tcp_stream, _) = match steam {
Ok(v) => v,
Err(e) => {
error!("conn_c.accept: {:?}", e);
tokio::time::sleep(Duration::from_millis(10 * 1000)).await;
continue;
}
};
call(Arc::new(tcp_stream));
}
}
pub async fn client_run(addrs: String) -> IResult<Arc<TcpStream>> {
let stream = TcpStream::connect(addrs).await?;
Ok(Arc::new(stream))
}
#[tokio::test]
async fn test_server() -> IResult {
crate::log4rs_mod::init().unwrap();
let addrs = "127.0.0.1:8001";
server_run(addrs.to_string(), |stream| {
let stream_clone = stream.clone();
tokio::spawn(async move {
loop {
let msgs = stream_clone.read_vec().await.unwrap_or_default();
if msgs.is_empty() {
continue;
}
if msgs.len() > 1 {
warn!("粘包了: {}条数据", msgs.len());
}
for msg in msgs {
let msg = String::from_utf8(msg).unwrap_or_default();
info!("server收到的内容: {msg}");
}
}
});
let stream_clone = stream.clone();
tokio::spawn(async move {
for i in 1..100 {
let data = format!("server data: {i}");
if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
error!("write_vec: {e:?}");
}
info!("server发送数据: {data}");
tokio::time::sleep(Duration::from_millis(3000)).await;
}
});
})
.await?;
tokio::signal::ctrl_c().await?;
Ok(())
}
#[tokio::test]
async fn test_client() -> IResult {
crate::log4rs_mod::init().unwrap();
let addrs = "127.0.0.1:8001";
let stream = client_run(addrs.to_string()).await?;
let stream_clone = stream.clone();
tokio::spawn(async move {
loop {
let msgs = stream_clone.read_vec().await.unwrap_or_default();
if msgs.is_empty() {
continue;
}
if msgs.len() > 1 {
warn!("粘包了: {}条数据", msgs.len());
}
for msg in msgs {
let msg = String::from_utf8(msg).unwrap_or_default();
info!("client收到的内容: {msg}");
}
}
});
let stream_clone = stream.clone();
tokio::spawn(async move {
for i in 1..100 {
let data = format!("client data: {i}");
if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
error!("write_vec: {e:?}");
}
info!("client发送数据: {data}");
}
});
let stream_clone = stream.clone();
tokio::spawn(async move {
for i in 1..100 {
let data = format!("client data: {i}");
if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
error!("write_vec: {e:?}");
}
info!("client发送数据: {data}");
}
});
tokio::signal::ctrl_c().await?;
Ok(())
}
#[tokio::test]
async fn test_client2() -> IResult {
crate::log4rs_mod::init().unwrap();
let addrs = "127.0.0.1:8001";
{
let stream = client_run(addrs.to_string()).await?;
let stream_clone = stream.clone();
tokio::spawn(async move {
loop {
let msgs = stream_clone.read_vec().await.unwrap_or_default();
if msgs.is_empty() {
continue;
}
if msgs.len() > 1 {
warn!("粘包了: {}条数据", msgs.len());
}
for msg in msgs {
let msg = String::from_utf8(msg).unwrap_or_default();
info!("client收到的内容: {msg}");
}
}
});
let stream_clone = stream.clone();
tokio::spawn(async move {
for i in 1..100 {
let data = format!("client data: {i}");
if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
error!("write_vec: {e:?}");
}
info!("client发送数据: {data}");
tokio::time::sleep(Duration::from_millis(3000)).await;
}
});
}
{
let stream = client_run(addrs.to_string()).await?;
let stream_clone = stream.clone();
tokio::spawn(async move {
loop {
let msgs = stream_clone.read_vec().await.unwrap_or_default();
if msgs.is_empty() {
continue;
}
if msgs.len() > 1 {
warn!("粘包了: {}条数据", msgs.len());
}
for msg in msgs {
let msg = String::from_utf8(msg).unwrap_or_default();
info!("client收到的内容2: {msg}");
}
}
});
let stream_clone = stream.clone();
tokio::spawn(async move {
for i in 1..100 {
let data = format!("client data: {i}");
if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
error!("write_vec: {e:?}");
}
info!("client发送数据2: {data}");
tokio::time::sleep(Duration::from_millis(3000)).await;
}
});
}
tokio::signal::ctrl_c().await?;
Ok(())
}