use crate::IResult;
use futures_util::sink::Flush;
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{Receiver};
use tokio::time::Duration;
use tokio_util::codec::{Framed, LinesCodec};
pub trait Req {
fn body(&self) -> String;
fn is_response(&self) -> bool{
false
}
fn response(&mut self, res: String);
}
impl Req for String {
fn body(&self) -> String {
self.clone()
}
fn response(&mut self, res: String) {
info!("response: {}", res);
}
}
async fn handle_client<Call: Fn(String) -> Option<String>, R: Req>(
stream: TcpStream,
mut req_rev: Receiver<R>,
res_call: Call,
) -> IResult {
let mut interval = tokio::time::interval(Duration::from_secs(30));
let mut framed = Framed::new(stream, LinesCodec::new());
loop {
tokio::select! {
Some(mut req) = req_rev.recv() => {
interval.reset();
let body = req.body();
framed.send(&body).await?;
if req.is_response(){
if let Some(msg) = framed.next().await{
req.response(msg?);
}
}
let r: Flush<_, String> = framed.flush();
r.await?;
}
Some(result) = framed.next() => {
interval.reset();
match result {
Ok(n) if n == "" => {
debug!("read tcp ping");
}
Ok(msg) => {
if let Some(r) = res_call(msg){
framed.send(&r).await?;
}
let r: Flush<_, String> = framed.flush();
r.await?;
}
Err(e) => {
return Err(e.to_string())?;
}
}
}
_ = interval.tick() => {
framed.send("").await?;
let r: Flush<_, String> = framed.flush();
r.await?;
}
}
}
}
async fn server() {
let addr = "127.0.0.1:8080";
let listener = TcpListener::bind(addr).await.unwrap();
info!("Server running on {}", addr);
loop {
let (stream, addr) = listener.accept().await.unwrap();
let (sender, recer) = tokio::sync::mpsc::channel(500);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let req = String::from("server hello");
sender.send(req).await.unwrap();
}
});
tokio::spawn(async move {
info!("new client: {}", addr);
if let Err(e) = handle_client(stream, recer, |res| {
info!("res: {}", res);
None
})
.await
{
error!("server handle_client error: {}", e);
}
});
}
}
async fn client() {
let addr = "127.0.0.1:8080";
let (sender, recer) = tokio::sync::mpsc::channel(500);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(7));
loop {
interval.tick().await;
let req = String::from("client hello");
sender.send(req).await.unwrap();
}
});
let stream = TcpStream::connect(addr).await.unwrap();
info!("client running on {}", addr);
tokio::spawn(async move {
if let Err(e) = handle_client(stream, recer, |res| {
info!("res: {}", res);
Some(res)
})
.await
{
error!("client handle_client error: {}", e);
}
});
}
#[tokio::test]
async fn test() -> std::io::Result<()> {
crate::log4rs_mod::init().unwrap();
let server_handle = tokio::spawn(async { server().await });
let client_handle1 = tokio::spawn(async { client().await });
let client_handle2 = tokio::spawn(async { client().await });
tokio::try_join!(server_handle, client_handle1, client_handle2)?;
Ok(())
}