1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9 ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10 RemoteMessage, SendError, StopReason,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14 InProcessTransport, MailboxMessage, MailboxSender, QuicTransport, QuicTransportConfig,
15 TcpTransport, TcpTransportConfig, TransportError, TransportRegistry, TypeRegistry,
16};
17
18use crate::addr::{make_ask_fn, make_send_fn};
19use crate::common::LifecycleSignal;
20use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
21use crate::reactor::{Reactor, TokioReactor};
22use crate::registry::ActorRegistry;
23use crate::responses::ResponseRegistry;
24
25#[derive(Clone)]
27pub struct EngineHandle<
28 R: Reactor = TokioReactor,
29 N: Network = TokioNetwork,
30 F: FileSystem = TokioFileSystem,
31> {
32 pub(crate) transport: Arc<InProcessTransport>,
33 pub(crate) transport_registry: Arc<TransportRegistry>,
34 pub(crate) type_registry: TypeRegistry,
35 pub(crate) responses: Arc<ResponseRegistry>,
36 pub(crate) registry: Arc<ActorRegistry<R>>,
37 pub(crate) ask_timeout: Duration,
38 pub(crate) reactor: R,
39 pub(crate) network: N,
40 pub(crate) _fs: F,
41 pub(crate) start_time: Instant,
42 pub(crate) source_addr: AddrHash,
43 pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
44 pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
51}
52
53impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
54 pub fn type_registry(&self) -> TypeRegistry {
55 self.type_registry.clone()
56 }
57
58 pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
64 let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
65 let payload = MessagePayload::local(msg);
66
67 if let Some(sender) = self.send_cache.get(&target) {
69 return match sender.try_send(MailboxMessage { envelope, payload }) {
70 Ok(()) => Ok(()),
71 Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
72 Err(_) => {
73 drop(sender); self.send_cache.remove(&target);
76 Err(SendError::ActorStopped)
77 }
78 };
79 }
80
81 if let Some(sender) = self.transport_registry.get_local_sender(target) {
83 let result = sender.try_send(MailboxMessage { envelope, payload });
84 match result {
85 Ok(()) => {
86 self.send_cache.insert(target, sender);
87 return Ok(());
88 }
89 Err(TransportError::MailboxFull) => {
90 self.send_cache.insert(target, sender);
92 return Err(SendError::MailboxFull);
93 }
94 Err(_) => return Err(SendError::ActorStopped),
95 }
96 }
97
98 self.transport_registry
100 .try_route(envelope, payload)
101 .map_err(|e| match e {
102 TransportError::MailboxFull => SendError::MailboxFull,
103 TransportError::ConnectionFailed => SendError::ConnectionFailed,
104 TransportError::Unroutable(_) => SendError::Unroutable,
105 TransportError::SerializationRequired | TransportError::SerializationError(_) => {
106 SendError::SerializationFailed
107 }
108 _ => SendError::ActorStopped,
109 })
110 }
111
112 pub fn send_many<M: Message + Clone>(
120 &self,
121 target: AddrHash,
122 msg: M,
123 n: usize,
124 ) -> Result<usize, SendError> {
125 if n == 0 {
126 return Ok(0);
127 }
128
129 let make_batch = || {
130 let mut batch = Vec::with_capacity(n);
131 for _ in 0..n {
132 batch.push(MailboxMessage {
133 envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
134 payload: MessagePayload::local(msg.clone()),
135 });
136 }
137 batch
138 };
139
140 if let Some(sender) = self.send_cache.get(&target) {
142 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
143 return match err {
144 None => Ok(sent),
145 Some(TransportError::MailboxFull) => Ok(sent),
146 _ => {
147 drop(sender);
148 self.send_cache.remove(&target);
149 Err(SendError::ActorStopped)
150 }
151 };
152 }
153
154 if let Some(sender) = self.transport_registry.get_local_sender(target) {
156 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
157 match err {
158 None => {
159 self.send_cache.insert(target, sender);
160 return Ok(sent);
161 }
162 Some(TransportError::MailboxFull) => {
163 self.send_cache.insert(target, sender);
164 return Ok(sent);
165 }
166 _ => return Err(SendError::ActorStopped),
167 }
168 }
169
170 let mut sent = 0usize;
172 for _ in 0..n {
173 match self.send_to::<M>(target, msg.clone()) {
174 Ok(()) => sent += 1,
175 Err(SendError::MailboxFull) => return Ok(sent),
176 Err(e) => return Err(e),
177 }
178 }
179 Ok(sent)
180 }
181
182 pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
184 where
185 R: Send + Sync,
186 {
187 let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
188 let ask_fn = make_ask_fn::<M, R>(
189 target,
190 self.source_addr,
191 self.transport.clone(),
192 self.transport_registry.clone(),
193 self.responses.clone(),
194 self.ask_timeout,
195 self.reactor.clone(),
196 );
197 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
198 }
199
200 pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
205 let registry = self.transport_registry.clone();
206 let source = self.source_addr;
207 let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
208 if registry.is_local(target) {
209 return Err(SendError::PolicyViolation);
210 }
211 let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
212 let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
213 registry
214 .try_route(envelope, MessagePayload::serialized(bytes))
215 .map_err(|e| match e {
216 TransportError::MailboxFull => SendError::MailboxFull,
217 TransportError::ConnectionFailed => SendError::ConnectionFailed,
218 TransportError::Unroutable(_) => SendError::Unroutable,
219 TransportError::SerializationRequired
220 | TransportError::SerializationError(_) => SendError::SerializationFailed,
221 _ => SendError::ActorStopped,
222 })
223 });
224 let ask_fn: palladium_actor::AskFn<M> =
225 Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
226 Box::pin(async { Err(AskError::Unbound) })
227 });
228 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
229 }
230
231 pub fn transport(&self) -> &Arc<InProcessTransport> {
232 &self.transport
233 }
234
235 pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
236 &self.transport_registry
237 }
238
239 pub async fn attach_quic_transport(
240 &self,
241 config: QuicTransportConfig,
242 peers: HashMap<EngineId, SocketAddr>,
243 ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
244 where
245 R: Clone,
246 N: Clone,
247 {
248 let transport = QuicTransport::bind(
249 config,
250 peers,
251 self.transport_registry.clone(),
252 self.type_registry.clone(),
253 self.reactor.clone(),
254 self.network.clone(),
255 )
256 .await?;
257 self.transport_registry.add_transport(transport.clone())?;
258 Ok(transport)
259 }
260
261 pub async fn attach_tcp_transport(
262 &self,
263 config: TcpTransportConfig,
264 peers: HashMap<EngineId, SocketAddr>,
265 ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
266 where
267 R: Clone,
268 N: Clone,
269 {
270 let transport = TcpTransport::bind(
271 config,
272 peers,
273 self.transport_registry.clone(),
274 self.type_registry.clone(),
275 self.reactor.clone(),
276 self.network.clone(),
277 )
278 .await?;
279 self.transport_registry.add_transport(transport.clone())?;
280 Ok(transport)
281 }
282
283 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
287 self.registry.snapshot(&query, 0)
288 }
289
290 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
292 self.registry.actor_info_by_path(path, 0)
293 }
294
295 pub fn snapshot(&self) -> EngineSnapshot {
297 let actors = self.actor_list(ActorQuery::default());
298 let uptime_secs = self.start_time.elapsed().as_secs();
299 EngineSnapshot {
300 num_cores: 1,
301 uptime_secs,
302 actors,
303 }
304 }
305
306 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
308 self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
309 self.federated_routing
310 .as_ref()
311 .and_then(|routing| routing.resolve_optional(path))
312 })
313 }
314
315 pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
317 if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
318 return Ok(local);
319 }
320 if let Some(routing) = &self.federated_routing {
321 return routing.resolve(path);
322 }
323 Err(SendError::Unroutable)
324 }
325
326 pub fn stop_actor(&self, path: &ActorPath) -> bool {
332 if let Some(slot) = self.registry.get_by_path(path) {
333 slot.ctrl_tx
334 .send(LifecycleSignal::Stop(StopReason::Supervisor))
335 .ok();
336 true
337 } else {
338 false
339 }
340 }
341
342 pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
348 let addr = self.registry.get_by_path(path).map(|s| s.addr);
349 let Some(addr) = addr else { return 0 };
350 let src = AddrHash::synthetic(b"fault-fill-mailbox");
351 let mut count = 0;
352 loop {
353 let env = Envelope::new(src, addr, 0, 0);
354 match self.transport.try_deliver(env, MessagePayload::local(())) {
355 Ok(()) => count += 1,
356 Err(_) => break,
357 }
358 }
359 count
360 }
361
362 pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
364 let user_path = ActorPath::parse("/user").unwrap();
365 if let Some(slot) = self.registry.get_by_path(&user_path) {
366 slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
367 }
368 }
369}