Skip to main content

palladium_runtime/engine/
runtime.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use palladium_actor::{AddrHash, ChildSpec};
5use palladium_transport::{InProcessTransport, TransportRegistry, TypeRegistry};
6
7use crate::engine::config::EngineConfig;
8use crate::engine::init::build_engine;
9use crate::federation::FederatedRouting;
10use crate::fs::{FileSystem, TokioFileSystem};
11use crate::reactor::{Reactor, TokioReactor};
12use crate::registry::ActorRegistry;
13use crate::responses::ResponseRegistry;
14use palladium_transport::network::{Network, TokioNetwork};
15
16/// The single-engine actor runtime.
17///
18/// Generic over `R: Reactor`, `N: Network`, and `F: FileSystem`.
19/// Production code uses `TokioReactor`, `TokioNetwork`, and `TokioFileSystem`.
20/// Simulation tests swap these for virtual implementations.
21pub struct Engine<
22    R: Reactor = TokioReactor,
23    N: Network = TokioNetwork,
24    F: FileSystem = TokioFileSystem,
25> {
26    pub(crate) config: EngineConfig<R>,
27    pub(crate) transport: Arc<InProcessTransport>,
28    pub(crate) transport_registry: Arc<TransportRegistry>,
29    pub(crate) type_registry: TypeRegistry,
30    pub(crate) responses: Arc<ResponseRegistry>,
31    pub(crate) registry: Arc<ActorRegistry<R>>,
32    pub(crate) user_specs: Vec<ChildSpec<R>>,
33    pub(crate) system_specs: Vec<ChildSpec<R>>,
34    pub(crate) reactor: R,
35    pub(crate) network: N,
36    pub(crate) fs: F,
37    pub(crate) start_time: Instant,
38    pub(crate) federated_routing: Option<Arc<FederatedRouting>>,
39    pub(crate) cluster_membership:
40        Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
41    pub(crate) federated_registry:
42        Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
43    /// Unique source address for this engine's handle, derived from `engine_id`.
44    pub(crate) source_addr: AddrHash,
45    /// Receiving end of the response pump mailbox.  Taken by the first
46    /// `make_remote_ask_fn` call, which spawns a pump task.
47    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<palladium_transport::MailboxReceiver>>>,
48}
49
50impl Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
51    pub fn new() -> Self {
52        Self::with_config(EngineConfig::default())
53    }
54
55    pub fn with_config(config: EngineConfig<TokioReactor>) -> Self {
56        Self::with_reactor(config, TokioReactor, TokioNetwork, TokioFileSystem)
57    }
58}
59
60impl<R: Reactor, N: Network, F: FileSystem> Engine<R, N, F> {
61    /// Create an engine with a custom reactor. Used by `pd-test` to inject
62    /// `SimReactor` for deterministic simulation (Phase 3).
63    pub fn with_reactor(config: EngineConfig<R>, reactor: R, network: N, fs: F) -> Self {
64        build_engine(
65            config,
66            reactor,
67            network,
68            fs,
69            Arc::new(InProcessTransport::new()),
70        )
71    }
72
73    /// Create an engine with a shared transport (and its own fresh registry).
74    ///
75    /// Used by `pd-test`'s `run_multi` to give all simulated engines access to
76    /// the same `InProcessTransport` so cross-engine actor-to-actor sends work
77    /// transparently without a real inter-core queue.
78    pub fn with_shared_transport(
79        config: EngineConfig<R>,
80        reactor: R,
81        network: N,
82        fs: F,
83        transport: Arc<InProcessTransport>,
84    ) -> Self {
85        build_engine(config, reactor, network, fs, transport)
86    }
87
88    /// Register an actor to be spawned under `/user` at launch.
89    pub fn add_user_actor(&mut self, spec: ChildSpec<R>) {
90        self.user_specs.push(spec);
91    }
92
93    /// Register an actor to be spawned under `/system` at launch.
94    pub fn add_system_actor(&mut self, spec: ChildSpec<R>) {
95        self.system_specs.push(spec);
96    }
97
98    /// Return an `EngineHandle` for sending messages, building `Addr<M>`s, and
99    /// injecting faults.
100    pub fn handle(&self) -> super::EngineHandle<R, N, F> {
101        super::EngineHandle {
102            transport: self.transport.clone(),
103            transport_registry: self.transport_registry.clone(),
104            type_registry: self.type_registry.clone(),
105            responses: self.responses.clone(),
106            registry: self.registry.clone(),
107            ask_timeout: self.config.ask_timeout,
108            reactor: self.reactor.clone(),
109            network: self.network.clone(),
110            _fs: self.fs.clone(),
111            start_time: self.start_time,
112            source_addr: self.source_addr,
113            federated_routing: self.federated_routing.clone(),
114            send_cache: Arc::new(dashmap::DashMap::new()),
115            pump_rx: Arc::clone(&self.pump_rx),
116        }
117    }
118
119    pub fn enable_federated_routing(
120        &mut self,
121        registry: Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>,
122        policy: palladium_federation::FederationPolicy,
123    ) -> Arc<FederatedRouting> {
124        let routing = Arc::new(FederatedRouting::new(registry, policy));
125        self.federated_routing = Some(Arc::clone(&routing));
126        routing
127    }
128}
129
130impl Default for Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
131    fn default() -> Self {
132        Self::new()
133    }
134}