Skip to main content

palladium_runtime/multi_core/
engine.rs

1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use tokio::sync::oneshot;
5
6use palladium_actor::{
7    ActorPath, AddrHash, ChildSpec, NamespacePolicy, RestartPolicy, ShutdownPolicy,
8};
9use palladium_transport::{InProcessTransport, TransportRegistry};
10
11use crate::control_plane::ControlPlaneActor;
12use crate::engine::EngineConfig;
13use crate::federation::FederatedRouting;
14use crate::fs::{FileSystem, TokioFileSystem};
15use crate::placement::{CoreStats, Placement, PlacementMap};
16use crate::registry::ActorRegistry;
17use crate::responses::ResponseRegistry;
18use crate::ring_buffer::{CacheLineAtomicBool, InterCoreQueue};
19
20use super::core_loop::core_run_async;
21use super::errors::EngineError;
22use super::handle::{CoreHandle, MultiCoreHandle};
23use super::helpers::{default_num_cores, pin_thread_to_core, resolve_placement};
24
25/// Builder for a multi-core actor engine.
26///
27/// Each core runs an independent `current_thread` Tokio runtime (optionally
28/// pinned to a physical CPU). All cores share a single [`InProcessTransport`]
29/// so cross-core sends work transparently: a message from core 0 to an actor
30/// on core 1 is pushed directly into that actor's Tokio mpsc mailbox, which
31/// the receiving core's task wakes up to process.
32///
33/// # Example
34/// ```rust,no_run
35/// use palladium_runtime::{MultiCoreEngine, EngineConfig};
36/// use palladium_actor::ChildSpec;
37/// use palladium_runtime::Placement;
38///
39/// let mut engine = MultiCoreEngine::new(2);
40/// // engine.add_user_actor(spec, Placement::Core(0));
41/// let handle = engine.start().unwrap();
42/// handle.shutdown().unwrap();
43/// ```
44pub struct MultiCoreEngine<F: FileSystem = TokioFileSystem> {
45    config: EngineConfig<crate::reactor::TokioReactor>,
46    num_cores: usize,
47    /// Actor specs indexed by core id.
48    user_specs_per_core: Vec<Vec<ChildSpec<crate::reactor::TokioReactor>>>,
49    placement: Arc<PlacementMap>,
50    rings: Arc<Vec<Vec<InterCoreQueue>>>,
51    fs: F,
52}
53
54impl MultiCoreEngine<TokioFileSystem> {
55    /// Create a new engine with `num_cores` cores and default config.
56    pub fn new(num_cores: usize) -> Self {
57        Self::with_config(EngineConfig {
58            num_cores: Some(num_cores),
59            ..Default::default()
60        })
61    }
62
63    /// Create a new engine from an [`EngineConfig`].
64    ///
65    /// `num_cores` is taken from `config.num_cores`, falling back to
66    /// `std::thread::available_parallelism()`.
67    pub fn with_config(config: EngineConfig<crate::reactor::TokioReactor>) -> Self {
68        Self::with_fs(config, TokioFileSystem)
69    }
70}
71
72impl<F: FileSystem> MultiCoreEngine<F> {
73    /// Create a new engine with a specific filesystem.
74    pub fn with_fs(config: EngineConfig<crate::reactor::TokioReactor>, fs: F) -> Self {
75        let num_cores = config.num_cores.unwrap_or_else(default_num_cores).max(1);
76        let capacity = config.inter_core_queue_capacity;
77        let rings = (0..num_cores)
78            .map(|_| {
79                (0..num_cores)
80                    .map(|_| InterCoreQueue::new(capacity))
81                    .collect()
82            })
83            .collect();
84        Self {
85            placement: Arc::new(PlacementMap::new()),
86            config,
87            num_cores,
88            user_specs_per_core: vec![Vec::new(); num_cores],
89            rings: Arc::new(rings),
90            fs,
91        }
92    }
93
94    /// Register an actor to be spawned at startup on a specific core.
95    ///
96    /// Pre-populates the placement map so cross-core routing is immediately
97    /// available once the engine starts.
98    pub fn add_user_actor(
99        &mut self,
100        spec: ChildSpec<crate::reactor::TokioReactor>,
101        placement: Placement,
102    ) {
103        let core_id = resolve_placement(&placement, &self.placement, self.num_cores, &spec.name);
104        // Pre-compute the actor's path so the placement map is ready before start.
105        let parent =
106            ActorPath::parse(&format!("/user/core{core_id}")).expect("valid core supervisor path");
107        if let Ok(child_path) = parent.child(&spec.name) {
108            let addr = AddrHash::new(&child_path, 0);
109            self.placement.insert(addr.path_hash(), core_id);
110        }
111        self.user_specs_per_core[core_id].push(spec);
112    }
113
114    /// Start the engine: spawn N threads, each running a Tokio runtime.
115    ///
116    /// Returns a [`MultiCoreHandle`] immediately — actors are started
117    /// asynchronously inside the spawned threads. Use
118    /// [`MultiCoreHandle::wait_for_actor`] to synchronise before sending.
119    ///
120    /// If [`EngineConfig::control_plane_socket`] is set, a
121    /// [`ControlPlaneActor`] is added to core 0's user supervisor so the
122    /// engine can be managed via `pd status`, `pd actor`, and `pd stop`.
123    pub fn start(self) -> Result<MultiCoreHandle<F>, EngineError> {
124        let registry = Arc::new(ActorRegistry::<crate::reactor::TokioReactor>::new());
125        let responses = Arc::new(ResponseRegistry::new(self.config.response_capacity));
126        let placement = self.placement;
127        let rings = self.rings;
128        let start_time = Instant::now();
129        let num_cores = self.num_cores;
130        let mut config = self.config;
131        let fs = self.fs;
132
133        // Shared state for all InProcessTransport instances.
134        let global_locals = Arc::new(dashmap::DashMap::new());
135        let death_listeners = Arc::new(parking_lot::Mutex::new(std::collections::BTreeMap::new()));
136
137        // Per-core work-available flags (cache-line padded, one per core).
138        // Sender cores set has_work[dst] after a ring push; the drain task on
139        // core `dst` uses the flag to avoid unnecessary yield_now() / kernel
140        // wakeups when more inter-core traffic is already in-flight.
141        let has_work: Arc<Vec<CacheLineAtomicBool>> = Arc::new(
142            (0..num_cores)
143                .map(|_| CacheLineAtomicBool::new(false))
144                .collect(),
145        );
146        let core_idle: Arc<Vec<CacheLineAtomicBool>> = Arc::new(
147            (0..num_cores)
148                .map(|_| CacheLineAtomicBool::new(true))
149                .collect(),
150        );
151
152        let federated_routing = maybe_init_federation(&mut config);
153        ensure_consensus_config(&mut config);
154
155        // Internal shutdown channel
156        let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::oneshot::channel::<()>();
157        let internal_shutdown_tx = Arc::new(Mutex::new(Some(internal_shutdown_tx)));
158        let (tcp_addr, quic_addr, has_control_plane) = resolve_control_plane_addrs(&config);
159
160        let mut user_specs_per_core = self.user_specs_per_core;
161
162        // Pre-create Core 0's registry so the ControlPlaneActor can use it.
163        // The CP runs on Core 0 and needs to talk to other cores via rings.
164        let core0_local_t = Arc::new(InProcessTransport::with_shared_state(
165            global_locals.clone(),
166            death_listeners.clone(),
167        ));
168        let core0_tr = Arc::new(TransportRegistry::new_with_local(core0_local_t.local_map()));
169        let core0_inter_core_t = Arc::new(super::transport::InterCoreTransport {
170            my_core: 0,
171            rings: rings.clone(),
172            placement: placement.clone(),
173            local: core0_local_t.clone(),
174            has_work: has_work.clone(),
175            core_idle: core_idle.clone(),
176        });
177        core0_tr.add_transport(core0_inter_core_t).ok();
178
179        if has_control_plane {
180            inject_control_plane_actor(
181                &mut user_specs_per_core,
182                &config,
183                &registry,
184                &responses,
185                &internal_shutdown_tx,
186                start_time,
187                tcp_addr,
188                quic_addr,
189                federated_routing.as_ref().map(|r| r.policy()),
190                self.num_cores,
191                crate::reactor::TokioReactor,
192                palladium_transport::network::TokioNetwork,
193                fs.clone(),
194            );
195        }
196
197        let mut handles = Vec::with_capacity(num_cores);
198        let mut per_core_stats = Vec::with_capacity(num_cores);
199
200        for (core_id, specs) in user_specs_per_core.into_iter().enumerate() {
201            let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
202
203            let (local_t, tr) = if core_id == 0 {
204                (core0_local_t.clone(), core0_tr.clone())
205            } else {
206                let local_t = Arc::new(InProcessTransport::with_shared_state(
207                    global_locals.clone(),
208                    death_listeners.clone(),
209                ));
210                let tr = Arc::new(TransportRegistry::new_with_local(local_t.local_map()));
211                let inter_core_t = Arc::new(super::transport::InterCoreTransport {
212                    my_core: core_id,
213                    rings: rings.clone(),
214                    placement: placement.clone(),
215                    local: local_t.clone(),
216                    has_work: has_work.clone(),
217                    core_idle: core_idle.clone(),
218                });
219                tr.add_transport(inter_core_t).ok();
220                (local_t, tr)
221            };
222
223            let reg = registry.clone();
224            let res = responses.clone();
225            let cfg = config.clone();
226            let f = fs.clone();
227            let stats = Arc::new(CoreStats::new());
228            let stats_capture = stats.clone();
229            let rings_capture = rings.clone();
230            let has_work_capture = has_work.clone();
231            let core_idle_capture = core_idle.clone();
232
233            let thread = std::thread::Builder::new()
234                .name(format!("pd-core-{core_id}"))
235                .spawn(move || {
236                    pin_thread_to_core(core_id);
237                    let rt = tokio::runtime::Builder::new_current_thread()
238                        .enable_all()
239                        .build()
240                        .expect("failed to build Tokio runtime");
241                    let local = tokio::task::LocalSet::new();
242                    local.block_on(
243                        &rt,
244                        core_run_async(
245                            core_id,
246                            num_cores,
247                            local_t,
248                            tr,
249                            res,
250                            reg,
251                            rings_capture,
252                            has_work_capture,
253                            core_idle_capture,
254                            shutdown_rx,
255                            cfg,
256                            specs,
257                            f,
258                            stats_capture,
259                        ),
260                    );
261                })
262                .map_err(EngineError::ThreadSpawnFailed)?;
263
264            per_core_stats.push(stats.clone());
265            handles.push(CoreHandle {
266                core_id,
267                shutdown_tx: Some(shutdown_tx),
268                thread: Some(thread),
269                stats,
270            });
271        }
272
273        Ok(MultiCoreHandle {
274            handles,
275            transport: core0_local_t,
276            transport_registry: core0_tr,
277            registry,
278            responses,
279            placement,
280            start_time,
281            ask_timeout: config.ask_timeout,
282            num_cores,
283            per_core_stats,
284            internal_shutdown_rx: if has_control_plane {
285                Some(internal_shutdown_rx)
286            } else {
287                None
288            },
289            federated_routing,
290            fs,
291            reactor: crate::reactor::TokioReactor,
292        })
293    }
294}
295
296fn maybe_init_federation(
297    config: &mut EngineConfig<crate::reactor::TokioReactor>,
298) -> Option<Arc<FederatedRouting>> {
299    if config.federation_enabled && config.cluster_membership.is_none() {
300        if let Some(bind) = &config.federation_bind_addr {
301            if let Ok(addr) = bind.parse::<std::net::SocketAddr>() {
302                let mut membership_cfg = config.federation_membership.clone();
303                if !config.federation_seed_nodes.is_empty() {
304                    let mut seeds = Vec::new();
305                    for seed in &config.federation_seed_nodes {
306                        if let Ok(seed_addr) = seed.parse() {
307                            seeds.push(seed_addr);
308                        }
309                    }
310                    membership_cfg.seed_nodes = seeds;
311                }
312                let membership = palladium_federation::ClusterMembership::new(
313                    config.engine_id.clone(),
314                    addr,
315                    membership_cfg,
316                );
317                let membership = Arc::new(parking_lot::RwLock::new(membership));
318                let registry_arc = Arc::new(parking_lot::RwLock::new(
319                    palladium_federation::FederatedRegistry::default(),
320                ));
321                config.cluster_membership = Some(Arc::clone(&membership));
322                if config.federated_registry.is_none() {
323                    config.federated_registry = Some(Arc::clone(&registry_arc));
324                }
325                return Some(Arc::new(FederatedRouting::new(
326                    registry_arc,
327                    config.federation_policy.clone(),
328                )));
329            }
330            eprintln!("invalid federation_bind_addr: {bind}");
331        } else {
332            eprintln!("federation_enabled=true but federation_bind_addr is not set");
333        }
334    }
335    config.federated_registry.clone().map(|registry_arc| {
336        Arc::new(FederatedRouting::new(
337            registry_arc,
338            config.federation_policy.clone(),
339        ))
340    })
341}
342
343fn ensure_consensus_config(config: &mut EngineConfig<crate::reactor::TokioReactor>) {
344    if config.consensus_engine.is_some() {
345        if config.consensus_groups.is_none() {
346            config.consensus_groups = Some(Arc::new(parking_lot::RwLock::new(
347                std::collections::HashMap::new(),
348            )));
349        }
350        if config.consensus_group_meta.is_none() {
351            config.consensus_group_meta = Some(Arc::new(parking_lot::RwLock::new(
352                std::collections::HashMap::new(),
353            )));
354        }
355    }
356}
357
358fn resolve_control_plane_addrs<R: crate::reactor::Reactor>(
359    config: &EngineConfig<R>,
360) -> (
361    Option<std::net::SocketAddr>,
362    Option<std::net::SocketAddr>,
363    bool,
364) {
365    let tcp_addr = config
366        .control_plane_tcp_addr
367        .as_deref()
368        .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
369    let quic_addr = config
370        .control_plane_quic_addr
371        .as_deref()
372        .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
373    let has_control_plane =
374        config.control_plane_socket.is_some() || tcp_addr.is_some() || quic_addr.is_some();
375    (tcp_addr, quic_addr, has_control_plane)
376}
377
378#[allow(clippy::too_many_arguments)]
379fn inject_control_plane_actor<
380    R: crate::reactor::Reactor,
381    N: palladium_transport::network::Network,
382    F: crate::fs::FileSystem,
383>(
384    user_specs_per_core: &mut [Vec<ChildSpec<R>>],
385    config: &EngineConfig<R>,
386    registry: &Arc<ActorRegistry<R>>,
387    responses: &Arc<ResponseRegistry>,
388    internal_shutdown_tx: &Arc<Mutex<Option<oneshot::Sender<()>>>>,
389    start_time: Instant,
390    tcp_addr: Option<std::net::SocketAddr>,
391    quic_addr: Option<std::net::SocketAddr>,
392    federation_policy: Option<Arc<parking_lot::RwLock<palladium_federation::FederationPolicy>>>,
393    num_cores: usize,
394    reactor: R,
395    network: N,
396    fs: F,
397) {
398    let cp_path = ActorPath::parse("/user/core0/control-plane").expect("valid control-plane path");
399    let ns = NamespacePolicy::default_for(&cp_path).expect("valid namespace");
400    let reg = Arc::clone(registry);
401    let responses = Arc::clone(responses);
402    let tx = Arc::clone(internal_shutdown_tx);
403    let prpc = config.plugin_rpc.clone();
404    let actor_spawn = config.actor_spawn.clone();
405    let cluster_membership = config.cluster_membership.clone();
406    let federated_registry = config.federated_registry.clone();
407    let engine_id = config.engine_id.clone();
408    let consensus_engine = config.consensus_engine.clone();
409    let consensus_groups = config.consensus_groups.clone();
410    let consensus_group_meta = config.consensus_group_meta.clone();
411    let consensus_engine_info = config.consensus_engine_info.clone();
412    let socket_path = config.control_plane_socket.clone();
413    let tls = config.control_plane_tls.clone();
414    let r = reactor.clone();
415    let n = network.clone();
416    let f = fs.clone();
417
418    user_specs_per_core[0].push(ChildSpec::new(
419        "control-plane",
420        RestartPolicy::Permanent,
421        ShutdownPolicy::Timeout(Duration::from_secs(5)),
422        ns,
423        move || {
424            let tx_final = tx.lock().unwrap().take();
425            Box::new(ControlPlaneActor::new(
426                socket_path.clone(),
427                tcp_addr,
428                quic_addr,
429                tls.clone(),
430                Arc::clone(&reg),
431                Arc::clone(&responses),
432                start_time,
433                tx_final,
434                num_cores,
435                prpc.clone(),
436                actor_spawn.clone(),
437                cluster_membership.clone(),
438                federated_registry.clone(),
439                federation_policy.clone(),
440                engine_id.clone(),
441                consensus_engine.clone(),
442                consensus_groups.clone(),
443                consensus_group_meta.clone(),
444                consensus_engine_info.clone(),
445                r.clone(),
446                n.clone(),
447                f.clone(),
448            ))
449        },
450    ));
451}