//! common_uu 1.9.4
//!
//! 公共工具库 (common utility library)
//! Documentation
use tokio::io::AsyncWriteExt;
// use std::{sync::{Arc, mpsc::{Sender, Receiver}}, thread};
// use crossbeam::channel::{Receiver, Sender};
use tokio::sync::mpsc::{Receiver, Sender};

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use crate::IResult;
// use tokio::net::TcpStream;
// use tokio::{spawn, task::JoinHandle};

use tokio::net::{TcpListener, TcpStream};
use tokio::spawn;

// use std::net::TcpStream;
// use tokio::net::TcpStream;

use bytes::{Buf, BufMut};
use tokio::io::{AsyncReadExt};

use super::*;

/// Message-oriented read/write helpers for a TCP stream.
///
/// The `TcpStream` implementation in this file frames every message as a
/// little-endian `i32` byte count followed by that many payload bytes.
#[async_trait::async_trait]
pub trait TcpStreamAsyncExt {
    /// Reads one message from the stream and returns its payload bytes.
    async fn read_ext(&mut self) -> IResult<Vec<u8>>;
    /// Writes `data` to the stream as one message and returns a byte count.
    ///
    /// In the `TcpStream` implementation the count covers the buffer that
    /// holds the 4-byte length prefix followed by the payload.
    async fn write_ext(&mut self, data: Vec<u8>) -> IResult<usize>;
}

/// Length-prefixed framing over a tokio [`TcpStream`].
///
/// Wire format of one frame: a little-endian `i32` payload length followed by
/// exactly that many payload bytes.
#[async_trait::async_trait]
impl TcpStreamAsyncExt for TcpStream {
    /// Reads one complete frame and returns its payload.
    ///
    /// # Returns
    /// The payload bytes. An empty vector is returned for a zero (or negative)
    /// length prefix, and also when reading the prefix reports `WouldBlock`
    /// (after a 600 ms pause), so callers cannot tell those cases apart.
    ///
    /// # Errors
    /// Any I/O error from the socket, including `UnexpectedEof` when the peer
    /// closes the connection before the whole payload has arrived.
    async fn read_ext(&mut self) -> IResult<Vec<u8>> {
        self.readable().await?;
        let prefix = match self.read_i32_le().await {
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // Back off briefly so a caller polling in a loop does not spin.
                tokio::time::sleep(Duration::from_millis(600)).await;
                return Ok(Vec::new());
            }
            other => other?,
        };

        // A negative prefix is treated as an empty payload. The conversion is
        // written with a full path so it does not depend on the edition prelude.
        let len = <usize as std::convert::TryFrom<i32>>::try_from(prefix).unwrap_or(0);
        let mut data = vec![0u8; len];
        // `read` may return after a partial read; `read_exact` keeps reading
        // until the whole payload is in, so frame boundaries stay in sync.
        self.read_exact(&mut data).await?;

        Ok(data)
    }

    /// Writes `data` as one complete frame.
    ///
    /// # Returns
    /// The number of bytes put on the wire: the 4-byte length prefix plus the
    /// payload.
    ///
    /// # Errors
    /// `InvalidInput` when the payload is longer than `i32::MAX` bytes (it could
    /// not be described by the prefix), or any I/O error from the socket.
    /// A failing `flush` is only logged.
    async fn write_ext(&mut self, data: Vec<u8>) -> IResult<usize> {
        // Checked conversion: a plain `as i32` cast would wrap for oversized
        // payloads and emit a corrupt (possibly negative) length prefix.
        let len = <i32 as std::convert::TryFrom<usize>>::try_from(data.len()).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "write_ext: payload exceeds i32::MAX bytes",
            )
        })?;

        self.writable().await?;

        // Prefix and payload go out in one buffer.
        let mut frame = Vec::with_capacity(4 + data.len());
        frame.extend_from_slice(&len.to_le_bytes());
        frame.extend_from_slice(&data);

        // `write` may accept only part of the buffer; `write_all` loops until
        // every byte has been handed to the socket.
        self.write_all(&frame).await?;
        if let Err(e) = self.flush().await {
            error!("socket flush: {e:?}");
        }
        Ok(frame.len())
    }
}

/// Receiving half of a tokio mpsc channel whose items pair a byte payload
/// with an optional [`Sender`] of byte vectors.
// NOTE(review): presumably the optional sender is a reply channel for the
// payload it travels with — confirm against the code that uses this alias.
pub type SendType = Receiver<(Vec<u8>, Option<Sender<Vec<u8>>>)>;
/// Sending half of a tokio mpsc channel that carries byte vectors.
pub type RevType = Sender<Vec<u8>>;

/// Binds a TCP listener on `addrs` and passes every accepted connection to
/// `call`.
///
/// The accept loop never terminates on its own, so this function returns only
/// when binding fails. `call` is invoked inline on the accept loop; a failed
/// accept is logged and retried after a ten second pause.
///
/// # Errors
/// Returns the error produced by `TcpListener::bind`.
pub async fn server_run<F>(addrs: String, call: F) -> IResult<()>
where
    F: Fn(TcpStream),
{
    let listener = TcpListener::bind(addrs).await?;
    debug!("accept start accept start");

    loop {
        let accepted = listener.accept().await;
        debug!("accept start accept done");
        match accepted {
            // The peer address is not needed here.
            Ok((socket, _peer)) => call(socket),
            Err(e) => {
                error!("conn_c.accept: {:?}", e);
                // Wait before retrying so a persistent failure does not spin.
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    }
}

/// Opens a TCP connection to `addrs` and returns the connected stream.
///
/// # Errors
/// Returns the error produced by `TcpStream::connect`.
pub async fn client_run(addrs: String) -> IResult<TcpStream> {
    Ok(TcpStream::connect(addrs).await?)
}

#[tokio::test]
async fn test_server() -> IResult {

    // 初始化日志
    crate::log4rs_mod::init().unwrap();

    let addrs = "127.0.0.1:8001";

    server_run(addrs.to_string(), |mut stream| {
        // 异步收取数据
        spawn(async move {
            // stream.write_str(format!("")).await;
            let mut stream = stream;
            for i in 0..50 {
                // let msg = stream.read_str().await.unwrap_or_default();
                // if msg.is_empty() {
                //     warn!("收到空数据");
                //     continue;
                // }
                // info!("server收到的内容: {msg}");
                tokio::time::sleep(Duration::from_millis(500)).await;
                if i != 0 && i % 10 == 0 {
                    if let Err(e) = stream.write_ext(format!("server send data - {i}").into_bytes()).await {
                        error!("stream.write_str: {e:?}");
                        break;
                    };
                }
            }
        });
    })
    .await?;

    Ok(())
}

/// Manual test client: connects to 127.0.0.1:8001, logs every framed message
/// it receives on a background task, and runs until Ctrl-C.
#[tokio::test]
async fn test_client() -> IResult {
    // Initialise logging.
    crate::log4rs_mod::init().unwrap();

    let addrs = "127.0.0.1:8001";

    let mut stream = client_run(addrs.to_string()).await?;

    // Receive on a separate task so this function can wait for Ctrl-C.
    spawn(async move {
        loop {
            match stream.read_ext().await {
                Err(e) => {
                    error!("stream.read_str(): {e:?}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    break;
                }
                // Empty payload: nothing to print, keep reading.
                Ok(msg) if msg.is_empty() => {
                    warn!("数据长度为零");
                }
                Ok(msg) => {
                    info!("client收到的内容: {}", String::from_utf8(msg).unwrap_or_default());
                }
            }
        }
    });

    tokio::signal::ctrl_c().await?;
    Ok(())
}