mod transport;
pub use transport::{Transport, TransportError, TransportReadHalf, TransportWriteHalf};
use crate::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, Response},
utils::Session,
};
use log::*;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tokio::{
io,
sync::{broadcast, oneshot},
};
use tokio_stream::wrappers::BroadcastStream;
type Callbacks = Arc<Mutex<HashMap<usize, oneshot::Sender<Response>>>>;
pub struct Client {
t_write: TransportWriteHalf,
callbacks: Callbacks,
broadcast: broadcast::Sender<Response>,
init_broadcast_receiver: Option<broadcast::Receiver<Response>>,
}
impl Client {
pub async fn connect(session: Session) -> io::Result<Self> {
let transport = Transport::connect(session).await?;
debug!(
"Client has connected to {}",
transport
.peer_addr()
.map(|x| x.to_string())
.unwrap_or_else(|_| String::from("???"))
);
let (mut t_read, t_write) = transport.into_split();
let callbacks: Callbacks = Arc::new(Mutex::new(HashMap::new()));
let (broadcast, init_broadcast_receiver) =
broadcast::channel(CLIENT_BROADCAST_CHANNEL_CAPACITY);
let callbacks_2 = Arc::clone(&callbacks);
let broadcast_2 = broadcast.clone();
tokio::spawn(async move {
loop {
match t_read.receive::<Response>().await {
Ok(Some(res)) => {
trace!("Client got response: {:?}", res);
let maybe_callback = res
.origin_id
.as_ref()
.and_then(|id| callbacks_2.lock().unwrap().remove(id));
if let Some(tx) = maybe_callback {
trace!("Client has callback! Triggering!");
if let Err(res) = tx.send(res) {
error!("Failed to trigger callback for response {}", res.id);
}
} else {
trace!("Client does not have callback! Broadcasting!");
if let Err(x) = broadcast_2.send(res) {
error!("Failed to trigger broadcast: {}", x);
}
}
}
Ok(None) => break,
Err(x) => {
error!("{}", x);
break;
}
}
}
});
Ok(Self {
t_write,
callbacks,
broadcast,
init_broadcast_receiver: Some(init_broadcast_receiver),
})
}
pub async fn send(&mut self, req: Request) -> Result<Response, TransportError> {
let (tx, rx) = oneshot::channel();
self.callbacks.lock().unwrap().insert(req.id, tx);
self.t_write.send(req).await?;
rx.await
.map_err(|x| TransportError::from(io::Error::new(io::ErrorKind::ConnectionAborted, x)))
}
pub fn to_response_stream(&mut self) -> BroadcastStream<Response> {
BroadcastStream::new(
self.init_broadcast_receiver
.take()
.unwrap_or_else(|| self.broadcast.subscribe()),
)
}
}