use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Notify;
use crate::factory::queues::Priority;
use crate::factory::queues::PriorityManager;
use crate::factory::queues::StandardPriority;
use crate::factory::*;
use crate::Actor;
use crate::ActorProcessingErr;
use crate::ActorRef;
/// Job key type used throughout this test; an alias for [`StandardPriority`].
type TestKey = StandardPriority;
/// Payload carried by the jobs this test dispatches to the factory.
#[derive(Debug)]
enum TestMessage {
/// Amount the worker adds to the counter selected by the job key's priority index.
#[allow(dead_code)]
Count(u16),
}
// `crate::Message` is implemented by hand only when the `cluster` feature is enabled.
// NOTE(review): presumably other builds get the impl some other way (e.g. a blanket impl) — confirm.
#[cfg(feature = "cluster")]
impl crate::Message for TestMessage {}
/// Worker actor for the priority-queueing test: it tallies dispatched counts and pauses on
/// `signal` after every fifth job it handles.
struct TestWorker {
/// Shared counters, indexed by the `get_index()` of the job's key.
counters: [Arc<AtomicU16>; 5],
/// Notifier the worker pings after every fifth job and then waits on before continuing.
signal: Arc<Notify>,
}
/// Stateless priority manager for the test; job keys are `StandardPriority` values, so the key
/// doubles as the priority.
struct TestPriorityManager;
/// Priority lookup for jobs whose key already is a [`StandardPriority`].
impl PriorityManager<StandardPriority, StandardPriority> for TestPriorityManager {
    /// Returns the job key itself as the priority; never yields `None`.
    fn get_priority(&self, job: &StandardPriority) -> Option<StandardPriority> {
        let priority = *job;
        Some(priority)
    }

    /// Reports every job as discardable, regardless of its key.
    fn is_discardable(&self, _job: &StandardPriority) -> bool {
        true
    }
}
/// Actor behaviour of the test worker.
///
/// The state is a pair of the worker's startup context and the number of jobs handled since the
/// last pause on `signal`.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestWorker {
    type Msg = WorkerMessage<TestKey, TestMessage>;
    type State = (Self::Arguments, u16);
    type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;

    /// Stores the startup context and starts the handled-job tally at zero.
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        startup_context: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        let initial_state: Self::State = (startup_context, 0);
        Ok(initial_state)
    }

    /// Answers factory pings and processes dispatched `Count` jobs.
    ///
    /// After every fifth job the worker notifies `signal` and then waits on the same notifier
    /// before it resets its tally and returns, so whoever holds the other end of `signal` can
    /// inspect the counters while the worker is parked.
    ///
    /// # Errors
    /// Propagates any error returned by casting a message to the factory.
    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        let (context, handled) = state;

        match message {
            WorkerMessage::FactoryPing(sent_at) => {
                // Reply with this worker's id and the time elapsed since the ping was stamped.
                let pong = FactoryMessage::WorkerPong(context.wid, sent_at.elapsed());
                context.factory.cast(pong)?;
            }
            WorkerMessage::Dispatch(Job {
                key,
                msg: TestMessage::Count(amount),
                ..
            }) => {
                // Tally the job under the counter slot chosen by its key.
                let counter = &self.counters[key.get_index()];
                counter.fetch_add(amount, Ordering::Relaxed);

                // Report completion to the factory before (possibly) parking on the signal.
                let finished = FactoryMessage::Finished(context.wid, key);
                context.factory.cast(finished)?;

                *handled += 1;
                if *handled == 5 {
                    // Announce the finished batch, then wait to be released before continuing.
                    self.signal.notify_one();
                    self.signal.notified().await;
                    *handled = 0;
                }
            }
        }

        Ok(())
    }
}
/// Builder that hands each `TestWorker` it creates clones of the shared counters and signal.
struct TestWorkerBuilder {
/// Counter handles cloned into every built worker.
counters: [Arc<AtomicU16>; 5],
/// Notifier handle cloned into every built worker.
signal: Arc<Notify>,
}
impl WorkerBuilder<TestWorker, ()> for TestWorkerBuilder {
fn build(&mut self, _wid: usize) -> (TestWorker, ()) {
(
TestWorker {
counters: self.counters.clone(),
signal: self.signal.clone(),
},
(),
)
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_basic_priority_queueing() {
let counters = [
Arc::new(AtomicU16::new(0)),
Arc::new(AtomicU16::new(0)),
Arc::new(AtomicU16::new(0)),
Arc::new(AtomicU16::new(0)),
Arc::new(AtomicU16::new(0)),
];
let signal = Arc::new(Notify::new());
let factory_definition = Factory::<
TestKey,
TestMessage,
(),
TestWorker,
routing::QueuerRouting<TestKey, TestMessage>,
queues::PriorityQueue<
TestKey,
TestMessage,
StandardPriority,
TestPriorityManager,
{ StandardPriority::size() },
>,
>::default();
let args = FactoryArguments::builder()
.queue(queues::PriorityQueue::new(TestPriorityManager))
.router(Default::default())
.worker_builder(Box::new(TestWorkerBuilder {
counters: counters.clone(),
signal: signal.clone(),
}))
.build();
let (factory, factory_handle) = Actor::spawn(None, factory_definition, args)
.await
.expect("Failed to spawn factory");
let pri = StandardPriority::Highest;
for _i in 0..5 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: pri,
msg: TestMessage::Count(1),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
let pri = StandardPriority::BestEffort;
for _i in 0..5 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: pri,
msg: TestMessage::Count(1),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
signal.notified().await;
let hpc = counters[0].load(Ordering::Relaxed);
let lpc = counters[4].load(Ordering::Relaxed);
assert_eq!(hpc, 5);
assert_eq!(lpc, 0);
signal.notify_one();
signal.notified().await;
signal.notify_one();
let hpc = counters[0].load(Ordering::Relaxed);
let lpc = counters[4].load(Ordering::Relaxed);
assert_eq!(hpc, 5);
assert_eq!(lpc, 5);
factory.stop(None);
factory_handle.await.unwrap();
}