Skip to main content

palladium_runtime/multi_core/handle.rs

1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use tokio::sync::oneshot;
6
7use palladium_actor::{ActorPath, AddrHash};
8use palladium_transport::{InProcessTransport, MailboxReceiver, TransportRegistry};
9
10use crate::engine::EngineHandle;
11use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
12use crate::placement::{CoreStats, PlacementMap};
13use crate::reactor::{Reactor, TokioReactor};
14use crate::registry::ActorRegistry;
15use crate::responses::ResponseRegistry;
16
17use super::errors::ShutdownError;
18
19use crate::fs::{FileSystem, TokioFileSystem};
20
21pub(crate) struct CoreHandle {
22    #[allow(dead_code)]
23    pub(crate) core_id: usize,
24    pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
25    pub(crate) thread: Option<std::thread::JoinHandle<()>>,
26    #[allow(dead_code)]
27    pub(crate) stats: Arc<CoreStats>,
28}
29
30/// Handle for a running [`MultiCoreEngine`].
31///
32/// Provides access to the shared transport, registry, and placement map, plus
33/// graceful shutdown.
34pub struct MultiCoreHandle<F: FileSystem = TokioFileSystem, R: Reactor = TokioReactor> {
35    pub(crate) handles: Vec<CoreHandle>,
36    pub(crate) transport: Arc<InProcessTransport>,
37    pub(crate) transport_registry: Arc<TransportRegistry>,
38    pub(crate) registry: Arc<ActorRegistry<R>>,
39    pub(crate) responses: Arc<ResponseRegistry>,
40    pub(crate) placement: Arc<PlacementMap>,
41    pub(crate) start_time: Instant,
42    pub(crate) ask_timeout: Duration,
43    pub(crate) num_cores: usize,
44    pub(crate) per_core_stats: Vec<Arc<CoreStats>>,
45    /// Fires when the control plane receives an `engine.stop` RPC.
46    /// `None` when no control plane socket was configured.
47    pub(crate) internal_shutdown_rx: Option<oneshot::Receiver<()>>,
48    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
49    pub(crate) fs: F,
50    pub(crate) reactor: R,
51}
52
53impl<F: FileSystem, R: Reactor> MultiCoreHandle<F, R> {
54    /// Return an [`EngineHandle`] backed by this engine's shared transport.
55    ///
56    /// Useful in tests to build `Addr<M>` objects and interact with actors.
57    pub fn engine_handle(&self) -> EngineHandle<R, palladium_transport::network::TokioNetwork, F> {
58        EngineHandle {
59            transport: self.transport.clone(),
60            transport_registry: self.transport_registry.clone(),
61            type_registry: self.transport_registry.type_registry(),
62            responses: self.responses.clone(),
63            registry: self.registry.clone(),
64            ask_timeout: self.ask_timeout,
65            reactor: self.reactor.clone(),
66            network: palladium_transport::network::TokioNetwork,
67            _fs: self.fs.clone(),
68            start_time: self.start_time,
69            source_addr: AddrHash::synthetic(b"engine-handle"),
70            federated_routing: self.federated_routing.clone(),
71            send_cache: Arc::new(dashmap::DashMap::new()),
72            // MultiCoreHandle does not register a pump mailbox; remote_addr_for
73            // ask() will time out without a pump task running.
74            pump_rx: Arc::new(std::sync::Mutex::new(None::<MailboxReceiver>)),
75        }
76    }
77
78    /// Access the shared transport directly.
79    pub fn transport(&self) -> &Arc<InProcessTransport> {
80        &self.transport
81    }
82
83    /// Access the placement map.
84    pub fn placement(&self) -> &Arc<PlacementMap> {
85        &self.placement
86    }
87
88    /// List actors, optionally filtered.
89    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
90        self.registry.snapshot(&query, 0)
91    }
92
93    /// Get info for a single actor by path.
94    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
95        let core_id = self
96            .placement
97            .get(&AddrHash::new(path, 0).path_hash()) // Generation 0 lookups are fine for discovery
98            .map(|v| *v)
99            .unwrap_or(0);
100        self.registry.actor_info_by_path(path, core_id)
101    }
102
103    /// System-level snapshot aggregated across all cores.
104    pub fn snapshot(&self) -> EngineSnapshot {
105        let actors = self.actor_list(ActorQuery::default());
106        EngineSnapshot {
107            num_cores: self.num_cores,
108            uptime_secs: self.start_time.elapsed().as_secs(),
109            actors,
110        }
111    }
112
113    /// Per-core statistics counters.
114    pub fn core_stats(&self) -> &[Arc<CoreStats>] {
115        &self.per_core_stats
116    }
117
118    /// Block until actor at `path` is running, or `timeout` elapses.
119    ///
120    /// Returns `true` if the actor became running within `timeout`.
121    pub fn wait_for_actor(&self, path: &ActorPath, timeout: Duration) -> bool {
122        let deadline = Instant::now() + timeout;
123        while Instant::now() < deadline {
124            if self
125                .registry
126                .get_by_path(path)
127                .is_some_and(|s| s.running.load(Ordering::Relaxed))
128            {
129                return true;
130            }
131            std::thread::sleep(Duration::from_millis(10));
132        }
133        false
134    }
135
136    /// Block until the engine shuts down, then gracefully stop all cores.
137    ///
138    /// Waits for the internal shutdown signal (fired by the control plane's
139    /// `engine.stop` RPC) and then calls [`shutdown`](Self::shutdown).
140    ///
141    /// If no control plane socket was configured, this method returns
142    /// immediately (the caller is responsible for calling `shutdown()`).
143    pub fn run(self) -> Result<(), ShutdownError> {
144        let reactor = self.reactor.clone();
145        let Self {
146            handles,
147            transport,
148            transport_registry,
149            registry,
150            responses,
151            placement,
152            start_time,
153            ask_timeout,
154            num_cores,
155            per_core_stats,
156            internal_shutdown_rx,
157            federated_routing,
158            fs,
159            reactor: _,
160        } = self;
161
162        if let Some(rx) = internal_shutdown_rx {
163            // Block the calling thread until the control plane fires engine.stop.
164            let rt = tokio::runtime::Builder::new_current_thread()
165                .enable_all()
166                .build()
167                .expect("failed to build shutdown-wait runtime");
168            let _ = rt.block_on(rx);
169        }
170
171        MultiCoreHandle {
172            handles,
173            transport,
174            transport_registry,
175            registry,
176            responses,
177            placement,
178            start_time,
179            ask_timeout,
180            num_cores,
181            per_core_stats,
182            internal_shutdown_rx: None,
183            federated_routing,
184            fs,
185            reactor,
186        }
187        .shutdown()
188    }
189
190    /// Gracefully shut down all cores.
191    ///
192    /// Sends shutdown signals to every core, waits for threads to join, then
193    /// performs brutal cancellation of any remaining actor tasks.
194    pub fn shutdown(mut self) -> Result<(), ShutdownError> {
195        for h in &mut self.handles {
196            if let Some(tx) = h.shutdown_tx.take() {
197                let _ = tx.send(());
198            }
199        }
200        let mut panicked = false;
201        for h in &mut self.handles {
202            if let Some(thread) = h.thread.take() {
203                if thread.join().is_err() {
204                    panicked = true;
205                }
206            }
207        }
208        // Brutal escalation: cancel any tasks that didn't drain in time.
209        self.registry.cancel_all();
210        self.responses.drain();
211
212        if panicked {
213            Err(ShutdownError::ThreadPanic)
214        } else {
215            Ok(())
216        }
217    }
218}