palladium_runtime/
worker_pool.rs1use crate::reactor::Reactor;
2use crate::responses::ResponseRegistry;
3use crate::stable_addr::make_stable_addr;
4use palladium_actor::{
5 Actor, ActorPath, AddrHash, ChildSpec, Message, NamespacePolicy, PoolConfig, WorkerPool,
6};
7use palladium_transport::{InProcessTransport, TransportRegistry};
8use std::sync::Arc;
9
10#[allow(clippy::too_many_arguments)]
27pub(crate) fn spawn_worker_pool<M, R, G>(
28 parent_path: &ActorPath,
29 config: &PoolConfig,
30 factory: G,
31 source_addr: AddrHash,
32 transport: &Arc<InProcessTransport>,
33 transport_registry: &Arc<TransportRegistry>,
34 responses: &Arc<ResponseRegistry>,
35 reactor: &R,
36 spawn_fn: impl Fn(ChildSpec<R>),
37) -> WorkerPool<M>
38where
39 M: Message,
40 R: Reactor + Clone + Send + Sync + 'static,
41 G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
42{
43 let workers = (0..config.size)
44 .map(|i| {
45 let name = format!("{}-{}", config.name_prefix, i);
46
47 let child_path = parent_path
51 .child(&name)
52 .unwrap_or_else(|_| ActorPath::parse("/user").expect("fallback path is valid"));
53
54 let ns = NamespacePolicy::default_for(&child_path).unwrap_or_else(|_| {
55 NamespacePolicy::new(vec![
56 ActorPath::parse("/user").expect("valid"),
57 ActorPath::parse("/system").expect("valid"),
58 ])
59 });
60
61 let (stable, refresher) = make_stable_addr::<M, R>(
62 source_addr,
63 transport.clone(),
64 transport_registry.clone(),
65 responses.clone(),
66 config.ask_timeout,
67 reactor.clone(),
68 );
69
70 let g = factory.clone();
71 let spec = ChildSpec::new(
72 name,
73 config.restart,
74 config.shutdown.clone(),
75 ns,
76 move || g(),
77 )
78 .with_mailbox_capacity(config.mailbox_capacity)
79 .with_stable_addr(refresher);
80
81 spawn_fn(spec);
82 stable
83 })
84 .collect();
85
86 WorkerPool::new(workers)
87}