common_uu 1.9.4

公共工具库
Documentation
/* 
tcp长连接模块, 以及发送和接收数据
1: 通过channel发送数据, 通过fn回调接收数据 (完成)
2: 双向接收与发送数据 (完成)
3: 心跳 (完成)
4: 粘包问题 (完成)
5: 断线重连
6: 定义通讯协议: 可选是否有响应数据
7: 定义通讯协议: 保持响应数据顺序
8: 序列化与反序列化
 */

use crate::IResult;
use futures_util::sink::Flush;
use futures_util::{SinkExt, StreamExt};
// use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{Receiver};
use tokio::time::Duration;
use tokio_util::codec::{Framed, LinesCodec};

/// A request that can be written to a connection as a single text frame.
///
/// Implementors decide whether the sender should wait for a reply frame after
/// the body has been written.
pub trait Req {
    /// Returns the text that is sent over the connection for this request.
    fn body(&self) -> String;

    /// Reports whether a reply should be awaited after the body is sent.
    ///
    /// The provided implementation returns `false`, i.e. no reply is awaited.
    fn is_response(&self) -> bool {
        false
    }

    /// Receives the reply text when [`Req::is_response`] returned `true`.
    fn response(&mut self, res: String);
}

/// Lets a plain `String` be used as a request.
///
/// `is_response` is not overridden, so the trait default (`false`) applies.
impl Req for String {
    /// The string itself is the payload; an owned copy is returned.
    fn body(&self) -> String {
        self.to_owned()
    }

    /// Logs the reply text at info level.
    fn response(&mut self, res: String) {
        info!("response: {}", res);
    }
}

/// Drives one line-framed TCP connection until the peer disconnects or an
/// error occurs.
///
/// Three things are multiplexed on the connection:
/// * requests taken from `req_rev` are written as one line each; when
///   `Req::is_response` is `true` the next non-empty line read from the peer
///   is handed to `Req::response`;
/// * lines received from the peer are passed to `res_call`, and a returned
///   `Some(reply)` is written back;
/// * an empty line is written as a heartbeat whenever 30 seconds pass without
///   traffic in either direction.
///
/// An empty line is reserved as the heartbeat: it is never forwarded to
/// `res_call` and never treated as a reply.
///
/// NOTE(review): while a reply is awaited, any non-empty line from the peer is
/// taken as that reply, including unsolicited messages. Correlating replies
/// with requests needs protocol support that does not exist yet.
///
/// # Parameters
/// * `stream` - the connected socket; it is owned by this function.
/// * `req_rev` - source of outgoing requests. A closed channel only stops
///   outgoing requests; receiving and heartbeats continue.
/// * `res_call` - callback invoked with every non-empty received line.
///
/// # Returns
/// `Ok(())` once the peer closes the connection (assumes the crate's `IResult`
/// alias defaults to a unit success value — confirm against its definition).
///
/// # Errors
/// Returns an error when writing, flushing or decoding a frame fails.
async fn handle_client<Call: Fn(String) -> Option<String>, R: Req>(
    stream: TcpStream,
    mut req_rev: Receiver<R>,
    res_call: Call,
) -> IResult {
    let mut interval = tokio::time::interval(Duration::from_secs(30));

    // Wrap the byte stream in newline-delimited frames so message boundaries
    // survive TCP coalescing/splitting.
    let mut framed = Framed::new(stream, LinesCodec::new());

    loop {
        tokio::select! {
            // Outgoing: take a request from the channel and write it.
            Some(mut req) = req_rev.recv() => {
                interval.reset();
                let body = req.body();
                framed.send(&body).await?;
                if req.is_response() {
                    // Wait for the reply, skipping heartbeat lines so a ping
                    // is not mistaken for the response.
                    loop {
                        match framed.next().await {
                            Some(Ok(line)) if line.is_empty() => {
                                debug!("read tcp ping");
                            }
                            Some(line) => {
                                req.response(line?);
                                break;
                            }
                            // Peer closed the connection before replying.
                            None => return Ok(()),
                        }
                    }
                }
                // The annotation pins the sink item type, which `flush()`
                // alone cannot infer.
                let r: Flush<_, String> = framed.flush();
                r.await?;
            }
            // Incoming: a frame, a decode error, or end of stream.
            incoming = framed.next() => {
                match incoming {
                    // End of stream: the peer disconnected, so stop instead of
                    // idling until a later write happens to fail.
                    None => return Ok(()),
                    Some(Ok(line)) if line.is_empty() => {
                        interval.reset();
                        debug!("read tcp ping");
                    }
                    Some(Ok(msg)) => {
                        interval.reset();
                        if let Some(reply) = res_call(msg) {
                            framed.send(&reply).await?;
                        }
                        let r: Flush<_, String> = framed.flush();
                        r.await?;
                    }
                    Some(Err(e)) => return Err(e.to_string().into()),
                }
            }
            // Heartbeat: nothing was sent or received for a full period.
            _ = interval.tick() => {
                // An empty line is the ping.
                framed.send("").await?;
                let r: Flush<_, String> = framed.flush();
                r.await?;
            }
        }
    }
}

/// Demo server: listens on `127.0.0.1:8080` and serves every accepted
/// connection on its own task.
///
/// For each connection a second task queues the request `"server hello"`
/// every 5 seconds. Received messages are logged and never answered.
///
/// # Panics
/// Panics if binding the listener or accepting a connection fails.
async fn server() {
    let addr = "127.0.0.1:8080";
    let listener = TcpListener::bind(addr).await.unwrap();

    info!("Server running on {}", addr);

    loop {
        let (stream, addr) = listener.accept().await.unwrap();

        let (sender, recer) = tokio::sync::mpsc::channel(500);

        // Producer: feeds periodic requests into this connection's channel.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(5));
            loop {
                interval.tick().await;
                let req = String::from("server hello");
                // Sending fails only once the receiving half is gone, i.e. the
                // connection task has finished. Stop quietly instead of
                // panicking on every client disconnect.
                if sender.send(req).await.is_err() {
                    break;
                }
            }
        });

        // Connection task: owns the socket and the receiving half.
        tokio::spawn(async move {
            info!("new client: {}", addr);
            if let Err(e) = handle_client(stream, recer, |res| {
                info!("res: {}", res);
                None
            })
            .await
            {
                error!("server handle_client error: {}", e);
            }
        });
    }
}

/// Demo client: connects to `127.0.0.1:8080` and runs the connection on a
/// spawned task, returning as soon as that task has been started.
///
/// A second task queues the request `"client hello"` every 7 seconds. Every
/// received message is logged and echoed back to the peer.
///
/// # Panics
/// Panics if the connection cannot be established.
async fn client() {
    let addr = "127.0.0.1:8080";

    let (sender, recer) = tokio::sync::mpsc::channel(500);

    // Producer: feeds periodic requests into the connection's channel.
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(7));
        loop {
            interval.tick().await;
            let req = String::from("client hello");
            // Sending fails only once the receiving half is gone, i.e. the
            // connection task has finished. Stop quietly instead of panicking.
            if sender.send(req).await.is_err() {
                break;
            }
        }
    });

    let stream = TcpStream::connect(addr).await.unwrap();

    info!("client running on {}", addr);

    // Connection task: owns the socket and the receiving half.
    tokio::spawn(async move {
        if let Err(e) = handle_client(stream, recer, |res| {
            info!("res: {}", res);
            Some(res)
        })
        .await
        {
            error!("client handle_client error: {}", e);
        }
    });
}


/// Manual smoke test: starts the demo server and two demo clients on separate
/// tasks and waits for all of them.
///
/// The join only completes once every task has finished or one of them fails,
/// so it keeps running for as long as the server task does.
#[tokio::test]
async fn test() -> std::io::Result<()> {
    crate::log4rs_mod::init().unwrap();

    let server_task = tokio::spawn(server());
    let first_client = tokio::spawn(client());
    let second_client = tokio::spawn(client());

    tokio::try_join!(server_task, first_client, second_client)?;
    Ok(())
}