common_uu 1.9.4

公共工具库
Documentation
// use std::{sync::{Arc, mpsc::{Sender, Receiver}}, thread};
// use crossbeam::channel::{Receiver, Sender};
use tokio::sync::mpsc::{Receiver, Sender};

use std::time::Duration;
use std::{
    // io::{Read, Write},
    net::SocketAddr,
    sync::Arc,
};

use crate::IResult;
use crate::tcp_mod::comm::TcpStreamExt;
// use tokio::net::TcpStream;
// use tokio::{spawn, task::JoinHandle};

use tokio::net::{TcpListener, TcpStream};

pub mod comm {
    // use std::net::TcpStream;
    // use tokio::net::TcpStream;

    use bytes::{Buf, BufMut};

    use super::*;

    pub async fn read0(tcp_stream: &TcpStream) -> IResult<Vec<Vec<u8>>> {
        tcp_stream.readable().await?;
        let mut buf = [0; 1024 * 256];
        match tcp_stream.try_read(&mut buf) {
            Ok(0) => {
                return Ok(vec![]);
            }
            Ok(n) => {
                let buf = &buf[..n];
                let mut buf = bytes::BytesMut::from(buf);

                let mut r = vec![];
                while !buf.is_empty() {
                    let len = buf.get_i32_le();
                    let mut data = vec![];
                    for _ in 0..len {
                        data.push(0);
                    }
                    let data_mut = data.as_mut_slice();
                    buf.copy_to_slice(data_mut);
                    r.push(data);
                }

                return Ok(r);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                return Ok(vec![]);
            }
            Err(e) => {
                return Err(e)?;
            }
        }
    }

    pub async fn write(tcp_stream: &mut std::io::Result<(TcpStream, SocketAddr)>, data: Vec<u8>) -> IResult<usize> {
        match tcp_stream {
            Ok((stream, _)) => {
                return write0(stream, data).await;
            }
            Err(e) => {
                return Err(e.to_string())?;
            }
        }
    }

    pub async fn write2(tcp_stream: &mut std::io::Result<TcpStream>, data: Vec<u8>) -> IResult<usize> {
        match tcp_stream {
            Ok(stream) => {
                return write0(stream, data).await;
            }
            Err(e) => {
                return Err(e.to_string())?;
            }
        }
    }

    pub async fn write0(tcp_stream: &TcpStream, mut input: Vec<u8>) -> IResult<usize> {
        tcp_stream.writable().await?;

        let len = input.len() as i32;
        let mut b = bytes::BytesMut::new();
        b.put_i32_le(len);
        let mut b = b.to_vec();
        b.append(&mut input);

        match tcp_stream.try_write(b.as_slice()) {
            Ok(_n) => {
                return Ok(_n);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                return Ok(0);
            }
            Err(e) => {
                return Err(e)?;
            }
        };
    }

    #[allow(async_fn_in_trait)]
    pub trait TcpStreamExt {
        async fn read_vec(&self) -> IResult<Vec<Vec<u8>>>;
        async fn write_vec(&self, data: Vec<u8>) -> IResult<usize>;
    }

    impl TcpStreamExt for TcpStream {
        async fn read_vec(&self) -> IResult<Vec<Vec<u8>>> {
            read0(self).await
        }

        async fn write_vec(&self, data: Vec<u8>) -> IResult<usize> {
            write0(self, data).await
        }
    }
}

pub type SendType = Receiver<(Vec<u8>, Option<Sender<Vec<u8>>>)>;
pub type RevType = Sender<Vec<u8>>;

pub async fn server_run<F>(addrs: String, call: F) -> IResult<()>
where
    F: Fn(Arc<TcpStream>),
{
    let conn = TcpListener::bind(addrs.clone()).await?;
    debug!("accept start accept start");

    loop {
        let steam = conn.accept().await;
        debug!("accept start accept done");
        let (tcp_stream, _) = match steam {
            Ok(v) => v,
            Err(e) => {
                error!("conn_c.accept: {:?}", e);
                // thread::sleep(Duration::from_millis(1000));
                tokio::time::sleep(Duration::from_millis(10 * 1000)).await;
                continue;
            }
        };
        call(Arc::new(tcp_stream));
    }
}

pub async fn client_run(addrs: String) -> IResult<Arc<TcpStream>> {
    let stream = TcpStream::connect(addrs).await?;
    Ok(Arc::new(stream))
}

#[tokio::test]
async fn test_server() -> IResult {
    // 初始化日志
    crate::log4rs_mod::init().unwrap();

    let addrs = "127.0.0.1:8001";

    server_run(addrs.to_string(), |stream| {
        // 异步收取数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            loop {
                let msgs = stream_clone.read_vec().await.unwrap_or_default();
                if msgs.is_empty() {
                    continue;
                }
                if msgs.len() > 1 {
                    warn!("粘包了: {}条数据", msgs.len());
                }
                for msg in msgs {
                    let msg = String::from_utf8(msg).unwrap_or_default();
                    info!("server收到的内容: {msg}");
                }
            }
        });

        // 异步发送数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            for i in 1..100 {
                let data = format!("server data: {i}");
                if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
                    error!("write_vec: {e:?}");
                }
                info!("server发送数据: {data}");
                tokio::time::sleep(Duration::from_millis(3000)).await;
            }
        });
    })
    .await?;

    tokio::signal::ctrl_c().await?;

    Ok(())
}

#[tokio::test]
async fn test_client() -> IResult {
    // 初始化日志
    crate::log4rs_mod::init().unwrap();

    let addrs = "127.0.0.1:8001";

    let stream = client_run(addrs.to_string()).await?;

    // 异步收取数据
    let stream_clone = stream.clone();
    tokio::spawn(async move {
        loop {
            let msgs = stream_clone.read_vec().await.unwrap_or_default();
            if msgs.is_empty() {
                continue;
            }
            if msgs.len() > 1 {
                warn!("粘包了: {}条数据", msgs.len());
            }
            for msg in msgs {
                let msg = String::from_utf8(msg).unwrap_or_default();
                info!("client收到的内容: {msg}");
            }
        }
    });

    // 异步发送数据
    let stream_clone = stream.clone();
    tokio::spawn(async move {
        for i in 1..100 {
            let data = format!("client data: {i}");
            if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
                error!("write_vec: {e:?}");
            }
            info!("client发送数据: {data}");
            // tokio::time::sleep(Duration::from_millis(3000)).await;
        }
    });
    let stream_clone = stream.clone();
    tokio::spawn(async move {
        for i in 1..100 {
            let data = format!("client data: {i}");
            if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
                error!("write_vec: {e:?}");
            }
            info!("client发送数据: {data}");
            // tokio::time::sleep(Duration::from_millis(3000)).await;
        }
    });

    tokio::signal::ctrl_c().await?;
    Ok(())
}

#[tokio::test]
async fn test_client2() -> IResult {
    // 初始化日志
    crate::log4rs_mod::init().unwrap();

    let addrs = "127.0.0.1:8001";

    {
        let stream = client_run(addrs.to_string()).await?;

        // 异步收取数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            loop {
                let msgs = stream_clone.read_vec().await.unwrap_or_default();
                if msgs.is_empty() {
                    continue;
                }
                if msgs.len() > 1 {
                    warn!("粘包了: {}条数据", msgs.len());
                }
                for msg in msgs {
                    let msg = String::from_utf8(msg).unwrap_or_default();
                    info!("client收到的内容: {msg}");
                }
            }
        });

        // 异步发送数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            for i in 1..100 {
                let data = format!("client data: {i}");
                if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
                    error!("write_vec: {e:?}");
                }
                info!("client发送数据: {data}");
                tokio::time::sleep(Duration::from_millis(3000)).await;
            }
        });
    }

    {
        let stream = client_run(addrs.to_string()).await?;

        // 异步收取数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            loop {
                let msgs = stream_clone.read_vec().await.unwrap_or_default();
                if msgs.is_empty() {
                    continue;
                }
                if msgs.len() > 1 {
                    warn!("粘包了: {}条数据", msgs.len());
                }
                for msg in msgs {
                    let msg = String::from_utf8(msg).unwrap_or_default();
                    info!("client收到的内容2: {msg}");
                }
            }
        });

        // 异步发送数据
        let stream_clone = stream.clone();
        tokio::spawn(async move {
            for i in 1..100 {
                let data = format!("client data: {i}");
                if let Err(e) = stream_clone.write_vec(data.clone().into_bytes()).await {
                    error!("write_vec: {e:?}");
                }
                info!("client发送数据2: {data}");
                tokio::time::sleep(Duration::from_millis(3000)).await;
            }
        });
    }

    tokio::signal::ctrl_c().await?;
    Ok(())
}