use std::sync::Arc;
#[cfg(not(feature = "async-trait"))]
use futures::future::BoxFuture;
#[cfg(not(feature = "async-trait"))]
use futures::FutureExt;
use crate::concurrency::Duration;
use crate::factory::*;
use crate::Actor;
use crate::ActorProcessingErr;
use crate::ActorRef;
/// Job key used by the factory tests in this file; wraps a single numeric id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct TestKey {
    /// Numeric identifier carried by the key.
    id: u64,
}
/// Message type carried by the jobs dispatched in these tests.
#[derive(Debug)]
enum TestMessage {
// In the code visible here the payload is only read through the derived `Debug`
// output (the worker logs the message with `{:?}`), which the dead-code lint does
// not count as a use; hence the explicit allowance.
#[allow(dead_code)]
Count(u16),
}
/// Byte conversion for [`TestKey`], compiled only when the `cluster` feature is enabled.
///
/// Both directions delegate to the `u64` conversion of the `id` field.
#[cfg(feature = "cluster")]
impl crate::BytesConvertable for TestKey {
    /// Rebuilds a key by decoding `bytes` as the `u64` id.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let id = u64::from_bytes(bytes);
        Self { id }
    }

    /// Consumes the key and returns the byte form of its `u64` id.
    fn into_bytes(self) -> Vec<u8> {
        let Self { id } = self;
        id.into_bytes()
    }
}
/// Worker used by the pool-adjustment tests in this file.
struct TestWorker {
/// Shared set into which `handle` inserts the id of the worker that processed a job.
id_map: Arc<dashmap::DashSet<usize>>,
}
/// With the `cluster` feature enabled, implements `crate::Message` for `TestMessage`
/// without overriding any of the trait's items.
// NOTE(review): presumably required because the cluster build does not provide a
// blanket `Message` impl for plain types — confirm against the trait definition.
#[cfg(feature = "cluster")]
impl crate::Message for TestMessage {}
/// [`Worker`] implementation that records, in a shared set, the id of every worker
/// that handled at least one job.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Worker for TestWorker {
    type Key = TestKey;
    type Message = TestMessage;
    type Arguments = ();
    type State = Self::Arguments;

    /// Returns the startup arguments unchanged as the worker's state.
    async fn pre_start(
        &self,
        _wid: WorkerId,
        _factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        startup_context: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(startup_context)
    }

    /// Logs the job's message, inserts `wid` into the shared id set, and returns
    /// the job's key.
    async fn handle(
        &self,
        wid: WorkerId,
        _factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        job: Job<Self::Key, Self::Message>,
        _state: &mut Self::State,
    ) -> Result<Self::Key, ActorProcessingErr> {
        // Only the message and the key are needed; the remaining job fields are ignored.
        let Job { msg, key, .. } = job;

        tracing::debug!("Worker received {:?}", msg);
        self.id_map.insert(wid);

        Ok(key)
    }
}
/// Builder passed to the factory (as `worker_builder`) to construct `TestWorker`s.
struct TestWorkerBuilder {
/// Shared id set; a clone of this handle is given to every worker that is built.
id_map: Arc<dashmap::DashSet<usize>>,
}
impl WorkerBuilder<TestWorker, ()> for TestWorkerBuilder {
fn build(&mut self, _wid: usize) -> (TestWorker, ()) {
(
TestWorker {
id_map: self.id_map.clone(),
},
(),
)
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_worker_pool_adjustment_manual() {
let id_map = Arc::new(dashmap::DashSet::new());
let worker_builder = TestWorkerBuilder {
id_map: id_map.clone(),
};
let factory_definition = Factory::<
TestKey,
TestMessage,
(),
TestWorker,
routing::RoundRobinRouting<TestKey, TestMessage>,
queues::DefaultQueue<TestKey, TestMessage>,
>::default();
let (factory, factory_handle) = Actor::spawn(
None,
factory_definition,
FactoryArguments {
num_initial_workers: 4,
queue: queues::DefaultQueue::default(),
router: Default::default(),
capacity_controller: None,
dead_mans_switch: None,
discard_handler: None,
discard_settings: DiscardSettings::None,
lifecycle_hooks: None,
worker_builder: Box::new(worker_builder),
stats: None,
},
)
.await
.expect("Failed to spawn factory");
for i in 0..50 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: TestKey { id: 1 },
msg: TestMessage::Count(i),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
crate::periodic_check(
|| {
id_map.len() == 4
},
Duration::from_millis(200),
)
.await;
id_map.clear();
factory
.cast(FactoryMessage::AdjustWorkerPool(25))
.expect("Failed to send to factory");
for i in 0..50 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: TestKey { id: 1 },
msg: TestMessage::Count(i),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
crate::periodic_check(
|| {
id_map.len() == 25
},
Duration::from_millis(200),
)
.await;
factory.stop(None);
factory_handle.await.unwrap();
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_worker_pool_adjustment_automatic() {
struct DynamicWorkerController;
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl WorkerCapacityController for DynamicWorkerController {
#[cfg(feature = "async-trait")]
async fn get_pool_size(&mut self, _current: usize) -> usize {
10
}
#[cfg(not(feature = "async-trait"))]
fn get_pool_size(&mut self, _current: usize) -> BoxFuture<'_, usize> {
async { 10 }.boxed()
}
}
let id_map = Arc::new(dashmap::DashSet::new());
let worker_builder = TestWorkerBuilder {
id_map: id_map.clone(),
};
let factory_definition = Factory::<
TestKey,
TestMessage,
(),
TestWorker,
routing::RoundRobinRouting<TestKey, TestMessage>,
queues::DefaultQueue<TestKey, TestMessage>,
>::default();
let (factory, factory_handle) = Actor::spawn(
None,
factory_definition,
FactoryArguments {
num_initial_workers: 4,
queue: queues::DefaultQueue::default(),
router: Default::default(),
capacity_controller: Some(Box::new(DynamicWorkerController)),
dead_mans_switch: None,
discard_handler: None,
discard_settings: DiscardSettings::None,
lifecycle_hooks: None,
worker_builder: Box::new(worker_builder),
stats: None,
},
)
.await
.expect("Failed to spawn factory");
for i in 0..50 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: TestKey { id: 1 },
msg: TestMessage::Count(i),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
crate::periodic_check(
|| {
id_map.len() == 4
},
Duration::from_millis(200),
)
.await;
id_map.clear();
crate::concurrency::sleep(Duration::from_millis(300)).await;
for i in 0..50 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: TestKey { id: 1 },
msg: TestMessage::Count(i),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
crate::periodic_check(
|| {
id_map.len() == 10
},
Duration::from_millis(200),
)
.await;
factory.stop(None);
factory_handle.await.unwrap();
}