use super::LinkTrait;
use super::TcpClient;
use core::fmt;
use core::future::{pending, Future};
use core::pin::Pin;
use futures_util::future::Either;
use futures_util::future::{join_all, select, select_all};
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tokio::net::TcpListener;
/// TCP server link: owns a listening socket plus every client connection
/// that has been accepted on it so far.
pub struct TcpServer {
/// Address that was passed to `TcpListener::bind` when the server was created.
addr: SocketAddr,
/// Listening socket used to accept new client connections.
server: TcpListener,
/// Accepted client connections, keyed by the peer address reported by `accept`.
sockets: HashMap<SocketAddr, TcpClient>,
}
impl TcpServer {
    /// Creates a server listening on `addr:port`.
    ///
    /// `addr` must be a literal IPv4 or IPv6 address (no host-name lookup is
    /// performed) and `port` must fit into 16 bits.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`io::ErrorKind::InvalidInput`] when `addr`
    /// is not a valid IP address or `port` is greater than `u16::MAX`, and
    /// forwards any error reported while binding the listener.
    pub async fn new(addr: &str, port: u32) -> io::Result<TcpServer> {
        // Report a malformed address to the caller instead of panicking.
        let ip = addr.parse::<IpAddr>().map_err(|err| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid IP address {:?}: {}", addr, err),
            )
        })?;
        // A plain `as u16` cast would silently wrap (65536 -> 0) and bind an
        // unrelated port, so reject out-of-range values explicitly.
        let port = <u16 as core::convert::TryFrom<u32>>::try_from(port).map_err(|err| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid TCP port {}: {}", port, err),
            )
        })?;
        let addr = SocketAddr::new(ip, port);
        let server = TcpListener::bind(&addr).await?;
        Ok(TcpServer {
            addr,
            server,
            sockets: HashMap::new(),
        })
    }
}
#[async_trait::async_trait]
impl LinkTrait for TcpServer {
async fn send(&mut self, msg: &[u8]) -> io::Result<()> {
let send = self
.sockets
.iter_mut()
.map(|(_, socket)| Box::pin(socket.send(msg)));
join_all(send).await;
Ok(())
}
async fn recv(&mut self) -> io::Result<Arc<Vec<u8>>> {
loop {
let accept = Box::pin(self.server.accept());
let recv: Vec<_> = self
.sockets
.iter_mut()
.map(|(_, socket)| Box::pin(socket.recv()))
.collect();
let recv: Pin<Box<dyn Future<Output = _> + Send>> = if !recv.is_empty() {
Box::pin(select_all(recv))
} else {
drop(recv);
Box::pin(pending())
};
let data = match select(recv, accept).await {
Either::Left(((recv, _, _), _)) => Either::Left(recv),
Either::Right((accept, _)) => Either::Right(accept),
};
match data {
Either::Left(recv) => {
if let Ok(data) = recv {
log::info!("{} recv: {:02X?}", self, data);
return Ok(data);
}
}
Either::Right(accept) => {
let (socket, addr) = accept?;
if self.sockets.insert(addr, socket.into()).is_none() {
log::info!("{} accept: {}", self, addr);
}
}
}
}
}
}
/// Renders the server as `TcpServer(<address>)`, using the `Debug` form of
/// the stored socket address.
impl fmt::Display for TcpServer {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "TcpServer({:?})", self.addr)
    }
}