palladium_runtime/engine/
runtime.rs1use std::sync::Arc;
2use std::time::Instant;
3
4use palladium_actor::{AddrHash, ChildSpec};
5use palladium_transport::{InProcessTransport, TransportRegistry, TypeRegistry};
6
7use crate::engine::config::EngineConfig;
8use crate::engine::init::build_engine;
9use crate::federation::FederatedRouting;
10use crate::fs::{FileSystem, TokioFileSystem};
11use crate::reactor::{Reactor, TokioReactor};
12use crate::registry::ActorRegistry;
13use crate::responses::ResponseRegistry;
14use palladium_transport::network::{Network, TokioNetwork};
15
16pub struct Engine<
22 R: Reactor = TokioReactor,
23 N: Network = TokioNetwork,
24 F: FileSystem = TokioFileSystem,
25> {
26 pub(crate) config: EngineConfig<R>,
27 pub(crate) transport: Arc<InProcessTransport>,
28 pub(crate) transport_registry: Arc<TransportRegistry>,
29 pub(crate) type_registry: TypeRegistry,
30 pub(crate) responses: Arc<ResponseRegistry>,
31 pub(crate) registry: Arc<ActorRegistry<R>>,
32 pub(crate) user_specs: Vec<ChildSpec<R>>,
33 pub(crate) system_specs: Vec<ChildSpec<R>>,
34 pub(crate) reactor: R,
35 pub(crate) network: N,
36 pub(crate) fs: F,
37 pub(crate) start_time: Instant,
38 pub(crate) federated_routing: Option<Arc<FederatedRouting>>,
39 pub(crate) cluster_membership:
40 Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
41 pub(crate) federated_registry:
42 Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
43 pub(crate) source_addr: AddrHash,
45 pub(crate) pump_rx: Arc<std::sync::Mutex<Option<palladium_transport::MailboxReceiver>>>,
48}
49
50impl Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
51 pub fn new() -> Self {
52 Self::with_config(EngineConfig::default())
53 }
54
55 pub fn with_config(config: EngineConfig<TokioReactor>) -> Self {
56 Self::with_reactor(config, TokioReactor, TokioNetwork, TokioFileSystem)
57 }
58}
59
60impl<R: Reactor, N: Network, F: FileSystem> Engine<R, N, F> {
61 pub fn with_reactor(config: EngineConfig<R>, reactor: R, network: N, fs: F) -> Self {
64 build_engine(
65 config,
66 reactor,
67 network,
68 fs,
69 Arc::new(InProcessTransport::new()),
70 )
71 }
72
73 pub fn with_shared_transport(
79 config: EngineConfig<R>,
80 reactor: R,
81 network: N,
82 fs: F,
83 transport: Arc<InProcessTransport>,
84 ) -> Self {
85 build_engine(config, reactor, network, fs, transport)
86 }
87
88 pub fn add_user_actor(&mut self, spec: ChildSpec<R>) {
90 self.user_specs.push(spec);
91 }
92
93 pub fn add_system_actor(&mut self, spec: ChildSpec<R>) {
95 self.system_specs.push(spec);
96 }
97
98 pub fn handle(&self) -> super::EngineHandle<R, N, F> {
101 super::EngineHandle {
102 transport: self.transport.clone(),
103 transport_registry: self.transport_registry.clone(),
104 type_registry: self.type_registry.clone(),
105 responses: self.responses.clone(),
106 registry: self.registry.clone(),
107 ask_timeout: self.config.ask_timeout,
108 reactor: self.reactor.clone(),
109 network: self.network.clone(),
110 _fs: self.fs.clone(),
111 start_time: self.start_time,
112 source_addr: self.source_addr,
113 federated_routing: self.federated_routing.clone(),
114 send_cache: Arc::new(dashmap::DashMap::new()),
115 pump_rx: Arc::clone(&self.pump_rx),
116 }
117 }
118
119 pub fn enable_federated_routing(
120 &mut self,
121 registry: Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>,
122 policy: palladium_federation::FederationPolicy,
123 ) -> Arc<FederatedRouting> {
124 let routing = Arc::new(FederatedRouting::new(registry, policy));
125 self.federated_routing = Some(Arc::clone(&routing));
126 routing
127 }
128}
129
130impl Default for Engine<TokioReactor, TokioNetwork, TokioFileSystem> {
131 fn default() -> Self {
132 Self::new()
133 }
134}