use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::concurrency::sleep;
use crate::factory::*;
use crate::Actor;
use crate::ActorProcessingErr;
use crate::ActorRef;
struct TestWorker;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Worker for TestWorker {
type State = ();
type Key = ();
type Message = ();
type Arguments = ();
async fn pre_start(
&self,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
_args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
tracing::debug!("Worker started");
Ok(())
}
async fn handle(
&self,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
_job: Job<Self::Key, Self::Message>,
_state: &mut Self::State,
) -> Result<Self::Key, ActorProcessingErr> {
tracing::debug!("Worker received dispatch");
sleep(Duration::from_millis(10)).await;
Ok(())
}
}
struct TestWorkerBuilder;
impl WorkerBuilder<TestWorker, ()> for TestWorkerBuilder {
fn build(&mut self, _wid: crate::factory::WorkerId) -> (TestWorker, ()) {
(TestWorker, ())
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_dynamic_settings() {
let counter_one = Arc::new(AtomicU8::new(0));
let counter_two = Arc::new(AtomicU8::new(0));
struct TestDiscardHandler {
counter: Arc<AtomicU8>,
}
impl DiscardHandler<(), ()> for TestDiscardHandler {
fn discard(&self, _reason: DiscardReason, _job: &mut Job<(), ()>) {
self.counter.fetch_add(1, Ordering::SeqCst);
}
}
let factory_definition = Factory::<
(),
(),
(),
TestWorker,
routing::QueuerRouting<(), ()>,
queues::DefaultQueue<(), ()>,
>::default();
let args = FactoryArguments::builder()
.num_initial_workers(1)
.queue(Default::default())
.router(Default::default())
.worker_builder(Box::new(TestWorkerBuilder))
.discard_handler(Arc::new(TestDiscardHandler {
counter: counter_one.clone(),
}))
.discard_settings(DiscardSettings::Static {
limit: 0,
mode: DiscardMode::Newest,
})
.build();
let (factory, factory_handle) = Actor::spawn(None, factory_definition, args)
.await
.expect("Failed to spawn factory");
let worker_count = factory
.call(
FactoryMessage::GetAvailableCapacity,
Some(Duration::from_secs(1)),
)
.await
.expect("Failed to message factory")
.expect("Failed to get reply from factory");
assert_eq!(1, worker_count);
for _ in 0..2 {
factory
.cast(FactoryMessage::Dispatch(Job {
accepted: None,
key: (),
msg: (),
options: JobOptions::default(),
}))
.expect("Failed to message factory");
}
_ = factory
.call(
FactoryMessage::GetAvailableCapacity,
Some(Duration::from_secs(1)),
)
.await
.expect("Failed to message factory")
.expect("Failed to get reply from factory");
sleep(Duration::from_millis(100)).await;
factory
.cast(FactoryMessage::UpdateSettings(
UpdateSettingsRequest::builder()
.worker_count(2)
.discard_handler(Some(Arc::new(TestDiscardHandler {
counter: counter_two.clone(),
})))
.capacity_controller(None)
.dead_mans_switch(None)
.discard_settings(DiscardSettings::Static {
limit: 0,
mode: DiscardMode::Newest,
})
.lifecycle_hooks(None)
.stats(None)
.build(),
))
.expect("Failed to send request to update factory");
let worker_count = factory
.call(
FactoryMessage::GetAvailableCapacity,
Some(Duration::from_secs(1)),
)
.await
.expect("Failed to message factory")
.expect("Failed to get reply from factory");
assert_eq!(2, worker_count);
for _ in 0..3 {
factory
.cast(FactoryMessage::Dispatch(Job {
accepted: None,
key: (),
msg: (),
options: JobOptions::default(),
}))
.expect("Failed to message factory");
}
_ = factory
.call(
FactoryMessage::GetAvailableCapacity,
Some(Duration::from_secs(1)),
)
.await
.expect("Failed to message factory")
.expect("Failed to get reply from factory");
assert_eq!(counter_one.load(Ordering::SeqCst), 1);
assert_eq!(counter_two.load(Ordering::SeqCst), 1);
factory.stop(None);
factory_handle.await.unwrap();
}