nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::util::element::Element;
use crate::util::queue::{QueueBase, QueueError, QueueReader, QueueSize, QueueWriter};

pub const PRIORITY_LEVELS: usize = 8;
pub const DEFAULT_PRIORITY: i8 = (PRIORITY_LEVELS / 2) as i8;

pub trait PriorityMessage: Element {
  fn get_priority(&self) -> Option<i8>;
}

#[derive(Debug, Clone)]
pub struct PriorityQueue<E, Q> {
  priority_queues: Arc<Mutex<Vec<Q>>>,
  phantom_data: PhantomData<E>,
}

impl<E: PriorityMessage, Q: Clone + QueueReader<E> + QueueWriter<E>> PriorityQueue<E, Q> {
  pub fn new(queue_producer: impl Fn() -> Q + 'static) -> Self {
    let mut queues = Vec::with_capacity(PRIORITY_LEVELS);
    for _ in 0..PRIORITY_LEVELS {
      let queue = queue_producer();
      queues.push(queue);
    }
    Self {
      priority_queues: Arc::new(Mutex::new(queues)),
      phantom_data: PhantomData,
    }
  }
}

#[async_trait]
impl<E: PriorityMessage, Q: QueueReader<E> + QueueWriter<E>> QueueBase<E> for PriorityQueue<E, Q> {
  async fn len(&self) -> QueueSize {
    let queues_mg = self.priority_queues.lock().await;
    let mut len = QueueSize::Limited(0);
    for queue in queues_mg.iter() {
      len = len + queue.len().await;
    }
    len
  }

  async fn capacity(&self) -> QueueSize {
    let queues_mg = self.priority_queues.lock().await;
    let mut capacity = QueueSize::Limited(0);
    for queue in queues_mg.iter() {
      capacity = capacity + queue.capacity().await;
    }
    capacity
  }
}

#[async_trait]
impl<E: PriorityMessage, Q: QueueReader<E> + QueueWriter<E>> QueueReader<E> for PriorityQueue<E, Q> {
  async fn poll(&mut self) -> Result<Option<E>, QueueError<E>> {
    for p in (0..PRIORITY_LEVELS).rev() {
      let mut priority_queues_mg = self.priority_queues.lock().await;
      if let Ok(Some(item)) = priority_queues_mg[p].poll().await {
        return Ok(Some(item));
      }
    }
    Ok(None)
  }

  async fn clean_up(&mut self) {
    let mut mg = self.priority_queues.lock().await;
    for queue in mg.iter_mut() {
      queue.clean_up().await;
    }
  }
}

#[async_trait]
impl<E: PriorityMessage, Q: QueueReader<E> + QueueWriter<E>> QueueWriter<E> for PriorityQueue<E, Q> {
  async fn offer(&mut self, element: E) -> Result<(), QueueError<E>> {
    let mut item_priority = DEFAULT_PRIORITY.clone();
    if let Some(priority) = element.get_priority() {
      item_priority = priority;
      if item_priority < 0 {
        item_priority = 0;
      }
      if item_priority >= PRIORITY_LEVELS as i8 - 1 {
        item_priority = PRIORITY_LEVELS as i8 - 1;
      }
    }
    let mut mg = self.priority_queues.lock().await;
    mg[item_priority as usize].offer(element).await
  }
}