use crate::common::*;
use crate::link::LinkTrait;
use crate::link::*;
use futures_util::future::{select, Either};
use futures_util::StreamExt;
use tokio::task;
/// Couples one link (TCP/UDP client or server, or serial) with a pair of
/// publishers so that data can flow in both directions.
pub struct Starter {
    /// Publisher that every payload successfully read from the link is pushed to.
    send: &'static Publisher,
    /// Publisher that is subscribed to; each message it delivers is written
    /// to the link.
    recv: &'static Publisher,
}
impl Starter {
    /// Builds a `Starter` from the two publishers it bridges.
    ///
    /// * `send` - publisher that payloads read from the link are pushed to.
    /// * `recv` - publisher that is subscribed to; its messages are written
    ///   to the link.
    ///
    /// Note that the argument order is `(send, recv)`.
    pub fn new(send: &'static Publisher, recv: &'static Publisher) -> Self {
        Self { recv, send }
    }

    /// Opens the transport described by `link` and spawns a detached task
    /// that relays data in both directions:
    ///
    /// * payloads read from the link are pushed to the `send` publisher;
    /// * messages delivered by a subscription on the `recv` publisher are
    ///   written to the link.
    ///
    /// If the transport cannot be opened, the failure is reported on stderr
    /// and the function returns without spawning anything.
    ///
    /// # Panics
    ///
    /// Panics if subscribing to the `recv` publisher fails.
    pub async fn start(&self, link: Link) {
        let opened = match link.r#type {
            LinkType::TcpClient => Self::box_link(TcpClient::new(&link.addr, link.port).await),
            LinkType::TcpServer => Self::box_link(TcpServer::new(&link.addr, link.port).await),
            LinkType::UdpClient => Self::box_link(UdpClient::new(&link.addr, link.port).await),
            LinkType::UdpServer => Self::box_link(UdpServer::new(&link.addr, link.port).await),
            LinkType::Serial => Self::box_link(Serial::new(&link.addr, link.port).await),
        };
        let mut transport = match opened {
            Ok(transport) => transport,
            Err(reason) => {
                // Opening a socket or serial port is ordinary fallible I/O;
                // report it instead of panicking the caller.
                eprintln!("failed to open link: {}", reason);
                return;
            }
        };

        let subscriber = self
            .recv
            .subscribe()
            .expect("failed to subscribe to the `recv` publisher");
        let publisher = self.send;

        // The join handle is dropped on purpose: the relay task is detached.
        let _ = task::spawn(async move {
            let mut outgoing = subscriber.recv();
            loop {
                let recv_socket = Box::pin(transport.recv());
                let recv_sub = outgoing.next();
                // Keep only the output of whichever future finished first.
                // The unfinished future is dropped at the end of this
                // statement, which releases the mutable borrow of
                // `transport` before `send` is called below.
                let event = match select(recv_socket, recv_sub).await {
                    Either::Left((inbound, _)) => Either::Left(inbound),
                    Either::Right((queued, _)) => Either::Right(queued),
                };
                match event {
                    Either::Left(Ok(data)) => {
                        publisher.send(data);
                    }
                    // NOTE(review): receive errors are ignored and the loop
                    // retries; confirm `recv` cannot fail persistently
                    // without awaiting, or this may spin.
                    Either::Left(Err(_)) => {}
                    Either::Right(Some(data)) => {
                        // Best effort: a failed write is deliberately ignored.
                        let _err = transport.send(&data).await;
                    }
                    // The subscription stream has ended. Polling it again
                    // would resolve immediately every time (a busy loop),
                    // so stop selecting on it.
                    Either::Right(None) => break,
                }
            }
            // Nothing is left to write to the link; keep relaying what the
            // link receives.
            loop {
                if let Ok(data) = transport.recv().await {
                    publisher.send(data);
                }
            }
        });
    }

    /// Boxes a freshly opened transport as a `LinkTrait` object, turning a
    /// constructor error into its debug text.
    fn box_link<T, E>(opened: Result<T, E>) -> Result<Box<dyn LinkTrait + Send>, String>
    where
        T: LinkTrait + Send + 'static,
        E: std::fmt::Debug,
    {
        match opened {
            Ok(transport) => Ok(Box::new(transport)),
            Err(err) => Err(format!("{:?}", err)),
        }
    }
}