use std::{
collections::HashMap,
sync::{Arc, Mutex, MutexGuard, RwLock},
};
use bytes::Bytes;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::{
client::{JobStatus, WorkUpdate, WorkerJob},
conn::ServerHandle,
};
pub type JobCreated = ServerHandle;
#[derive(Debug)]
pub struct ClientReceivers {
pub echo_rx: Receiver<Bytes>,
pub job_created_rx: Receiver<JobCreated>,
pub status_res_rx: Receiver<JobStatus>,
pub error_rx: Receiver<(Bytes, Bytes)>,
pub worker_job_rx: Receiver<WorkerJob>,
}
#[derive(Debug)]
struct ClientSenders {
pub senders_by_handle: HashMap<ServerHandle, Sender<WorkUpdate>>,
pub jobs_tx_by_func: HashMap<Vec<u8>, Sender<WorkerJob>>,
pub echo_tx: Sender<Bytes>,
pub job_created_tx: Sender<JobCreated>,
pub status_res_tx: Sender<JobStatus>,
pub error_tx: Sender<(Bytes, Bytes)>,
pub worker_job_tx: Sender<WorkerJob>,
}
impl ClientSenders {
pub fn new(
echo_tx: Sender<Bytes>,
job_created_tx: Sender<JobCreated>,
status_res_tx: Sender<JobStatus>,
error_tx: Sender<(Bytes, Bytes)>,
worker_job_tx: Sender<WorkerJob>,
) -> ClientSenders {
ClientSenders {
senders_by_handle: HashMap::new(),
jobs_tx_by_func: HashMap::new(),
echo_tx: echo_tx,
job_created_tx: job_created_tx,
status_res_tx: status_res_tx,
error_tx: error_tx,
worker_job_tx: worker_job_tx,
}
}
}
#[derive(Debug)]
pub struct ClientData {
receivers: Arc<Mutex<ClientReceivers>>,
senders: Arc<RwLock<ClientSenders>>,
}
impl Clone for ClientData {
fn clone(&self) -> Self {
Self {
receivers: self.receivers.clone(),
senders: self.senders.clone(),
}
}
}
impl ClientReceivers {
pub fn new(
echo_rx: Receiver<Bytes>,
job_created_rx: Receiver<JobCreated>,
status_res_rx: Receiver<JobStatus>,
error_rx: Receiver<(Bytes, Bytes)>,
worker_job_rx: Receiver<WorkerJob>,
) -> ClientReceivers {
ClientReceivers {
echo_rx: echo_rx,
job_created_rx: job_created_rx,
status_res_rx: status_res_rx,
error_rx: error_rx,
worker_job_rx: worker_job_rx,
}
}
}
impl ClientData {
pub fn new() -> ClientData {
const CLIENT_CHANNEL_BOUND_SIZE: usize = 100;
let (echo_tx, echo_rx) = channel(CLIENT_CHANNEL_BOUND_SIZE);
let (job_created_tx, job_created_rx) = channel(CLIENT_CHANNEL_BOUND_SIZE);
let (status_res_tx, status_res_rx) = channel(CLIENT_CHANNEL_BOUND_SIZE);
let (error_tx, error_rx) = channel(CLIENT_CHANNEL_BOUND_SIZE);
let (worker_job_tx, worker_job_rx) = channel(CLIENT_CHANNEL_BOUND_SIZE);
ClientData {
receivers: Arc::new(Mutex::new(ClientReceivers::new(
echo_rx,
job_created_rx,
status_res_rx,
error_rx,
worker_job_rx,
))),
senders: Arc::new(RwLock::new(ClientSenders::new(
echo_tx,
job_created_tx,
status_res_tx,
error_tx,
worker_job_tx,
))),
}
}
pub fn receivers(&self) -> MutexGuard<ClientReceivers> {
trace!("Locking receivers");
self.receivers
.lock()
.expect("Threads should not panic while holding lock.")
}
pub fn echo_tx(&self) -> Sender<Bytes> {
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.echo_tx
.clone()
}
pub fn job_created_tx(&self) -> Sender<JobCreated> {
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.job_created_tx
.clone()
}
pub fn error_tx(&self) -> Sender<(Bytes, Bytes)> {
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.error_tx
.clone()
}
pub fn status_res_tx(&self) -> Sender<JobStatus> {
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.status_res_tx
.clone()
}
pub fn worker_job_tx(&self) -> Sender<WorkerJob> {
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.worker_job_tx
.clone()
}
pub fn get_sender_by_handle(&self, handle: &ServerHandle) -> Option<Sender<WorkUpdate>> {
match self
.senders
.read()
.expect("Threads should not panic while holding lock.")
.senders_by_handle
.get(handle)
{
None => None,
Some(sender) => Some(sender.clone()),
}
}
pub fn set_sender_by_handle(&mut self, handle: ServerHandle, tx: Sender<WorkUpdate>) {
self.senders
.write()
.expect("Threads should not panic while holding lock.")
.senders_by_handle
.insert(handle, tx);
}
pub fn get_jobs_tx_by_func(&self, func: &Vec<u8>) -> Option<Sender<WorkerJob>> {
match self
.senders
.read()
.expect("Threads should not panic while holding lock.")
.jobs_tx_by_func
.get(func)
{
None => None,
Some(tx) => Some(tx.clone()),
}
}
pub fn set_jobs_tx_by_func(&mut self, func: Vec<u8>, tx: Sender<WorkerJob>) {
self.senders
.write()
.expect("Threads should not panic while holding lock.")
.jobs_tx_by_func
.insert(func, tx);
}
}