use std::sync::Arc;
use crate::model::descriptor::FlattenDataFlowDescriptor;
use crate::runtime::resources::convert;
use crate::runtime::{resources::DataStore, Job};
use crate::Result as ZFResult;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use flume::{unbounded, Receiver, Sender};
use futures::stream::{AbortHandle, Abortable, Aborted};
use uhlc::HLC;
use uuid::Uuid;
use zenoh::prelude::ZenohId;
/// A unit of work that can be driven asynchronously.
///
/// In this file, implementors are produced by a [`FnNewWorkerTrait`] factory and each
/// one is spawned as its own task by `WorkerPool::start`.
#[async_trait]
pub trait WorkerTrait: Send + Sync {
/// Runs the worker until it completes, returning `Ok(())` or the error that stopped it.
async fn run(&self) -> ZFResult<()>;
}
/// Factory for workers.
///
/// A blanket implementation exists for cloneable closures with the matching signature,
/// so a plain closure can be used wherever this trait is required.
pub trait FnNewWorkerTrait: Send + Sync {
/// Creates a boxed worker from its index `id`, the shared job receiver `rx` and the
/// shared hybrid logical clock `hlc`.
fn call(&self, id: usize, rx: Arc<Receiver<Job>>, hlc: Arc<HLC>) -> Box<dyn WorkerTrait>;
}
/// Blanket implementation letting any cloneable closure act as a worker factory.
///
/// The closure is only required to be `FnOnce`, so every invocation runs on a fresh
/// clone of it rather than on the stored closure itself.
impl<F> FnNewWorkerTrait for F
where
    F: FnOnce(usize, Arc<Receiver<Job>>, Arc<HLC>) -> Box<dyn WorkerTrait> + Clone + Send + Sync,
{
    fn call(&self, id: usize, rx: Arc<Receiver<Job>>, hlc: Arc<HLC>) -> Box<dyn WorkerTrait> {
        // Calling an `FnOnce` consumes it, hence the clone before the call.
        let factory = F::clone(self);
        factory(id, rx, hlc)
    }
}
/// Shared, type-erased worker factory, as stored by `WorkerPool`.
pub type FnNewWorker = Arc<dyn FnNewWorkerTrait>;
/// A pool of workers fed through a shared job channel.
///
/// `start` spawns one task per worker plus one task that forwards the jobs received
/// from the data store subscription into the channel; `stop` aborts all of them.
pub struct WorkerPool {
/// Identifier passed to the data store when subscribing to / adding submitted jobs,
/// and printed in log messages (presumably the runtime id — confirm against callers).
rtid: ZenohId,
/// Number of workers created by the pool.
pool_size: usize,
/// Factory used to build the workers.
new_worker_fn: FnNewWorker,
/// Workers that have been built but not spawned yet; drained by `start`.
workers: Vec<Box<dyn WorkerTrait>>,
/// Join handles of the spawned worker tasks.
handlers: Vec<JoinHandle<Result<ZFResult<()>, Aborted>>>,
/// Join handle of the job-forwarding task, `None` while the pool is not started.
handle: Option<JoinHandle<Result<ZFResult<()>, Aborted>>>,
/// Abort handle of the job-forwarding task, `None` while the pool is not started.
abort_handle: Option<AbortHandle>,
/// Abort handles of the spawned worker tasks.
abort_handlers: Vec<AbortHandle>,
/// Sending half of the job channel; cloned into the job-forwarding task.
tx: Sender<Job>,
/// Receiving half of the job channel; a clone of this `Arc` is given to every worker.
rx: Arc<Receiver<Job>>,
/// Data store used to subscribe to submitted jobs and to record new ones.
session: DataStore,
/// Clock used to timestamp submitted jobs; also handed to every worker.
hlc: Arc<HLC>,
}
// NOTE(review): these manual impls assert that a `WorkerPool` can be moved to and
// shared between threads; the compiler does not verify this. Nothing in this file
// shows which field prevents the auto-derived `Send`/`Sync` — confirm the impls are
// still required and record the invariant that makes them sound.
unsafe impl Send for WorkerPool {}
unsafe impl Sync for WorkerPool {}
impl WorkerPool {
pub fn new(
pool_size: usize,
session: DataStore,
rtid: ZenohId,
hlc: Arc<HLC>,
new_worker_fn: FnNewWorker,
) -> Self {
let (tx, rx) = unbounded();
let rx = Arc::new(rx);
let mut workers = Vec::with_capacity(pool_size);
for i in 0..pool_size {
let new_fn_clone = new_worker_fn.clone();
let worker = new_fn_clone.call(i, rx.clone(), hlc.clone());
workers.push(worker);
}
Self {
rtid,
new_worker_fn,
pool_size,
workers,
handlers: Vec::with_capacity(pool_size),
abort_handlers: Vec::with_capacity(pool_size),
handle: None,
abort_handle: None,
tx,
rx,
session,
hlc,
}
}
/// Spawns the worker tasks and the job-forwarding task.
///
/// Every pending worker is moved into its own abortable task. A further abortable
/// task subscribes to the submitted jobs of `rtid` and pushes each successfully
/// converted job into the channel; conversion failures are only logged.
///
/// If the pool is already started, a warning is logged and nothing else happens.
pub fn start(&mut self) {
    if matches!((&self.handle, &self.abort_handle), (Some(_), Some(_))) {
        log::warn!(
            "[Job Queue: {}] Trying to start while it is already started, aborting",
            self.rtid
        );
        return;
    }

    for worker in self.workers.drain(..) {
        let (worker_abort, worker_registration) = AbortHandle::new_pair();
        let worker_task = Abortable::new(async move { worker.run().await }, worker_registration);
        self.handlers.push(async_std::task::spawn(worker_task));
        self.abort_handlers.push(worker_abort);
    }

    let job_tx = self.tx.clone();
    let store = self.session.clone();
    // Name kept as `c_id`: the log format strings below capture it by identifier.
    let c_id = self.rtid;

    let forward_jobs = async move {
        let submitted = store.subscribe_sumbitted_jobs(&c_id).await?;
        log::info!("[Job Queue {c_id:?} ] Started");
        loop {
            // A receive error ends the task; a conversion error is logged and skipped.
            let received = submitted.recv_async().await.map(convert::<Job>)?;
            match received {
                Err(e) => {
                    log::warn!("[Job Queue: {c_id:?}] Error when receiving job {e:?}");
                }
                Ok(job) => {
                    log::info!("[Job Queue: {c_id:?}] Received Job {job:?}");
                    job_tx.send_async(job).await?;
                }
            }
        }
    };

    let (abort_handle, abort_registration) = AbortHandle::new_pair();
    self.handle = Some(async_std::task::spawn(Abortable::new(
        forward_jobs,
        abort_registration,
    )));
    self.abort_handle = Some(abort_handle);
}
/// Stops the pool and prepares it for a later `start`.
///
/// Aborts the job-forwarding task and every worker task, waits for all of them to
/// terminate, and then makes sure a fresh set of `pool_size` workers is available.
///
/// Calling `stop` on a pool that is not running leaves the pending workers untouched.
pub async fn stop(&mut self) {
    // Request cancellation of every task before waiting on any of them.
    if let Some(abort_handle) = self.abort_handle.take() {
        abort_handle.abort();
    }
    for ah in self.abort_handlers.drain(..) {
        ah.abort();
    }

    // The awaits must happen outside the `log::trace!` invocation: the macro only
    // evaluates its arguments when the trace level is enabled, so awaiting inside it
    // would skip the join (and drop the handle) whenever trace logging is off.
    if let Some(handle) = self.handle.take() {
        let outcome = handle.await;
        log::trace!(
            "[Job Queue: {:?}] handler finished with {:?}",
            self.rtid,
            outcome
        );
    }
    for wh in self.handlers.drain(..) {
        let outcome = wh.await;
        log::trace!(
            "[Job Queue: {:?} - Worker] handler finished with {:?}",
            self.rtid,
            outcome
        );
    }

    // `start` drains `self.workers`. Rebuild them only when they have been consumed,
    // otherwise a `stop` without a preceding `start` would pile up extra workers and
    // the next `start` would spawn more than `pool_size` tasks.
    if self.workers.is_empty() {
        for i in 0..self.pool_size {
            let worker = self
                .new_worker_fn
                .call(i, self.rx.clone(), self.hlc.clone());
            self.workers.push(worker);
        }
    }
}
/// Submits a job asking to instantiate the data flow `dfd` as `instance_id`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_instantiate(
    &self,
    dfd: &FlattenDataFlowDescriptor,
    instance_id: &Uuid,
) -> ZFResult<Job> {
    let job = Job::new_instantiate(
        dfd.clone(),
        *instance_id,
        Uuid::new_v4(),
        self.hlc.new_timestamp(),
    );
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to create the data flow `dfd` as `instance_id`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_create(
    &self,
    dfd: &FlattenDataFlowDescriptor,
    instance_id: &Uuid,
) -> ZFResult<Job> {
    let job = Job::new_create(
        dfd.clone(),
        *instance_id,
        Uuid::new_v4(),
        self.hlc.new_timestamp(),
    );
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to tear down the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_teardown(&self, fid: &Uuid) -> ZFResult<Job> {
    let job = Job::new_teardown(*fid, Uuid::new_v4(), self.hlc.new_timestamp());
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to delete the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_delete(&self, fid: &Uuid) -> ZFResult<Job> {
    let job = Job::new_delete(*fid, Uuid::new_v4(), self.hlc.new_timestamp());
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to start the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_start(&self, fid: &Uuid) -> ZFResult<Job> {
    let job = Job::new_start(*fid, Uuid::new_v4(), self.hlc.new_timestamp());
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to stop the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_stop(&self, fid: &Uuid) -> ZFResult<Job> {
    let job = Job::new_stop(*fid, Uuid::new_v4(), self.hlc.new_timestamp());
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to start the node `node_id` of the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_start_node(&self, fid: &Uuid, node_id: &str) -> ZFResult<Job> {
    let job = Job::new_start_node(
        *fid,
        node_id.to_owned(),
        Uuid::new_v4(),
        self.hlc.new_timestamp(),
    );
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
/// Submits a job asking to stop the node `node_id` of the flow instance `fid`.
///
/// The job gets a freshly generated job id and a timestamp from the pool's clock,
/// is recorded in the data store under this pool's `rtid`, and is then returned.
///
/// # Errors
///
/// Propagates the error returned by the data store when the job cannot be added.
pub async fn submit_stop_node(&self, fid: &Uuid, node_id: &str) -> ZFResult<Job> {
    let job = Job::new_stop_node(
        *fid,
        node_id.to_owned(),
        Uuid::new_v4(),
        self.hlc.new_timestamp(),
    );
    self.session.add_submitted_job(&self.rtid, &job).await?;
    Ok(job)
}
}