effectum 0.7.0

An embeddable task queue based on SQLite
Documentation
use std::sync::Arc;

use ahash::HashMap;
use tokio::sync::{watch, Notify};

use crate::SmartString;
use crate::{Error, Result};

pub(crate) struct ListeningWorker {
    pub id: u64,
    pub notify_task_ready: Notify,
    pub job_types: Vec<SmartString>,
}

pub(crate) struct Workers {
    next_id: u64,
    workers: HashMap<u64, Arc<ListeningWorker>>,
    workers_by_type: HashMap<SmartString, Vec<Arc<ListeningWorker>>>,
    worker_count_tx: watch::Sender<usize>,
}

impl Workers {
    pub fn new(worker_count_tx: watch::Sender<usize>) -> Self {
        Workers {
            next_id: 0,
            workers: HashMap::default(),
            workers_by_type: HashMap::default(),
            worker_count_tx,
        }
    }

    /// Add a new worker, ready to accept jobs.
    pub(crate) fn add_worker(&mut self, job_types: &[SmartString]) -> Arc<ListeningWorker> {
        let worker_id = self.next_id;
        self.next_id += 1;

        let worker = Arc::new(ListeningWorker {
            id: worker_id,
            notify_task_ready: Notify::new(),
            job_types: job_types.to_vec(),
        });

        for job in job_types {
            self.workers_by_type
                .entry(job.clone())
                .or_default()
                .push(worker.clone());
        }

        self.workers.insert(worker.id, worker.clone());
        self.worker_count_tx.send_replace(self.workers.len());

        worker
    }

    pub(crate) fn remove_worker(&mut self, worker_id: u64) -> Result<()> {
        let worker = self
            .workers
            .remove(&worker_id)
            .ok_or(Error::WorkerNotFound(worker_id))?;

        for job in &worker.job_types {
            let type_workers = self.workers_by_type.get_mut(job);

            if let Some(type_workers) = type_workers {
                type_workers.retain(|w| !Arc::ptr_eq(w, &worker));
            }
        }

        self.worker_count_tx.send_replace(self.workers.len());

        Ok(())
    }

    pub(crate) fn new_job_available(&self, job_type: &str) {
        let workers = self.workers_by_type.get(job_type);
        if let Some(workers) = workers {
            for worker in workers {
                worker.notify_task_ready.notify_one();
            }
        }
    }
}

#[cfg(test)]
mod tests {
    #[test]
    #[ignore]
    fn add_worker() {
        todo!();
    }

    #[test]
    #[ignore]
    fn remove_worker() {
        todo!();
    }

    #[test]
    #[ignore]
    fn new_job_available() {
        todo!();
    }
}