Skip to main content

palladium_runtime/multi_core/handle.rs

1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use tokio::sync::oneshot;
6
7use palladium_actor::{ActorPath, AddrHash};
8use palladium_transport::{InProcessTransport, TransportRegistry};
9
10use crate::engine::EngineHandle;
11use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
12use crate::placement::{CoreStats, PlacementMap};
13use crate::reactor::{Reactor, TokioReactor};
14use crate::registry::ActorRegistry;
15use crate::responses::ResponseRegistry;
16
17use super::errors::ShutdownError;
18
19use crate::fs::{FileSystem, TokioFileSystem};
20
21pub(crate) struct CoreHandle {
22    #[allow(dead_code)]
23    pub(crate) core_id: usize,
24    pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
25    pub(crate) thread: Option<std::thread::JoinHandle<()>>,
26    #[allow(dead_code)]
27    pub(crate) stats: Arc<CoreStats>,
28}
29
30/// Handle for a running [`MultiCoreEngine`].
31///
32/// Provides access to the shared transport, registry, and placement map, plus
33/// graceful shutdown.
34pub struct MultiCoreHandle<F: FileSystem = TokioFileSystem, R: Reactor = TokioReactor> {
35    pub(crate) handles: Vec<CoreHandle>,
36    pub(crate) transport: Arc<InProcessTransport>,
37    pub(crate) transport_registry: Arc<TransportRegistry>,
38    pub(crate) registry: Arc<ActorRegistry<R>>,
39    pub(crate) responses: Arc<ResponseRegistry>,
40    pub(crate) placement: Arc<PlacementMap>,
41    pub(crate) start_time: Instant,
42    pub(crate) ask_timeout: Duration,
43    pub(crate) num_cores: usize,
44    pub(crate) per_core_stats: Vec<Arc<CoreStats>>,
45    /// Fires when the control plane receives an `engine.stop` RPC.
46    /// `None` when no control plane socket was configured.
47    pub(crate) internal_shutdown_rx: Option<oneshot::Receiver<()>>,
48    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
49    pub(crate) fs: F,
50    pub(crate) reactor: R,
51}
52
53impl<F: FileSystem, R: Reactor> MultiCoreHandle<F, R> {
54    /// Return an [`EngineHandle`] backed by this engine's shared transport.
55    ///
56    /// Useful in tests to build `Addr<M>` objects and interact with actors.
57    pub fn engine_handle(&self) -> EngineHandle<R, palladium_transport::network::TokioNetwork, F> {
58        EngineHandle {
59            transport: self.transport.clone(),
60            transport_registry: self.transport_registry.clone(),
61            type_registry: self.transport_registry.type_registry(),
62            responses: self.responses.clone(),
63            registry: self.registry.clone(),
64            ask_timeout: self.ask_timeout,
65            reactor: self.reactor.clone(),
66            network: palladium_transport::network::TokioNetwork,
67            _fs: self.fs.clone(),
68            start_time: self.start_time,
69            source_addr: AddrHash::synthetic(b"engine-handle"),
70            federated_routing: self.federated_routing.clone(),
71            send_cache: Arc::new(dashmap::DashMap::new()),
72        }
73    }
74
75    /// Access the shared transport directly.
76    pub fn transport(&self) -> &Arc<InProcessTransport> {
77        &self.transport
78    }
79
80    /// Access the placement map.
81    pub fn placement(&self) -> &Arc<PlacementMap> {
82        &self.placement
83    }
84
85    /// List actors, optionally filtered.
86    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
87        self.registry.snapshot(&query, 0)
88    }
89
90    /// Get info for a single actor by path.
91    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
92        let core_id = self
93            .placement
94            .get(&AddrHash::new(path, 0).path_hash()) // Generation 0 lookups are fine for discovery
95            .map(|v| *v)
96            .unwrap_or(0);
97        self.registry.actor_info_by_path(path, core_id)
98    }
99
100    /// System-level snapshot aggregated across all cores.
101    pub fn snapshot(&self) -> EngineSnapshot {
102        let actors = self.actor_list(ActorQuery::default());
103        EngineSnapshot {
104            num_cores: self.num_cores,
105            uptime_secs: self.start_time.elapsed().as_secs(),
106            actors,
107        }
108    }
109
110    /// Per-core statistics counters.
111    pub fn core_stats(&self) -> &[Arc<CoreStats>] {
112        &self.per_core_stats
113    }
114
115    /// Block until actor at `path` is running, or `timeout` elapses.
116    ///
117    /// Returns `true` if the actor became running within `timeout`.
118    pub fn wait_for_actor(&self, path: &ActorPath, timeout: Duration) -> bool {
119        let deadline = Instant::now() + timeout;
120        while Instant::now() < deadline {
121            if self
122                .registry
123                .get_by_path(path)
124                .is_some_and(|s| s.running.load(Ordering::Relaxed))
125            {
126                return true;
127            }
128            std::thread::sleep(Duration::from_millis(10));
129        }
130        false
131    }
132
133    /// Block until the engine shuts down, then gracefully stop all cores.
134    ///
135    /// Waits for the internal shutdown signal (fired by the control plane's
136    /// `engine.stop` RPC) and then calls [`shutdown`](Self::shutdown).
137    ///
138    /// If no control plane socket was configured, this method returns
139    /// immediately (the caller is responsible for calling `shutdown()`).
140    pub fn run(self) -> Result<(), ShutdownError> {
141        let reactor = self.reactor.clone();
142        let Self {
143            handles,
144            transport,
145            transport_registry,
146            registry,
147            responses,
148            placement,
149            start_time,
150            ask_timeout,
151            num_cores,
152            per_core_stats,
153            internal_shutdown_rx,
154            federated_routing,
155            fs,
156            reactor: _,
157        } = self;
158
159        if let Some(rx) = internal_shutdown_rx {
160            // Block the calling thread until the control plane fires engine.stop.
161            let rt = tokio::runtime::Builder::new_current_thread()
162                .enable_all()
163                .build()
164                .expect("failed to build shutdown-wait runtime");
165            let _ = rt.block_on(rx);
166        }
167
168        MultiCoreHandle {
169            handles,
170            transport,
171            transport_registry,
172            registry,
173            responses,
174            placement,
175            start_time,
176            ask_timeout,
177            num_cores,
178            per_core_stats,
179            internal_shutdown_rx: None,
180            federated_routing,
181            fs,
182            reactor,
183        }
184        .shutdown()
185    }
186
187    /// Gracefully shut down all cores.
188    ///
189    /// Sends shutdown signals to every core, waits for threads to join, then
190    /// performs brutal cancellation of any remaining actor tasks.
191    pub fn shutdown(mut self) -> Result<(), ShutdownError> {
192        for h in &mut self.handles {
193            if let Some(tx) = h.shutdown_tx.take() {
194                let _ = tx.send(());
195            }
196        }
197        let mut panicked = false;
198        for h in &mut self.handles {
199            if let Some(thread) = h.thread.take() {
200                if thread.join().is_err() {
201                    panicked = true;
202                }
203            }
204        }
205        // Brutal escalation: cancel any tasks that didn't drain in time.
206        self.registry.cancel_all();
207        self.responses.drain();
208
209        if panicked {
210            Err(ShutdownError::ThreadPanic)
211        } else {
212            Ok(())
213        }
214    }
215}