Skip to main content

palladium_runtime/
worker_pool.rs

1use crate::reactor::Reactor;
2use crate::responses::ResponseRegistry;
3use crate::stable_addr::make_stable_addr;
4use palladium_actor::{
5    Actor, ActorPath, AddrHash, ChildSpec, Message, NamespacePolicy, PoolConfig, WorkerPool,
6};
7use palladium_transport::{InProcessTransport, TransportRegistry};
8use std::sync::Arc;
9
10/// Spawn N worker actors and return a [`WorkerPool<M>`] routing handle.
11///
12/// For each worker slot `i`:
13///
14/// 1. Derives child name `{prefix}-{i}` and computes its expected path under
15///    `parent_path`.
16/// 2. Calls [`make_stable_addr`] to create a `(StableAddr<M>, AddrRefresher)` pair.
17/// 3. Builds a [`ChildSpec`] with the refresher attached so the supervisor
18///    updates the `StableAddr` after each (re)spawn.
19/// 4. Invokes `spawn_fn(spec)` — a caller-supplied callback that delivers the
20///    spec to the relevant supervisor.
21///
22/// The `StableAddr`s in the returned pool start in `None` state; they are
23/// populated once the supervisor processes the spawn requests and calls
24/// `refresher.refresh(addr)`.  Callers must yield at least once (e.g.
25/// `tokio::task::yield_now().await`) before using the pool.
26#[allow(clippy::too_many_arguments)]
27pub(crate) fn spawn_worker_pool<M, R, G>(
28    parent_path: &ActorPath,
29    config: &PoolConfig,
30    factory: G,
31    source_addr: AddrHash,
32    transport: &Arc<InProcessTransport>,
33    transport_registry: &Arc<TransportRegistry>,
34    responses: &Arc<ResponseRegistry>,
35    reactor: &R,
36    spawn_fn: impl Fn(ChildSpec<R>),
37) -> WorkerPool<M>
38where
39    M: Message,
40    R: Reactor + Clone + Send + Sync + 'static,
41    G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
42{
43    let workers = (0..config.size)
44        .map(|i| {
45            let name = format!("{}-{}", config.name_prefix, i);
46
47            // Compute the expected child path so we can derive a sound
48            // namespace policy. Falls back to permissive /user + /system if
49            // the name is not a valid path segment.
50            let child_path = parent_path
51                .child(&name)
52                .unwrap_or_else(|_| ActorPath::parse("/user").expect("fallback path is valid"));
53
54            let ns = NamespacePolicy::default_for(&child_path).unwrap_or_else(|_| {
55                NamespacePolicy::new(vec![
56                    ActorPath::parse("/user").expect("valid"),
57                    ActorPath::parse("/system").expect("valid"),
58                ])
59            });
60
61            let (stable, refresher) = make_stable_addr::<M, R>(
62                source_addr,
63                transport.clone(),
64                transport_registry.clone(),
65                responses.clone(),
66                config.ask_timeout,
67                reactor.clone(),
68            );
69
70            let g = factory.clone();
71            let spec = ChildSpec::new(
72                name,
73                config.restart,
74                config.shutdown.clone(),
75                ns,
76                move || g(),
77            )
78            .with_mailbox_capacity(config.mailbox_capacity)
79            .with_stable_addr(refresher);
80
81            spawn_fn(spec);
82            stable
83        })
84        .collect();
85
86    WorkerPool::new(workers)
87}