palladium_runtime/multi_core/
handle.rs1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use tokio::sync::oneshot;
6
7use palladium_actor::{ActorPath, AddrHash};
8use palladium_transport::{InProcessTransport, TransportRegistry};
9
10use crate::engine::EngineHandle;
11use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
12use crate::placement::{CoreStats, PlacementMap};
13use crate::reactor::{Reactor, TokioReactor};
14use crate::registry::ActorRegistry;
15use crate::responses::ResponseRegistry;
16
17use super::errors::ShutdownError;
18
19use crate::fs::{FileSystem, TokioFileSystem};
20
21pub(crate) struct CoreHandle {
22 #[allow(dead_code)]
23 pub(crate) core_id: usize,
24 pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
25 pub(crate) thread: Option<std::thread::JoinHandle<()>>,
26 #[allow(dead_code)]
27 pub(crate) stats: Arc<CoreStats>,
28}
29
30pub struct MultiCoreHandle<F: FileSystem = TokioFileSystem, R: Reactor = TokioReactor> {
35 pub(crate) handles: Vec<CoreHandle>,
36 pub(crate) transport: Arc<InProcessTransport>,
37 pub(crate) transport_registry: Arc<TransportRegistry>,
38 pub(crate) registry: Arc<ActorRegistry<R>>,
39 pub(crate) responses: Arc<ResponseRegistry>,
40 pub(crate) placement: Arc<PlacementMap>,
41 pub(crate) start_time: Instant,
42 pub(crate) ask_timeout: Duration,
43 pub(crate) num_cores: usize,
44 pub(crate) per_core_stats: Vec<Arc<CoreStats>>,
45 pub(crate) internal_shutdown_rx: Option<oneshot::Receiver<()>>,
48 pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
49 pub(crate) fs: F,
50 pub(crate) reactor: R,
51}
52
53impl<F: FileSystem, R: Reactor> MultiCoreHandle<F, R> {
54 pub fn engine_handle(&self) -> EngineHandle<R, palladium_transport::network::TokioNetwork, F> {
58 EngineHandle {
59 transport: self.transport.clone(),
60 transport_registry: self.transport_registry.clone(),
61 type_registry: self.transport_registry.type_registry(),
62 responses: self.responses.clone(),
63 registry: self.registry.clone(),
64 ask_timeout: self.ask_timeout,
65 reactor: self.reactor.clone(),
66 network: palladium_transport::network::TokioNetwork,
67 _fs: self.fs.clone(),
68 start_time: self.start_time,
69 source_addr: AddrHash::synthetic(b"engine-handle"),
70 federated_routing: self.federated_routing.clone(),
71 send_cache: Arc::new(dashmap::DashMap::new()),
72 }
73 }
74
75 pub fn transport(&self) -> &Arc<InProcessTransport> {
77 &self.transport
78 }
79
80 pub fn placement(&self) -> &Arc<PlacementMap> {
82 &self.placement
83 }
84
85 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
87 self.registry.snapshot(&query, 0)
88 }
89
90 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
92 let core_id = self
93 .placement
94 .get(&AddrHash::new(path, 0).path_hash()) .map(|v| *v)
96 .unwrap_or(0);
97 self.registry.actor_info_by_path(path, core_id)
98 }
99
100 pub fn snapshot(&self) -> EngineSnapshot {
102 let actors = self.actor_list(ActorQuery::default());
103 EngineSnapshot {
104 num_cores: self.num_cores,
105 uptime_secs: self.start_time.elapsed().as_secs(),
106 actors,
107 }
108 }
109
110 pub fn core_stats(&self) -> &[Arc<CoreStats>] {
112 &self.per_core_stats
113 }
114
115 pub fn wait_for_actor(&self, path: &ActorPath, timeout: Duration) -> bool {
119 let deadline = Instant::now() + timeout;
120 while Instant::now() < deadline {
121 if self
122 .registry
123 .get_by_path(path)
124 .is_some_and(|s| s.running.load(Ordering::Relaxed))
125 {
126 return true;
127 }
128 std::thread::sleep(Duration::from_millis(10));
129 }
130 false
131 }
132
133 pub fn run(self) -> Result<(), ShutdownError> {
141 let reactor = self.reactor.clone();
142 let Self {
143 handles,
144 transport,
145 transport_registry,
146 registry,
147 responses,
148 placement,
149 start_time,
150 ask_timeout,
151 num_cores,
152 per_core_stats,
153 internal_shutdown_rx,
154 federated_routing,
155 fs,
156 reactor: _,
157 } = self;
158
159 if let Some(rx) = internal_shutdown_rx {
160 let rt = tokio::runtime::Builder::new_current_thread()
162 .enable_all()
163 .build()
164 .expect("failed to build shutdown-wait runtime");
165 let _ = rt.block_on(rx);
166 }
167
168 MultiCoreHandle {
169 handles,
170 transport,
171 transport_registry,
172 registry,
173 responses,
174 placement,
175 start_time,
176 ask_timeout,
177 num_cores,
178 per_core_stats,
179 internal_shutdown_rx: None,
180 federated_routing,
181 fs,
182 reactor,
183 }
184 .shutdown()
185 }
186
187 pub fn shutdown(mut self) -> Result<(), ShutdownError> {
192 for h in &mut self.handles {
193 if let Some(tx) = h.shutdown_tx.take() {
194 let _ = tx.send(());
195 }
196 }
197 let mut panicked = false;
198 for h in &mut self.handles {
199 if let Some(thread) = h.thread.take() {
200 if thread.join().is_err() {
201 panicked = true;
202 }
203 }
204 }
205 self.registry.cancel_all();
207 self.responses.drain();
208
209 if panicked {
210 Err(ShutdownError::ThreadPanic)
211 } else {
212 Ok(())
213 }
214 }
215}