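//! `palladium_runtime/engine/run.rs`
//!
//! Engine entry points (`Engine::run`, `Engine::run_async`) and the private
//! helpers that build the control-plane child spec, start the gossip service,
//! and spawn the `/system` and `/user` supervisors.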

1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
5
6use palladium_actor::{
7    ActorPath, AddrHash, ChildSpec, NamespacePolicy, RestartPolicy, ShutdownPolicy, StopReason,
8};
9use palladium_transport::{InProcessTransport, TransportRegistry};
10
11use crate::common::{ChildEvent, LifecycleSignal};
12use crate::control_plane::ControlPlaneActor;
13use crate::engine::config::EngineConfig;
14use crate::engine::runtime::Engine;
15use crate::federation::FederatedRouting;
16use crate::fs::FileSystem;
17use crate::reactor::Reactor;
18use crate::registry::ActorRegistry;
19use crate::responses::ResponseRegistry;
20use crate::supervisor::{supervisor_task, RestartIntensity, SupervisionStrategy};
21use palladium_transport::network::Network;
22
23struct ControlPlaneSpecDeps<R: Reactor, N: Network, F: FileSystem> {
24    registry: Arc<ActorRegistry<R>>,
25    responses: Arc<ResponseRegistry>,
26    start_time: Instant,
27    internal_shutdown_tx: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
28    cluster_membership: Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
29    federated_registry: Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
30    federation_policy: Option<Arc<parking_lot::RwLock<palladium_federation::FederationPolicy>>>,
31    reactor: R,
32    network: N,
33    fs: F,
34}
35
36struct SupervisorSpawnDeps<R: Reactor, F: FileSystem> {
37    transport: Arc<InProcessTransport>,
38    transport_registry: Arc<TransportRegistry>,
39    responses: Arc<ResponseRegistry>,
40    registry: Arc<ActorRegistry<R>>,
41    reactor: R,
42    fs: F,
43    federation: Option<Arc<FederatedRouting>>,
44}
45
46impl<R: Reactor, N: Network, F: FileSystem> Engine<R, N, F> {
47    /// Launch the engine on a new `current_thread` Tokio runtime.
48    ///
49    /// Blocks until `shutdown_rx` resolves. Use the returned `EngineHandle`
50    /// to interact with actors from the calling thread before calling `run`.
51    pub fn run(self, shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
52        let rt = tokio::runtime::Builder::new_current_thread()
53            .enable_all()
54            .build()
55            .expect("failed to build Tokio runtime");
56
57        let local = tokio::task::LocalSet::new();
58        local.block_on(&rt, self.run_async(shutdown_rx));
59    }
60
61    pub async fn run_async(self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
62        let reactor = self.reactor.clone();
63        let network = self.network.clone();
64        let transport = self.transport;
65        let config = self.config;
66
67        let (internal_shutdown_tx, mut internal_shutdown_rx) =
68            tokio::sync::oneshot::channel::<()>();
69        let internal_shutdown_tx = Arc::new(Mutex::new(Some(internal_shutdown_tx)));
70
71        let mut system_specs = self.system_specs;
72        let tcp_addr = config
73            .control_plane_tcp_addr
74            .as_deref()
75            .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
76        let quic_addr = config
77            .control_plane_quic_addr
78            .as_deref()
79            .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
80        let cp_deps = ControlPlaneSpecDeps {
81            registry: Arc::clone(&self.registry),
82            responses: Arc::clone(&self.responses),
83            start_time: self.start_time,
84            internal_shutdown_tx: Arc::clone(&internal_shutdown_tx),
85            cluster_membership: self.cluster_membership.clone(),
86            federated_registry: self.federated_registry.clone(),
87            federation_policy: self.federated_routing.as_ref().map(|r| r.policy()),
88            reactor: reactor.clone(),
89            network: network.clone(),
90            fs: self.fs.clone(),
91        };
92
93        if let Some(spec) = control_plane_spec(&config, tcp_addr, quic_addr, cp_deps) {
94            system_specs.push(spec);
95        }
96
97        maybe_spawn_gossip(
98            &config,
99            self.cluster_membership.clone(),
100            self.federated_registry.clone(),
101            reactor.clone(),
102            network.clone(),
103        );
104
105        let sup_deps = SupervisorSpawnDeps {
106            transport: Arc::clone(&transport),
107            transport_registry: Arc::clone(&self.transport_registry),
108            responses: Arc::clone(&self.responses),
109            registry: Arc::clone(&self.registry),
110            reactor: reactor.clone(),
111            fs: self.fs.clone(),
112            federation: self.federated_routing.clone(),
113        };
114        let sys_ctrl_tx = spawn_supervisor(
115            ActorPath::parse("/system").unwrap(),
116            config.system_strategy,
117            config.intensity.clone(),
118            system_specs,
119            &sup_deps,
120        );
121        let user_ctrl_tx = spawn_supervisor(
122            ActorPath::parse("/user").unwrap(),
123            config.user_strategy,
124            config.intensity.clone(),
125            self.user_specs,
126            &sup_deps,
127        );
128
129        tokio::select! {
130            _ = &mut shutdown_rx => {},
131            _ = &mut internal_shutdown_rx => {},
132        }
133
134        user_ctrl_tx
135            .send(LifecycleSignal::Stop(StopReason::Shutdown))
136            .ok();
137        sys_ctrl_tx
138            .send(LifecycleSignal::Stop(StopReason::Shutdown))
139            .ok();
140
141        palladium_actor::Reactor::sleep(&reactor, config.shutdown_timeout).await;
142
143        self.registry.cancel_all();
144        self.responses.drain();
145    }
146}
147
148fn control_plane_spec<R: Reactor, N: Network, F: FileSystem>(
149    config: &EngineConfig<R>,
150    tcp_addr: Option<std::net::SocketAddr>,
151    quic_addr: Option<std::net::SocketAddr>,
152    deps: ControlPlaneSpecDeps<R, N, F>,
153) -> Option<ChildSpec<R>> {
154    if config.control_plane_socket.is_none() && tcp_addr.is_none() && quic_addr.is_none() {
155        return None;
156    }
157
158    let cp_path = ActorPath::parse("/system/control-plane").unwrap();
159    let ns = NamespacePolicy::default_for(&cp_path).unwrap();
160    let prpc = config.plugin_rpc.clone();
161    let actor_spawn = config.actor_spawn.clone();
162    let engine_id = config.engine_id.clone();
163    let consensus_engine = config.consensus_engine.clone();
164    let consensus_groups = config.consensus_groups.clone();
165    let consensus_group_meta = config.consensus_group_meta.clone();
166    let consensus_engine_info = config.consensus_engine_info.clone();
167    let socket_path = config.control_plane_socket.clone();
168    let tls = config.control_plane_tls.clone();
169    let reg = deps.registry.clone();
170    let responses = deps.responses.clone();
171    let tx = deps.internal_shutdown_tx;
172    let r = deps.reactor.clone();
173    let n = deps.network.clone();
174    let f = deps.fs.clone();
175    let start_time = deps.start_time;
176    let cluster_membership = deps.cluster_membership;
177    let federated_registry = deps.federated_registry;
178    let federation_policy = deps.federation_policy;
179
180    Some(ChildSpec::new(
181        "control-plane",
182        RestartPolicy::Permanent,
183        ShutdownPolicy::Timeout(Duration::from_secs(5)),
184        ns,
185        move || {
186            let tx_final = tx.lock().unwrap().take();
187            Box::new(ControlPlaneActor::<R, N, F>::new(
188                socket_path.clone(),
189                tcp_addr,
190                quic_addr,
191                tls.clone(),
192                Arc::clone(&reg),
193                Arc::clone(&responses),
194                start_time,
195                tx_final,
196                1,
197                prpc.clone(),
198                actor_spawn.clone(),
199                cluster_membership.clone(),
200                federated_registry.clone(),
201                federation_policy.clone(),
202                engine_id.clone(),
203                consensus_engine.clone(),
204                consensus_groups.clone(),
205                consensus_group_meta.clone(),
206                consensus_engine_info.clone(),
207                r.clone(),
208                n.clone(),
209                f.clone(),
210            ))
211        },
212    ))
213}
214
215fn maybe_spawn_gossip<R: Reactor, N: Network>(
216    config: &EngineConfig<R>,
217    cluster_membership: Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
218    federated_registry: Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
219    reactor: R,
220    network: N,
221) {
222    if !config.federation_enabled {
223        return;
224    }
225    if let (Some(membership), Some(registry)) = (cluster_membership, federated_registry) {
226        if let Some(bind_addr) = config
227            .federation_bind_addr
228            .as_ref()
229            .and_then(|s| s.parse().ok())
230        {
231            crate::gossip::spawn_gossip_service(
232                crate::gossip::GossipConfig {
233                    bind_addr,
234                    seed_nodes: config
235                        .federation_seed_nodes
236                        .iter()
237                        .filter_map(|s| s.parse().ok())
238                        .collect(),
239                    gossip_interval: config.federation_membership.gossip_interval,
240                    fanout: config.federation_membership.fanout,
241                },
242                membership,
243                registry,
244                reactor,
245                network,
246            );
247        } else {
248            eprintln!("federation_enabled=true but federation_bind_addr invalid");
249        }
250    }
251}
252
253fn spawn_supervisor<R: Reactor, F: FileSystem>(
254    path: ActorPath,
255    strategy: SupervisionStrategy,
256    intensity: RestartIntensity,
257    specs: Vec<ChildSpec<R>>,
258    deps: &SupervisorSpawnDeps<R, F>,
259) -> UnboundedSender<LifecycleSignal<R>> {
260    let (ctrl_tx, ctrl_rx) = unbounded_channel::<LifecycleSignal<R>>();
261    let (event_tx, event_rx) = unbounded_channel::<ChildEvent<R>>();
262    let path_for_task = path.clone();
263    let reg_for_task = Arc::clone(&deps.registry);
264    let reactor_for_task = deps.reactor.clone();
265    let fs_for_task = deps.fs.clone();
266    let transport = Arc::clone(&deps.transport);
267    let transport_registry = Arc::clone(&deps.transport_registry);
268    let responses = Arc::clone(&deps.responses);
269    let federation = deps.federation.clone();
270    let task_handle = palladium_actor::Reactor::spawn_local(
271        &deps.reactor,
272        Box::pin(async move {
273            supervisor_task(
274                path_for_task,
275                strategy,
276                intensity,
277                ctrl_rx,
278                event_rx,
279                event_tx,
280                None,
281                transport,
282                transport_registry,
283                responses,
284                reg_for_task,
285                reactor_for_task,
286                fs_for_task,
287                federation,
288                specs,
289            )
290            .await;
291        }),
292    );
293
294    let addr = AddrHash::new(&path, 0);
295    let slot = crate::registry::ActorSlot {
296        path,
297        addr,
298        mailbox_tx: palladium_transport::mailbox(1).0,
299        ctrl_tx: ctrl_tx.clone(),
300        task_handle: Arc::new(parking_lot::Mutex::new(Some(task_handle))),
301        supervisor_addr: None,
302        running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
303        restart_count: Arc::new(std::sync::atomic::AtomicU32::new(0)),
304        message_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
305        compute_time_ns: Arc::new(std::sync::atomic::AtomicU64::new(0)),
306    };
307    deps.registry.insert(slot);
308
309    ctrl_tx
310}