1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
5
6use palladium_actor::{
7 ActorPath, AddrHash, ChildSpec, NamespacePolicy, RestartPolicy, ShutdownPolicy, StopReason,
8};
9use palladium_transport::{InProcessTransport, TransportRegistry};
10
11use crate::common::{ChildEvent, LifecycleSignal};
12use crate::control_plane::ControlPlaneActor;
13use crate::engine::config::EngineConfig;
14use crate::engine::runtime::Engine;
15use crate::federation::FederatedRouting;
16use crate::fs::FileSystem;
17use crate::reactor::Reactor;
18use crate::registry::ActorRegistry;
19use crate::responses::ResponseRegistry;
20use crate::supervisor::{supervisor_task, RestartIntensity, SupervisionStrategy};
21use palladium_transport::network::Network;
22
23struct ControlPlaneSpecDeps<R: Reactor, N: Network, F: FileSystem> {
24 registry: Arc<ActorRegistry<R>>,
25 responses: Arc<ResponseRegistry>,
26 start_time: Instant,
27 internal_shutdown_tx: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
28 cluster_membership: Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
29 federated_registry: Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
30 federation_policy: Option<Arc<parking_lot::RwLock<palladium_federation::FederationPolicy>>>,
31 reactor: R,
32 network: N,
33 fs: F,
34}
35
36struct SupervisorSpawnDeps<R: Reactor, F: FileSystem> {
37 transport: Arc<InProcessTransport>,
38 transport_registry: Arc<TransportRegistry>,
39 responses: Arc<ResponseRegistry>,
40 registry: Arc<ActorRegistry<R>>,
41 reactor: R,
42 fs: F,
43 federation: Option<Arc<FederatedRouting>>,
44}
45
46impl<R: Reactor, N: Network, F: FileSystem> Engine<R, N, F> {
47 pub fn run(self, shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
52 let rt = tokio::runtime::Builder::new_current_thread()
53 .enable_all()
54 .build()
55 .expect("failed to build Tokio runtime");
56
57 let local = tokio::task::LocalSet::new();
58 local.block_on(&rt, self.run_async(shutdown_rx));
59 }
60
61 pub async fn run_async(self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
62 let reactor = self.reactor.clone();
63 let network = self.network.clone();
64 let transport = self.transport;
65 let config = self.config;
66
67 let (internal_shutdown_tx, mut internal_shutdown_rx) =
68 tokio::sync::oneshot::channel::<()>();
69 let internal_shutdown_tx = Arc::new(Mutex::new(Some(internal_shutdown_tx)));
70
71 let mut system_specs = self.system_specs;
72 let tcp_addr = config
73 .control_plane_tcp_addr
74 .as_deref()
75 .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
76 let quic_addr = config
77 .control_plane_quic_addr
78 .as_deref()
79 .and_then(|addr| crate::control_plane::parse_socket_addr(addr).ok());
80 let cp_deps = ControlPlaneSpecDeps {
81 registry: Arc::clone(&self.registry),
82 responses: Arc::clone(&self.responses),
83 start_time: self.start_time,
84 internal_shutdown_tx: Arc::clone(&internal_shutdown_tx),
85 cluster_membership: self.cluster_membership.clone(),
86 federated_registry: self.federated_registry.clone(),
87 federation_policy: self.federated_routing.as_ref().map(|r| r.policy()),
88 reactor: reactor.clone(),
89 network: network.clone(),
90 fs: self.fs.clone(),
91 };
92
93 if let Some(spec) = control_plane_spec(&config, tcp_addr, quic_addr, cp_deps) {
94 system_specs.push(spec);
95 }
96
97 maybe_spawn_gossip(
98 &config,
99 self.cluster_membership.clone(),
100 self.federated_registry.clone(),
101 reactor.clone(),
102 network.clone(),
103 );
104
105 let sup_deps = SupervisorSpawnDeps {
106 transport: Arc::clone(&transport),
107 transport_registry: Arc::clone(&self.transport_registry),
108 responses: Arc::clone(&self.responses),
109 registry: Arc::clone(&self.registry),
110 reactor: reactor.clone(),
111 fs: self.fs.clone(),
112 federation: self.federated_routing.clone(),
113 };
114 let sys_ctrl_tx = spawn_supervisor(
115 ActorPath::parse("/system").unwrap(),
116 config.system_strategy,
117 config.intensity.clone(),
118 system_specs,
119 &sup_deps,
120 );
121 let user_ctrl_tx = spawn_supervisor(
122 ActorPath::parse("/user").unwrap(),
123 config.user_strategy,
124 config.intensity.clone(),
125 self.user_specs,
126 &sup_deps,
127 );
128
129 tokio::select! {
130 _ = &mut shutdown_rx => {},
131 _ = &mut internal_shutdown_rx => {},
132 }
133
134 user_ctrl_tx
135 .send(LifecycleSignal::Stop(StopReason::Shutdown))
136 .ok();
137 sys_ctrl_tx
138 .send(LifecycleSignal::Stop(StopReason::Shutdown))
139 .ok();
140
141 palladium_actor::Reactor::sleep(&reactor, config.shutdown_timeout).await;
142
143 self.registry.cancel_all();
144 self.responses.drain();
145 }
146}
147
148fn control_plane_spec<R: Reactor, N: Network, F: FileSystem>(
149 config: &EngineConfig<R>,
150 tcp_addr: Option<std::net::SocketAddr>,
151 quic_addr: Option<std::net::SocketAddr>,
152 deps: ControlPlaneSpecDeps<R, N, F>,
153) -> Option<ChildSpec<R>> {
154 if config.control_plane_socket.is_none() && tcp_addr.is_none() && quic_addr.is_none() {
155 return None;
156 }
157
158 let cp_path = ActorPath::parse("/system/control-plane").unwrap();
159 let ns = NamespacePolicy::default_for(&cp_path).unwrap();
160 let prpc = config.plugin_rpc.clone();
161 let actor_spawn = config.actor_spawn.clone();
162 let engine_id = config.engine_id.clone();
163 let consensus_engine = config.consensus_engine.clone();
164 let consensus_groups = config.consensus_groups.clone();
165 let consensus_group_meta = config.consensus_group_meta.clone();
166 let consensus_engine_info = config.consensus_engine_info.clone();
167 let socket_path = config.control_plane_socket.clone();
168 let tls = config.control_plane_tls.clone();
169 let reg = deps.registry.clone();
170 let responses = deps.responses.clone();
171 let tx = deps.internal_shutdown_tx;
172 let r = deps.reactor.clone();
173 let n = deps.network.clone();
174 let f = deps.fs.clone();
175 let start_time = deps.start_time;
176 let cluster_membership = deps.cluster_membership;
177 let federated_registry = deps.federated_registry;
178 let federation_policy = deps.federation_policy;
179
180 Some(ChildSpec::new(
181 "control-plane",
182 RestartPolicy::Permanent,
183 ShutdownPolicy::Timeout(Duration::from_secs(5)),
184 ns,
185 move || {
186 let tx_final = tx.lock().unwrap().take();
187 Box::new(ControlPlaneActor::<R, N, F>::new(
188 socket_path.clone(),
189 tcp_addr,
190 quic_addr,
191 tls.clone(),
192 Arc::clone(®),
193 Arc::clone(&responses),
194 start_time,
195 tx_final,
196 1,
197 prpc.clone(),
198 actor_spawn.clone(),
199 cluster_membership.clone(),
200 federated_registry.clone(),
201 federation_policy.clone(),
202 engine_id.clone(),
203 consensus_engine.clone(),
204 consensus_groups.clone(),
205 consensus_group_meta.clone(),
206 consensus_engine_info.clone(),
207 r.clone(),
208 n.clone(),
209 f.clone(),
210 ))
211 },
212 ))
213}
214
215fn maybe_spawn_gossip<R: Reactor, N: Network>(
216 config: &EngineConfig<R>,
217 cluster_membership: Option<Arc<parking_lot::RwLock<palladium_federation::ClusterMembership>>>,
218 federated_registry: Option<Arc<parking_lot::RwLock<palladium_federation::FederatedRegistry>>>,
219 reactor: R,
220 network: N,
221) {
222 if !config.federation_enabled {
223 return;
224 }
225 if let (Some(membership), Some(registry)) = (cluster_membership, federated_registry) {
226 if let Some(bind_addr) = config
227 .federation_bind_addr
228 .as_ref()
229 .and_then(|s| s.parse().ok())
230 {
231 crate::gossip::spawn_gossip_service(
232 crate::gossip::GossipConfig {
233 bind_addr,
234 seed_nodes: config
235 .federation_seed_nodes
236 .iter()
237 .filter_map(|s| s.parse().ok())
238 .collect(),
239 gossip_interval: config.federation_membership.gossip_interval,
240 fanout: config.federation_membership.fanout,
241 },
242 membership,
243 registry,
244 reactor,
245 network,
246 );
247 } else {
248 eprintln!("federation_enabled=true but federation_bind_addr invalid");
249 }
250 }
251}
252
253fn spawn_supervisor<R: Reactor, F: FileSystem>(
254 path: ActorPath,
255 strategy: SupervisionStrategy,
256 intensity: RestartIntensity,
257 specs: Vec<ChildSpec<R>>,
258 deps: &SupervisorSpawnDeps<R, F>,
259) -> UnboundedSender<LifecycleSignal<R>> {
260 let (ctrl_tx, ctrl_rx) = unbounded_channel::<LifecycleSignal<R>>();
261 let (event_tx, event_rx) = unbounded_channel::<ChildEvent<R>>();
262 let path_for_task = path.clone();
263 let reg_for_task = Arc::clone(&deps.registry);
264 let reactor_for_task = deps.reactor.clone();
265 let fs_for_task = deps.fs.clone();
266 let transport = Arc::clone(&deps.transport);
267 let transport_registry = Arc::clone(&deps.transport_registry);
268 let responses = Arc::clone(&deps.responses);
269 let federation = deps.federation.clone();
270 let task_handle = palladium_actor::Reactor::spawn_local(
271 &deps.reactor,
272 Box::pin(async move {
273 supervisor_task(
274 path_for_task,
275 strategy,
276 intensity,
277 ctrl_rx,
278 event_rx,
279 event_tx,
280 None,
281 transport,
282 transport_registry,
283 responses,
284 reg_for_task,
285 reactor_for_task,
286 fs_for_task,
287 federation,
288 specs,
289 )
290 .await;
291 }),
292 );
293
294 let addr = AddrHash::new(&path, 0);
295 let slot = crate::registry::ActorSlot {
296 path,
297 addr,
298 mailbox_tx: palladium_transport::mailbox(1).0,
299 ctrl_tx: ctrl_tx.clone(),
300 task_handle: Arc::new(parking_lot::Mutex::new(Some(task_handle))),
301 supervisor_addr: None,
302 running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
303 restart_count: Arc::new(std::sync::atomic::AtomicU32::new(0)),
304 message_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
305 compute_time_ns: Arc::new(std::sync::atomic::AtomicU64::new(0)),
306 };
307 deps.registry.insert(slot);
308
309 ctrl_tx
310}