1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use tokio::sync::oneshot;
5
6use palladium_actor::{
7 ActorPath, AddrHash, ChildSpec, NamespacePolicy, RestartPolicy, ShutdownPolicy,
8};
9use palladium_transport::{InProcessTransport, TransportRegistry};
10
11use crate::control_plane::ControlPlaneActor;
12use crate::engine::EngineConfig;
13use crate::federation::FederatedRouting;
14use crate::fs::{FileSystem, TokioFileSystem};
15use crate::placement::{CoreStats, Placement, PlacementMap};
16use crate::registry::ActorRegistry;
17use crate::responses::ResponseRegistry;
18use crate::ring_buffer::{CacheLineAtomicBool, InterCoreQueue};
19
20use super::core_loop::core_run_async;
21use super::errors::EngineError;
22use super::handle::{CoreHandle, MultiCoreHandle};
23use super::helpers::{default_num_cores, pin_thread_to_core, resolve_placement};
24
25pub struct MultiCoreEngine<F: FileSystem = TokioFileSystem> {
45 config: EngineConfig<crate::reactor::TokioReactor>,
46 num_cores: usize,
47 user_specs_per_core: Vec<Vec<ChildSpec<crate::reactor::TokioReactor>>>,
49 placement: Arc<PlacementMap>,
50 rings: Arc<Vec<Vec<InterCoreQueue>>>,
51 fs: F,
52}
53
54impl MultiCoreEngine<TokioFileSystem> {
55 pub fn new(num_cores: usize) -> Self {
57 Self::with_config(EngineConfig {
58 num_cores: Some(num_cores),
59 ..Default::default()
60 })
61 }
62
63 pub fn with_config(config: EngineConfig<crate::reactor::TokioReactor>) -> Self {
68 Self::with_fs(config, TokioFileSystem)
69 }
70}
71
72impl<F: FileSystem> MultiCoreEngine<F> {
73 pub fn with_fs(config: EngineConfig<crate::reactor::TokioReactor>, fs: F) -> Self {
75 let num_cores = config.num_cores.unwrap_or_else(default_num_cores).max(1);
76 let capacity = config.inter_core_queue_capacity;
77 let rings = (0..num_cores)
78 .map(|_| {
79 (0..num_cores)
80 .map(|_| InterCoreQueue::new(capacity))
81 .collect()
82 })
83 .collect();
84 Self {
85 placement: Arc::new(PlacementMap::new()),
86 config,
87 num_cores,
88 user_specs_per_core: vec![Vec::new(); num_cores],
89 rings: Arc::new(rings),
90 fs,
91 }
92 }
93
94 pub fn add_user_actor(
99 &mut self,
100 spec: ChildSpec<crate::reactor::TokioReactor>,
101 placement: Placement,
102 ) {
103 let core_id = resolve_placement(&placement, &self.placement, self.num_cores, &spec.name);
104 let parent =
106 ActorPath::parse(&format!("/user/core{core_id}")).expect("valid core supervisor path");
107 if let Ok(child_path) = parent.child(&spec.name) {
108 let addr = AddrHash::new(&child_path, 0);
109 self.placement.insert(addr.path_hash(), core_id);
110 }
111 self.user_specs_per_core[core_id].push(spec);
112 }
113
114 pub fn start(self) -> Result<MultiCoreHandle<F>, EngineError> {
124 let registry = Arc::new(ActorRegistry::<crate::reactor::TokioReactor>::new());
125 let responses = Arc::new(ResponseRegistry::new(self.config.response_capacity));
126 let placement = self.placement;
127 let rings = self.rings;
128 let start_time = Instant::now();
129 let num_cores = self.num_cores;
130 let mut config = self.config;
131 let fs = self.fs;
132
133 let global_locals = Arc::new(dashmap::DashMap::new());
135 let death_listeners = Arc::new(parking_lot::Mutex::new(std::collections::BTreeMap::new()));
136
137 let has_work: Arc<Vec<CacheLineAtomicBool>> = Arc::new(
142 (0..num_cores)
143 .map(|_| CacheLineAtomicBool::new(false))
144 .collect(),
145 );
146 let core_idle: Arc<Vec<CacheLineAtomicBool>> = Arc::new(
147 (0..num_cores)
148 .map(|_| CacheLineAtomicBool::new(true))
149 .collect(),
150 );
151
152 let federated_routing = maybe_init_federation(&mut config);
153 ensure_consensus_config(&mut config);
154
155 let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::oneshot::channel::<()>();
157 let internal_shutdown_tx = Arc::new(Mutex::new(Some(internal_shutdown_tx)));
158 let (tcp_addr, quic_addr, has_control_plane) = resolve_control_plane_addrs(&config);
159
160 let mut user_specs_per_core = self.user_specs_per_core;
161
162 let core0_local_t = Arc::new(InProcessTransport::with_shared_state(
165 global_locals.clone(),
166 death_listeners.clone(),
167 ));
168 let core0_tr = Arc::new(TransportRegistry::new_with_local(core0_local_t.local_map()));
169 let core0_inter_core_t = Arc::new(super::transport::InterCoreTransport {
170 my_core: 0,
171 rings: rings.clone(),
172 placement: placement.clone(),
173 local: core0_local_t.clone(),
174 has_work: has_work.clone(),
175 core_idle: core_idle.clone(),
176 });
177 core0_tr.add_transport(core0_inter_core_t).ok();
178
179 if has_control_plane {
180 inject_control_plane_actor(
181 &mut user_specs_per_core,
182 &config,
183 ®istry,
184 &responses,
185 &internal_shutdown_tx,
186 start_time,
187 tcp_addr,
188 quic_addr,
189 federated_routing.as_ref().map(|r| r.policy()),
190 self.num_cores,
191 crate::reactor::TokioReactor,
192 palladium_transport::network::TokioNetwork,
193 fs.clone(),
194 );
195 }
196
197 let mut handles = Vec::with_capacity(num_cores);
198 let mut per_core_stats = Vec::with_capacity(num_cores);
199
200 for (core_id, specs) in user_specs_per_core.into_iter().enumerate() {
201 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
202
203 let (local_t, tr) = if core_id == 0 {
204 (core0_local_t.clone(), core0_tr.clone())
205 } else {
206 let local_t = Arc::new(InProcessTransport::with_shared_state(
207 global_locals.clone(),
208 death_listeners.clone(),
209 ));
210 let tr = Arc::new(TransportRegistry::new_with_local(local_t.local_map()));
211 let inter_core_t = Arc::new(super::transport::InterCoreTransport {
212 my_core: core_id,
213 rings: rings.clone(),
214 placement: placement.clone(),
215 local: local_t.clone(),
216 has_work: has_work.clone(),
217 core_idle: core_idle.clone(),
218 });
219 tr.add_transport(inter_core_t).ok();
220 (local_t, tr)
221 };
222
223 let reg = registry.clone();
224 let res = responses.clone();
225 let cfg = config.clone();
226 let f = fs.clone();
227 let stats = Arc::new(CoreStats::new());
228 let stats_capture = stats.clone();
229 let rings_capture = rings.clone();
230 let has_work_capture = has_work.clone();
231 let core_idle_capture = core_idle.clone();
232
233 let thread = std::thread::Builder::new()
234 .name(format!("pd-core-{core_id}"))
235 .spawn(move || {
236 pin_thread_to_core(core_id);
237 let rt = tokio::runtime::Builder::new_current_thread()
238 .enable_all()
239 .build()
240 .expect("failed to build Tokio runtime");
241 let local = tokio::task::LocalSet::new();
242 local.block_on(
243 &rt,
244 core_run_async(
245 core_id,
246 num_cores,
247 local_t,
248 tr,
249 res,
250 reg,
251 rings_capture,
252 has_work_capture,
253 core_idle_capture,
254 shutdown_rx,
255 cfg,
256 specs,
257 f,
258 stats_capture,
259 ),
260 );
261 })
262 .map_err(EngineError::ThreadSpawnFailed)?;
263
264 per_core_stats.push(stats.clone());
265 handles.push(CoreHandle {
266 core_id,
267 shutdown_tx: Some(shutdown_tx),
268 thread: Some(thread),
269 stats,
270 });
271 }
272
273 Ok(MultiCoreHandle {
274 handles,
275 transport: core0_local_t,
276 transport_registry: core0_tr,
277 registry,
278 responses,
279 placement,
280 start_time,
281 ask_timeout: config.ask_timeout,
282 num_cores,
283 per_core_stats,
284 internal_shutdown_rx: if has_control_plane {
285 Some(internal_shutdown_rx)
286 } else {
287 None
288 },
289 federated_routing,
290 fs,
291 reactor: crate::reactor::TokioReactor,
292 })
293 }
294}
295
296fn maybe_init_federation(
297 config: &mut EngineConfig<crate::reactor::TokioReactor>,
298) -> Option<Arc<FederatedRouting>> {
299 if config.federation_enabled && config.cluster_membership.is_none() {
300 if let Some(bind) = &config.federation_bind_addr {
301 if let Ok(addr) = bind.parse::<std::net::SocketAddr>() {
302 let mut membership_cfg = config.federation_membership.clone();
303 if !config.federation_seed_nodes.is_empty() {
304 let mut seeds = Vec::new();
305 for seed in &config.federation_seed_nodes {
306 if let Ok(seed_addr) = seed.parse() {
307 seeds.push(seed_addr);
308 }
309 }
310 membership_cfg.seed_nodes = seeds;
311 }
312 let membership = palladium_federation::ClusterMembership::new(
313 config.engine_id.clone(),
314 addr,
315 membership_cfg,
316 );
317 let membership = Arc::new(parking_lot::RwLock::new(membership));
318 let registry_arc = Arc::new(parking_lot::RwLock::new(
319 palladium_federation::FederatedRegistry::default(),
320 ));
321 config.cluster_membership = Some(Arc::clone(&membership));
322 if config.federated_registry.is_none() {
323 config.federated_registry = Some(Arc::clone(®istry_arc));
324 }
325 return Some(Arc::new(FederatedRouting::new(
326 registry_arc,
327 config.federation_policy.clone(),
328 )));
329 }
330 eprintln!("invalid federation_bind_addr: {bind}");
331 } else {
332 eprintln!("federation_enabled=true but federation_bind_addr is not set");
333 }
334 }
335 config.federated_registry.clone().map(|registry_arc| {
336 Arc::new(FederatedRouting::new(
337 registry_arc,
338 config.federation_policy.clone(),
339 ))
340 })
341}
342
343fn ensure_consensus_config(config: &mut EngineConfig<crate::reactor::TokioReactor>) {
344 if config.consensus_engine.is_some() {
345 if config.consensus_groups.is_none() {
346 config.consensus_groups = Some(Arc::new(parking_lot::RwLock::new(
347 std::collections::HashMap::new(),
348 )));
349 }
350 if config.consensus_group_meta.is_none() {
351 config.consensus_group_meta = Some(Arc::new(parking_lot::RwLock::new(
352 std::collections::HashMap::new(),
353 )));
354 }
355 }
356}
357
358fn resolve_control_plane_addrs<R: crate::reactor::Reactor>(
359 config: &EngineConfig<R>,
360) -> (
361 Option<std::net::SocketAddr>,
362 Option<std::net::SocketAddr>,
363 bool,
364) {
365 let tcp_addr = config
366 .control_plane_tcp_addr
367 .as_deref()
368 .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
369 let quic_addr = config
370 .control_plane_quic_addr
371 .as_deref()
372 .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
373 let has_control_plane =
374 config.control_plane_socket.is_some() || tcp_addr.is_some() || quic_addr.is_some();
375 (tcp_addr, quic_addr, has_control_plane)
376}
377
378#[allow(clippy::too_many_arguments)]
379fn inject_control_plane_actor<
380 R: crate::reactor::Reactor,
381 N: palladium_transport::network::Network,
382 F: crate::fs::FileSystem,
383>(
384 user_specs_per_core: &mut [Vec<ChildSpec<R>>],
385 config: &EngineConfig<R>,
386 registry: &Arc<ActorRegistry<R>>,
387 responses: &Arc<ResponseRegistry>,
388 internal_shutdown_tx: &Arc<Mutex<Option<oneshot::Sender<()>>>>,
389 start_time: Instant,
390 tcp_addr: Option<std::net::SocketAddr>,
391 quic_addr: Option<std::net::SocketAddr>,
392 federation_policy: Option<Arc<parking_lot::RwLock<palladium_federation::FederationPolicy>>>,
393 num_cores: usize,
394 reactor: R,
395 network: N,
396 fs: F,
397) {
398 let cp_path = ActorPath::parse("/user/core0/control-plane").expect("valid control-plane path");
399 let ns = NamespacePolicy::default_for(&cp_path).expect("valid namespace");
400 let reg = Arc::clone(registry);
401 let responses = Arc::clone(responses);
402 let tx = Arc::clone(internal_shutdown_tx);
403 let prpc = config.plugin_rpc.clone();
404 let actor_spawn = config.actor_spawn.clone();
405 let cluster_membership = config.cluster_membership.clone();
406 let federated_registry = config.federated_registry.clone();
407 let engine_id = config.engine_id.clone();
408 let consensus_engine = config.consensus_engine.clone();
409 let consensus_groups = config.consensus_groups.clone();
410 let consensus_group_meta = config.consensus_group_meta.clone();
411 let consensus_engine_info = config.consensus_engine_info.clone();
412 let socket_path = config.control_plane_socket.clone();
413 let tls = config.control_plane_tls.clone();
414 let r = reactor.clone();
415 let n = network.clone();
416 let f = fs.clone();
417
418 user_specs_per_core[0].push(ChildSpec::new(
419 "control-plane",
420 RestartPolicy::Permanent,
421 ShutdownPolicy::Timeout(Duration::from_secs(5)),
422 ns,
423 move || {
424 let tx_final = tx.lock().unwrap().take();
425 Box::new(ControlPlaneActor::new(
426 socket_path.clone(),
427 tcp_addr,
428 quic_addr,
429 tls.clone(),
430 Arc::clone(®),
431 Arc::clone(&responses),
432 start_time,
433 tx_final,
434 num_cores,
435 prpc.clone(),
436 actor_spawn.clone(),
437 cluster_membership.clone(),
438 federated_registry.clone(),
439 federation_policy.clone(),
440 engine_id.clone(),
441 consensus_engine.clone(),
442 consensus_groups.clone(),
443 consensus_group_meta.clone(),
444 consensus_engine_info.clone(),
445 r.clone(),
446 n.clone(),
447 f.clone(),
448 ))
449 },
450 ));
451}