nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
#[cfg(test)]
mod tests {
  use crate::actor::message::message::Message;
  use crate::util::element::Element;
  use crate::util::queue::mpsc_unbounded_channel_queue::MpscUnboundedChannelQueue;
  use crate::util::queue::priority_queue::{PriorityMessage, PriorityQueue};
  use crate::util::queue::{QueueBase, QueueReader, QueueSize, QueueWriter};
  use std::any::Any;
  use std::fmt::Debug;
  use std::sync::Arc;

  #[derive(Debug, Clone, PartialEq, Eq)]
  struct TestPriorityMessage {
    message: String,
    priority: i8,
  }

  impl TestPriorityMessage {
    fn new(message: String, priority: i8) -> Self {
      Self { message, priority }
    }
  }

  impl Element for TestPriorityMessage {}

  impl PriorityMessage for TestPriorityMessage {
    fn get_priority(&self) -> Option<i8> {
      Some(self.priority)
    }
  }

  impl Message for TestPriorityMessage {
    fn eq_message(&self, other: &dyn Message) -> bool {
      if let Some(other) = other.as_any().downcast_ref::<Self>() {
        self.message == other.message
      } else {
        false
      }
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
      self
    }
  }

  impl TestMessageBase for TestPriorityMessage {
    fn get_message(&self) -> String {
      self.message.clone()
    }
  }

  #[derive(Debug, Clone)]
  struct TestMessage {
    message: String,
  }

  impl Element for TestMessage {}

  impl Message for TestMessage {
    fn eq_message(&self, other: &dyn Message) -> bool {
      if let Some(other) = other.as_any().downcast_ref::<Self>() {
        self.message == other.message
      } else {
        false
      }
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
      self
    }
  }

  impl TestMessageBase for TestMessage {
    fn get_message(&self) -> String {
      self.message.clone()
    }
  }

  impl PriorityMessage for TestMessage {
    fn get_priority(&self) -> Option<i8> {
      None
    }
  }

  impl TestMessage {
    fn new(message: String) -> Self {
      Self { message }
    }
  }

  trait TestMessageBase: PriorityMessage {
    fn get_message(&self) -> String;
  }

  #[derive(Debug, Clone)]
  struct TestMessageBaseHandle(Arc<dyn TestMessageBase>);

  impl TestMessageBaseHandle {
    fn new(msg: impl TestMessageBase) -> Self {
      TestMessageBaseHandle(Arc::new(msg))
    }
  }

  impl PriorityMessage for TestMessageBaseHandle {
    fn get_priority(&self) -> Option<i8> {
      self.0.get_priority()
    }
  }

  impl Element for TestMessageBaseHandle {}

  impl TestMessageBase for TestMessageBaseHandle {
    fn get_message(&self) -> String {
      self.0.get_message()
    }
  }

  async fn new_priority_ring_queue<M>() -> PriorityQueue<M, MpscUnboundedChannelQueue<M>>
  where
    M: TestMessageBase + Clone, {
    let queue = PriorityQueue::new(|| MpscUnboundedChannelQueue::new());
    assert_eq!(queue.len().await, QueueSize::Limited(0));
    assert_eq!(queue.capacity().await, QueueSize::Limitless);
    queue
  }

  async fn new_priority_mspc_queue<M>() -> PriorityQueue<M, MpscUnboundedChannelQueue<M>>
  where
    M: TestMessageBase + Clone, {
    let queue = PriorityQueue::new(|| MpscUnboundedChannelQueue::new());
    assert_eq!(queue.len().await, QueueSize::Limited(0));
    assert_eq!(queue.capacity().await, QueueSize::Limited(0));
    queue
  }

  #[tokio::test]
  async fn test_push_pop_ring() {
    let mut q = new_priority_ring_queue().await;
    let msg = TestPriorityMessage::new("hello".to_string(), 0);
    q.offer(msg.clone()).await.unwrap();
    let result = q.poll().await.unwrap();
    assert_eq!(result, Some(msg));
  }

  #[tokio::test]
  async fn test_push_pop_ring_2() {
    let mut q: PriorityQueue<TestMessageBaseHandle, MpscUnboundedChannelQueue<TestMessageBaseHandle>> =
      new_priority_ring_queue().await;

    for _ in 0..2 {
      let msg = TestMessageBaseHandle::new(TestPriorityMessage::new("7 hello".to_string(), 7));
      q.offer(msg.clone()).await.unwrap();
    }

    for _ in 0..2 {
      let msg = TestMessageBaseHandle::new(TestPriorityMessage::new("5 hello".to_string(), 5));
      q.offer(msg.clone()).await.unwrap();
    }

    for _ in 0..2 {
      let msg = TestMessageBaseHandle::new(TestPriorityMessage::new("0 hello".to_string(), 0));
      q.offer(msg.clone()).await.unwrap();
    }

    for _ in 0..2 {
      let msg = TestMessageBaseHandle::new(TestPriorityMessage::new("6 hello".to_string(), 6));
      q.offer(msg.clone()).await.unwrap();
    }

    for _ in 0..2 {
      let msg = TestMessageBaseHandle::new(TestMessage::new("hello".to_string()));
      q.offer(msg.clone()).await.unwrap();
    }

    for _ in 0..2 {
      let result = q.poll().await.unwrap();
      assert_eq!(result.unwrap().get_message(), "7 hello");
    }

    for _ in 0..2 {
      let result = q.poll().await.unwrap();
      assert_eq!(result.unwrap().get_message(), "6 hello");
    }

    for _ in 0..2 {
      let result = q.poll().await.unwrap();
      assert_eq!(result.unwrap().get_message(), "5 hello");
    }

    for _ in 0..2 {
      let result = q.poll().await.unwrap();
      assert_eq!(result.unwrap().get_message(), "hello");
    }

    for _ in 0..2 {
      let result = q.poll().await.unwrap();
      assert_eq!(result.unwrap().get_message(), "0 hello");
    }
  }
}