palladium_runtime/multi_core/
handle.rs1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use tokio::sync::oneshot;
6
7use palladium_actor::{ActorPath, AddrHash};
8use palladium_transport::{InProcessTransport, MailboxReceiver, TransportRegistry};
9
10use crate::engine::EngineHandle;
11use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
12use crate::placement::{CoreStats, PlacementMap};
13use crate::reactor::{Reactor, TokioReactor};
14use crate::registry::ActorRegistry;
15use crate::responses::ResponseRegistry;
16
17use super::errors::ShutdownError;
18
19use crate::fs::{FileSystem, TokioFileSystem};
20
21pub(crate) struct CoreHandle {
22 #[allow(dead_code)]
23 pub(crate) core_id: usize,
24 pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
25 pub(crate) thread: Option<std::thread::JoinHandle<()>>,
26 #[allow(dead_code)]
27 pub(crate) stats: Arc<CoreStats>,
28}
29
30pub struct MultiCoreHandle<F: FileSystem = TokioFileSystem, R: Reactor = TokioReactor> {
35 pub(crate) handles: Vec<CoreHandle>,
36 pub(crate) transport: Arc<InProcessTransport>,
37 pub(crate) transport_registry: Arc<TransportRegistry>,
38 pub(crate) registry: Arc<ActorRegistry<R>>,
39 pub(crate) responses: Arc<ResponseRegistry>,
40 pub(crate) placement: Arc<PlacementMap>,
41 pub(crate) start_time: Instant,
42 pub(crate) ask_timeout: Duration,
43 pub(crate) num_cores: usize,
44 pub(crate) per_core_stats: Vec<Arc<CoreStats>>,
45 pub(crate) internal_shutdown_rx: Option<oneshot::Receiver<()>>,
48 pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
49 pub(crate) fs: F,
50 pub(crate) reactor: R,
51}
52
53impl<F: FileSystem, R: Reactor> MultiCoreHandle<F, R> {
54 pub fn engine_handle(&self) -> EngineHandle<R, palladium_transport::network::TokioNetwork, F> {
58 EngineHandle {
59 transport: self.transport.clone(),
60 transport_registry: self.transport_registry.clone(),
61 type_registry: self.transport_registry.type_registry(),
62 responses: self.responses.clone(),
63 registry: self.registry.clone(),
64 ask_timeout: self.ask_timeout,
65 reactor: self.reactor.clone(),
66 network: palladium_transport::network::TokioNetwork,
67 _fs: self.fs.clone(),
68 start_time: self.start_time,
69 source_addr: AddrHash::synthetic(b"engine-handle"),
70 federated_routing: self.federated_routing.clone(),
71 send_cache: Arc::new(dashmap::DashMap::new()),
72 pump_rx: Arc::new(std::sync::Mutex::new(None::<MailboxReceiver>)),
75 }
76 }
77
78 pub fn transport(&self) -> &Arc<InProcessTransport> {
80 &self.transport
81 }
82
83 pub fn placement(&self) -> &Arc<PlacementMap> {
85 &self.placement
86 }
87
88 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
90 self.registry.snapshot(&query, 0)
91 }
92
93 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
95 let core_id = self
96 .placement
97 .get(&AddrHash::new(path, 0).path_hash()) .map(|v| *v)
99 .unwrap_or(0);
100 self.registry.actor_info_by_path(path, core_id)
101 }
102
103 pub fn snapshot(&self) -> EngineSnapshot {
105 let actors = self.actor_list(ActorQuery::default());
106 EngineSnapshot {
107 num_cores: self.num_cores,
108 uptime_secs: self.start_time.elapsed().as_secs(),
109 actors,
110 }
111 }
112
113 pub fn core_stats(&self) -> &[Arc<CoreStats>] {
115 &self.per_core_stats
116 }
117
118 pub fn wait_for_actor(&self, path: &ActorPath, timeout: Duration) -> bool {
122 let deadline = Instant::now() + timeout;
123 while Instant::now() < deadline {
124 if self
125 .registry
126 .get_by_path(path)
127 .is_some_and(|s| s.running.load(Ordering::Relaxed))
128 {
129 return true;
130 }
131 std::thread::sleep(Duration::from_millis(10));
132 }
133 false
134 }
135
136 pub fn run(self) -> Result<(), ShutdownError> {
144 let reactor = self.reactor.clone();
145 let Self {
146 handles,
147 transport,
148 transport_registry,
149 registry,
150 responses,
151 placement,
152 start_time,
153 ask_timeout,
154 num_cores,
155 per_core_stats,
156 internal_shutdown_rx,
157 federated_routing,
158 fs,
159 reactor: _,
160 } = self;
161
162 if let Some(rx) = internal_shutdown_rx {
163 let rt = tokio::runtime::Builder::new_current_thread()
165 .enable_all()
166 .build()
167 .expect("failed to build shutdown-wait runtime");
168 let _ = rt.block_on(rx);
169 }
170
171 MultiCoreHandle {
172 handles,
173 transport,
174 transport_registry,
175 registry,
176 responses,
177 placement,
178 start_time,
179 ask_timeout,
180 num_cores,
181 per_core_stats,
182 internal_shutdown_rx: None,
183 federated_routing,
184 fs,
185 reactor,
186 }
187 .shutdown()
188 }
189
190 pub fn shutdown(mut self) -> Result<(), ShutdownError> {
195 for h in &mut self.handles {
196 if let Some(tx) = h.shutdown_tx.take() {
197 let _ = tx.send(());
198 }
199 }
200 let mut panicked = false;
201 for h in &mut self.handles {
202 if let Some(thread) = h.thread.take() {
203 if thread.join().is_err() {
204 panicked = true;
205 }
206 }
207 }
208 self.registry.cancel_all();
210 self.responses.drain();
211
212 if panicked {
213 Err(ShutdownError::ThreadPanic)
214 } else {
215 Ok(())
216 }
217 }
218}