Skip to main content

palladium_runtime/engine/
runtime.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use palladium_actor::{AddrHash, ChildSpec};
5use palladium_transport::{InProcessTransport, TransportRegistry, TypeRegistry};
6
7use crate::engine::config::EngineConfig;
8use crate::engine::init::build_engine;
9use crate::federation::FederatedRouting;
10use crate::fs::{FileSystem, TokioFileSystem};
11use crate::reactor::{Reactor, TokioReactor};
12use crate::registry::ActorRegistry;
13use crate::responses::ResponseRegistry;
14use palladium_transport::network::{Network, TokioNetwork};
15
16/// The single-engine actor runtime.
17///
18/// Generic over `R: Reactor`, `N: Network`, and `F: FileSystem`.
19/// Production code uses `TokioReactor`, `TokioNetwork`, and `TokioFileSystem`.
20/// Simulation tests swap these for virtual implementations.
21pub struct Engine<
22    R: Reactor = TokioReactor,
23    N: Network = TokioNetwork,
24    F: FileSystem = TokioFileSystem,
25> {
26    pub(crate) config: EngineConfig<R>,
27    pub(crate) transport: Arc<InProcessTransport>,
28    pub(crate) transport_registry: Arc<TransportRegistry>,
29    pub(crate) type_registry: TypeRegistry,
30    pub(crate) responses: Arc<ResponseRegistry>,
31    pub(crate) registry: Arc<ActorRegistry<R>>,
32    pub(crate) user_specs: Vec<ChildSpec<R>>,
33    pub(crate) system_specs: Vec<ChildSpec<R>>,
34    pub(crate) reactor: R,
35    pub(crate) network: N,
36    pub(crate) fs: F,
37    pub(crate) start_time: Instant,
38    pub(crate) federated_routing: Option<Arc<FederatedRouting>>,
39    pub(crate) cluster_membership:
40        Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
41    pub(crate) federated_registry:
42        Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
43}
44
45impl Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
46    pub fn new() -> Self {
47        Self::with_config(EngineConfig::default())
48    }
49
50    pub fn with_config(config: EngineConfig<TokioReactor>) -> Self {
51        Self::with_reactor(config, TokioReactor, TokioNetwork, TokioFileSystem)
52    }
53}
54
55impl<R: Reactor, N: Network, F: FileSystem> Engine<R, N, F> {
56    /// Create an engine with a custom reactor. Used by `pd-test` to inject
57    /// `SimReactor` for deterministic simulation (Phase 3).
58    pub fn with_reactor(config: EngineConfig<R>, reactor: R, network: N, fs: F) -> Self {
59        build_engine(
60            config,
61            reactor,
62            network,
63            fs,
64            Arc::new(InProcessTransport::new()),
65        )
66    }
67
68    /// Create an engine with a shared transport (and its own fresh registry).
69    ///
70    /// Used by `pd-test`'s `run_multi` to give all simulated engines access to
71    /// the same `InProcessTransport` so cross-engine actor-to-actor sends work
72    /// transparently without a real inter-core queue.
73    pub fn with_shared_transport(
74        config: EngineConfig<R>,
75        reactor: R,
76        network: N,
77        fs: F,
78        transport: Arc<InProcessTransport>,
79    ) -> Self {
80        build_engine(config, reactor, network, fs, transport)
81    }
82
83    /// Register an actor to be spawned under `/user` at launch.
84    pub fn add_user_actor(&mut self, spec: ChildSpec<R>) {
85        self.user_specs.push(spec);
86    }
87
88    /// Register an actor to be spawned under `/system` at launch.
89    pub fn add_system_actor(&mut self, spec: ChildSpec<R>) {
90        self.system_specs.push(spec);
91    }
92
93    /// Return an `EngineHandle` for sending messages, building `Addr<M>`s, and
94    /// injecting faults.
95    pub fn handle(&self) -> super::EngineHandle<R, N, F> {
96        super::EngineHandle {
97            transport: self.transport.clone(),
98            transport_registry: self.transport_registry.clone(),
99            type_registry: self.type_registry.clone(),
100            responses: self.responses.clone(),
101            registry: self.registry.clone(),
102            ask_timeout: self.config.ask_timeout,
103            reactor: self.reactor.clone(),
104            network: self.network.clone(),
105            _fs: self.fs.clone(),
106            start_time: self.start_time,
107            source_addr: AddrHash::synthetic(b"engine-handle"),
108            federated_routing: self.federated_routing.clone(),
109            send_cache: Arc::new(dashmap::DashMap::new()),
110        }
111    }
112
113    pub fn enable_federated_routing(
114        &mut self,
115        registry: Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>,
116        policy: palladium_federation::FederationPolicy,
117    ) -> Arc<FederatedRouting> {
118        let routing = Arc::new(FederatedRouting::new(registry, policy));
119        self.federated_routing = Some(Arc::clone(&routing));
120        routing
121    }
122}
123
124impl Default for Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
125    fn default() -> Self {
126        Self::new()
127    }
128}