1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9 Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10 PoolConfig, RemoteMessage, SendError, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14 InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15 QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16 TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::common::LifecycleSignal;
21use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
22use crate::reactor::{Reactor, TokioReactor};
23use crate::registry::ActorRegistry;
24use crate::responses::ResponseRegistry;
25
26#[derive(Clone)]
28pub struct EngineHandle<
29 R: Reactor = TokioReactor,
30 N: Network = TokioNetwork,
31 F: FileSystem = TokioFileSystem,
32> {
33 pub(crate) transport: Arc<InProcessTransport>,
34 pub(crate) transport_registry: Arc<TransportRegistry>,
35 pub(crate) type_registry: TypeRegistry,
36 pub(crate) responses: Arc<ResponseRegistry>,
37 pub(crate) registry: Arc<ActorRegistry<R>>,
38 pub(crate) ask_timeout: Duration,
39 pub(crate) reactor: R,
40 pub(crate) network: N,
41 pub(crate) _fs: F,
42 pub(crate) start_time: Instant,
43 pub(crate) source_addr: AddrHash,
44 pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
45 pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
52 pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
55}
56
57impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
58 pub fn type_registry(&self) -> TypeRegistry {
59 self.type_registry.clone()
60 }
61
62 pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
68 let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
69 let payload = MessagePayload::local(msg);
70
71 if let Some(sender) = self.send_cache.get(&target) {
73 return match sender.try_send(MailboxMessage { envelope, payload }) {
74 Ok(()) => Ok(()),
75 Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
76 Err(_) => {
77 drop(sender); self.send_cache.remove(&target);
80 Err(SendError::ActorStopped)
81 }
82 };
83 }
84
85 if let Some(sender) = self.transport_registry.get_local_sender(target) {
87 let result = sender.try_send(MailboxMessage { envelope, payload });
88 match result {
89 Ok(()) => {
90 self.send_cache.insert(target, sender);
91 return Ok(());
92 }
93 Err(TransportError::MailboxFull) => {
94 self.send_cache.insert(target, sender);
96 return Err(SendError::MailboxFull);
97 }
98 Err(_) => return Err(SendError::ActorStopped),
99 }
100 }
101
102 self.transport_registry
104 .try_route(envelope, payload)
105 .map_err(|e| match e {
106 TransportError::MailboxFull => SendError::MailboxFull,
107 TransportError::ConnectionFailed => SendError::ConnectionFailed,
108 TransportError::Unroutable(_) => SendError::Unroutable,
109 TransportError::SerializationRequired | TransportError::SerializationError(_) => {
110 SendError::SerializationFailed
111 }
112 _ => SendError::ActorStopped,
113 })
114 }
115
116 pub fn send_many<M: Message + Clone>(
124 &self,
125 target: AddrHash,
126 msg: M,
127 n: usize,
128 ) -> Result<usize, SendError> {
129 if n == 0 {
130 return Ok(0);
131 }
132
133 let make_batch = || {
134 let mut batch = Vec::with_capacity(n);
135 for _ in 0..n {
136 batch.push(MailboxMessage {
137 envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
138 payload: MessagePayload::local(msg.clone()),
139 });
140 }
141 batch
142 };
143
144 if let Some(sender) = self.send_cache.get(&target) {
146 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
147 return match err {
148 None => Ok(sent),
149 Some(TransportError::MailboxFull) => Ok(sent),
150 _ => {
151 drop(sender);
152 self.send_cache.remove(&target);
153 Err(SendError::ActorStopped)
154 }
155 };
156 }
157
158 if let Some(sender) = self.transport_registry.get_local_sender(target) {
160 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
161 match err {
162 None => {
163 self.send_cache.insert(target, sender);
164 return Ok(sent);
165 }
166 Some(TransportError::MailboxFull) => {
167 self.send_cache.insert(target, sender);
168 return Ok(sent);
169 }
170 _ => return Err(SendError::ActorStopped),
171 }
172 }
173
174 let mut sent = 0usize;
176 for _ in 0..n {
177 match self.send_to::<M>(target, msg.clone()) {
178 Ok(()) => sent += 1,
179 Err(SendError::MailboxFull) => return Ok(sent),
180 Err(e) => return Err(e),
181 }
182 }
183 Ok(sent)
184 }
185
186 pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
188 where
189 R: Send + Sync,
190 {
191 let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
192 let ask_fn = make_ask_fn::<M, R>(
193 target,
194 self.source_addr,
195 self.transport.clone(),
196 self.transport_registry.clone(),
197 self.responses.clone(),
198 self.ask_timeout,
199 self.reactor.clone(),
200 );
201 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
202 }
203
204 pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
209 let registry = self.transport_registry.clone();
210 let source = self.source_addr;
211 let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
212 if registry.is_local(target) {
213 return Err(SendError::PolicyViolation);
214 }
215 let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
216 let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
217 registry
218 .try_route(envelope, MessagePayload::serialized(bytes))
219 .map_err(|e| match e {
220 TransportError::MailboxFull => SendError::MailboxFull,
221 TransportError::ConnectionFailed => SendError::ConnectionFailed,
222 TransportError::Unroutable(_) => SendError::Unroutable,
223 TransportError::SerializationRequired
224 | TransportError::SerializationError(_) => SendError::SerializationFailed,
225 _ => SendError::ActorStopped,
226 })
227 });
228 let ask_fn: palladium_actor::AskFn<M> =
229 Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
230 Box::pin(async { Err(AskError::Unbound) })
231 });
232 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
233 }
234
235 pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
249 where
250 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
251 R: Clone + Send + Sync,
252 {
253 let send_fn =
254 make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
255 let ask_fn = make_remote_ask_fn::<M, R>(
256 target,
257 self.source_addr,
258 self.transport_registry.clone(),
259 self.responses.clone(),
260 self.ask_timeout,
261 self.reactor.clone(),
262 Arc::clone(&self.pump_rx),
263 );
264 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
265 }
266
267 pub fn transport(&self) -> &Arc<InProcessTransport> {
268 &self.transport
269 }
270
271 pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
272 &self.transport_registry
273 }
274
275 pub async fn attach_quic_transport(
276 &self,
277 config: QuicTransportConfig,
278 peers: HashMap<EngineId, SocketAddr>,
279 ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
280 where
281 R: Clone,
282 N: Clone,
283 {
284 let transport = QuicTransport::bind(
285 config,
286 peers,
287 self.transport_registry.clone(),
288 self.type_registry.clone(),
289 self.reactor.clone(),
290 self.network.clone(),
291 )
292 .await?;
293 self.transport_registry.add_transport(transport.clone())?;
294 Ok(transport)
295 }
296
297 pub async fn attach_tcp_transport(
298 &self,
299 config: TcpTransportConfig,
300 peers: HashMap<EngineId, SocketAddr>,
301 ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
302 where
303 R: Clone,
304 N: Clone,
305 {
306 let transport = TcpTransport::bind(
307 config,
308 peers,
309 self.transport_registry.clone(),
310 self.type_registry.clone(),
311 self.reactor.clone(),
312 self.network.clone(),
313 )
314 .await?;
315 self.transport_registry.add_transport(transport.clone())?;
316 Ok(transport)
317 }
318
319 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
323 self.registry.snapshot(&query, 0)
324 }
325
326 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
328 self.registry.actor_info_by_path(path, 0)
329 }
330
331 pub fn snapshot(&self) -> EngineSnapshot {
333 let actors = self.actor_list(ActorQuery::default());
334 let uptime_secs = self.start_time.elapsed().as_secs();
335 EngineSnapshot {
336 num_cores: 1,
337 uptime_secs,
338 actors,
339 }
340 }
341
342 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
344 self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
345 self.federated_routing
346 .as_ref()
347 .and_then(|routing| routing.resolve_optional(path))
348 })
349 }
350
351 pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
353 if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
354 return Ok(local);
355 }
356 if let Some(routing) = &self.federated_routing {
357 return routing.resolve(path);
358 }
359 Err(SendError::Unroutable)
360 }
361
362 pub fn stop_actor(&self, path: &ActorPath) -> bool {
368 if let Some(slot) = self.registry.get_by_path(path) {
369 slot.ctrl_tx
370 .send(LifecycleSignal::Stop(StopReason::Supervisor))
371 .ok();
372 true
373 } else {
374 false
375 }
376 }
377
378 pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
384 let addr = self.registry.get_by_path(path).map(|s| s.addr);
385 let Some(addr) = addr else { return 0 };
386 let src = AddrHash::synthetic(b"fault-fill-mailbox");
387 let mut count = 0;
388 loop {
389 let env = Envelope::new(src, addr, 0, 0);
390 match self.transport.try_deliver(env, MessagePayload::local(())) {
391 Ok(()) => count += 1,
392 Err(_) => break,
393 }
394 }
395 count
396 }
397
398 pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
400 let user_path = ActorPath::parse("/user").unwrap();
401 if let Some(slot) = self.registry.get_by_path(&user_path) {
402 slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
403 }
404 }
405
406 pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
424 where
425 M: Message,
426 R: Clone + Send + Sync + 'static,
427 G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
428 {
429 let user_path = ActorPath::parse("/user").unwrap();
430 let handle = self.clone();
431 crate::worker_pool::spawn_worker_pool(
432 &user_path,
433 config,
434 factory,
435 self.source_addr,
436 &self.transport,
437 &self.transport_registry,
438 &self.responses,
439 &self.reactor,
440 move |spec| handle.spawn_user_actor(spec),
441 )
442 }
443}