rust_supervisor/control/handle.rs
1//! Public runtime control handle.
2//!
3//! The handle owns the command sender side and exposes asynchronous control
4//! methods. It keeps command construction separate from runtime execution.
5
6use crate::child_runner::runner::ChildRunReport;
7use crate::control::command::{CommandMeta, CommandResult, ControlCommand};
8use crate::dashboard::runtime::DashboardIpcRuntimeGuard;
9use crate::error::types::SupervisorError;
10use crate::id::types::{ChildId, SupervisorPath};
11use crate::observe::pipeline::{ObservabilityPipeline, TestRecorder};
12use crate::runtime::lifecycle::{RuntimeControlPlane, RuntimeExitReport, RuntimeHealthReport};
13use crate::runtime::message::{ControlPlaneMessage, RuntimeLoopMessage};
14use std::sync::Arc;
15use std::sync::Mutex;
16use tokio::sync::{broadcast, mpsc, oneshot};
17
/// Cloneable handle used to control a running supervisor.
///
/// The handle bundles the sending halves of the runtime channels with the
/// control plane state. Because the type derives `Clone` and the pipeline and
/// dashboard guard sit behind `Arc`, every clone refers to the same pipeline
/// and the same guard.
#[derive(Debug, Clone)]
pub struct SupervisorHandle {
    /// Sender side used to submit runtime control commands.
    ///
    /// Carries [`RuntimeLoopMessage`] values to the runtime loop.
    command_sender: mpsc::Sender<RuntimeLoopMessage>,
    /// Broadcast sender used to create lifecycle event subscriptions.
    ///
    /// Events are plain `String` payloads.
    event_sender: broadcast::Sender<String>,
    /// Runtime control plane lifecycle state.
    ///
    /// Queried for liveness, health, and the final exit report.
    control_plane: RuntimeControlPlane,
    /// Shared typed observability pipeline.
    ///
    /// Guarded by a `std::sync::Mutex`; this file only locks it synchronously.
    observability: Arc<Mutex<ObservabilityPipeline>>,
    /// Optional dashboard IPC runtime guard.
    ///
    /// `None` until `with_dashboard_runtime` attaches one. Nothing in this file
    /// reads it back; presumably it is held only to keep the guard alive while
    /// a handle exists (confirm against `DashboardIpcRuntimeGuard`).
    dashboard_runtime: Option<Arc<DashboardIpcRuntimeGuard>>,
}
32
33impl SupervisorHandle {
34 /// Creates a runtime handle from channel senders.
35 ///
36 /// # Arguments
37 ///
38 /// - `command_sender`: Sender used to submit runtime commands.
39 /// - `event_sender`: Sender used to subscribe to lifecycle events.
40 ///
41 /// # Returns
42 ///
43 /// Returns a [`SupervisorHandle`].
44 pub fn new(
45 command_sender: mpsc::Sender<RuntimeLoopMessage>,
46 event_sender: broadcast::Sender<String>,
47 control_plane: RuntimeControlPlane,
48 ) -> Self {
49 Self::new_with_observability(
50 command_sender,
51 event_sender,
52 control_plane,
53 Arc::new(Mutex::new(ObservabilityPipeline::new(16, 16))),
54 )
55 }
56
57 /// Creates a runtime handle with a shared observability pipeline.
58 ///
59 /// # Arguments
60 ///
61 /// - `command_sender`: Sender used to submit runtime commands.
62 /// - `event_sender`: Sender used to subscribe to lifecycle event text.
63 /// - `control_plane`: Runtime control plane lifecycle state.
64 /// - `observability`: Shared typed observability pipeline.
65 ///
66 /// # Returns
67 ///
68 /// Returns a [`SupervisorHandle`].
69 pub(crate) fn new_with_observability(
70 command_sender: mpsc::Sender<RuntimeLoopMessage>,
71 event_sender: broadcast::Sender<String>,
72 control_plane: RuntimeControlPlane,
73 observability: Arc<Mutex<ObservabilityPipeline>>,
74 ) -> Self {
75 Self {
76 command_sender,
77 event_sender,
78 control_plane,
79 observability,
80 dashboard_runtime: None,
81 }
82 }
83
84 /// Attaches a dashboard IPC runtime guard to this handle.
85 ///
86 /// # Arguments
87 ///
88 /// - `dashboard_runtime`: Guard that owns dashboard IPC runtime tasks.
89 ///
90 /// # Returns
91 ///
92 /// Returns this handle with dashboard runtime lifecycle attached.
93 pub(crate) fn with_dashboard_runtime(
94 mut self,
95 dashboard_runtime: Arc<DashboardIpcRuntimeGuard>,
96 ) -> Self {
97 self.dashboard_runtime = Some(dashboard_runtime);
98 self
99 }
100
101 /// Adds a child manifest under a supervisor path.
102 ///
103 /// # Arguments
104 ///
105 /// - `target`: Supervisor path that should receive the child.
106 /// - `child_manifest`: Child manifest text supplied by the caller.
107 /// - `requested_by`: Caller that requested the command.
108 /// - `reason`: Human-readable command reason.
109 ///
110 /// # Child Manifest Example
111 ///
112 /// The runtime expects a YAML child declaration. The smallest useful
113 /// manifest names the child and selects a task kind:
114 ///
115 /// ```yaml
116 /// name: worker
117 /// kind: async_worker
118 /// ```
119 ///
120 /// Optional fields can be added when the child needs dependencies,
121 /// lifecycle policy, resource limits, command permissions, environment
122 /// variables, or secret references:
123 ///
124 /// ```yaml
125 /// name: worker
126 /// kind: async_worker
127 /// criticality: optional
128 /// restart_policy: transient
129 /// dependencies:
130 /// - cache
131 /// health_check:
132 /// check_interval_secs: 10
133 /// timeout_secs: 5
134 /// max_retries: 3
135 /// readiness:
136 /// check_interval_secs: 5
137 /// timeout_secs: 3
138 /// resource_limits:
139 /// max_memory_mb: 256
140 /// max_cpu_percent: 80
141 /// max_file_descriptors: 1024
142 /// command_permissions:
143 /// allow_shutdown: false
144 /// allow_restart: true
145 /// allowed_signals:
146 /// - SIGTERM
147 /// environment:
148 /// - name: WORKER_MODE
149 /// value: queue
150 /// - name: API_TOKEN
151 /// secret_ref: ${API_TOKEN}
152 /// secrets:
153 /// - name: API_TOKEN
154 /// key: workers/api_token
155 /// required: true
156 /// ```
157 ///
158 /// # Example
159 ///
160 /// ```no_run
161 /// # async fn add_child_example() -> Result<(), rust_supervisor::error::types::SupervisorError> {
162 /// use rust_supervisor::control::command::CommandResult;
163 /// use rust_supervisor::id::types::SupervisorPath;
164 /// use rust_supervisor::runtime::supervisor::Supervisor;
165 /// use rust_supervisor::spec::supervisor::SupervisorSpec;
166 ///
167 /// let handle = Supervisor::start(SupervisorSpec::root(Vec::new())).await?;
168 /// let result = handle
169 /// .add_child(
170 /// SupervisorPath::root(),
171 /// "name: worker\nkind: async_worker\n",
172 /// "operator",
173 /// "attach worker during runtime update",
174 /// )
175 /// .await?;
176 ///
177 /// assert!(matches!(result, CommandResult::ChildAdded { .. }));
178 /// # Ok(())
179 /// # }
180 /// ```
181 ///
182 /// # Returns
183 ///
184 /// Returns a [`CommandResult`] after the runtime accepts the command.
185 pub async fn add_child(
186 &self,
187 target: SupervisorPath,
188 child_manifest: impl Into<String>,
189 requested_by: impl Into<String>,
190 reason: impl Into<String>,
191 ) -> Result<CommandResult, SupervisorError> {
192 self.send(ControlCommand::AddChild {
193 meta: CommandMeta::new(requested_by, reason),
194 target,
195 child_manifest: child_manifest.into(),
196 })
197 .await
198 }
199
200 /// Removes a child from runtime governance.
201 ///
202 /// # Arguments
203 ///
204 /// - `child_id`: Target child identifier.
205 /// - `requested_by`: Caller that requested the command.
206 /// - `reason`: Human-readable command reason.
207 ///
208 /// # Returns
209 ///
210 /// Returns a [`CommandResult`] after removal or idempotent reuse.
211 pub async fn remove_child(
212 &self,
213 child_id: ChildId,
214 requested_by: impl Into<String>,
215 reason: impl Into<String>,
216 ) -> Result<CommandResult, SupervisorError> {
217 self.child_command(child_id, requested_by, reason, |meta, child_id| {
218 ControlCommand::RemoveChild { meta, child_id }
219 })
220 .await
221 }
222
223 /// Restarts a child explicitly.
224 ///
225 /// # Arguments
226 ///
227 /// - `child_id`: Target child identifier.
228 /// - `requested_by`: Caller that requested the command.
229 /// - `reason`: Human-readable command reason.
230 ///
231 /// # Returns
232 ///
233 /// Returns a [`CommandResult`] after restart dispatch.
234 pub async fn restart_child(
235 &self,
236 child_id: ChildId,
237 requested_by: impl Into<String>,
238 reason: impl Into<String>,
239 ) -> Result<CommandResult, SupervisorError> {
240 self.child_command(child_id, requested_by, reason, |meta, child_id| {
241 ControlCommand::RestartChild { meta, child_id }
242 })
243 .await
244 }
245
246 /// Pauses a child idempotently.
247 ///
248 /// # Arguments
249 ///
250 /// - `child_id`: Target child identifier.
251 /// - `requested_by`: Caller that requested the command.
252 /// - `reason`: Human-readable command reason.
253 ///
254 /// # Returns
255 ///
256 /// Returns the current child state after the command.
257 pub async fn pause_child(
258 &self,
259 child_id: ChildId,
260 requested_by: impl Into<String>,
261 reason: impl Into<String>,
262 ) -> Result<CommandResult, SupervisorError> {
263 self.child_command(child_id, requested_by, reason, |meta, child_id| {
264 ControlCommand::PauseChild { meta, child_id }
265 })
266 .await
267 }
268
269 /// Resumes a child idempotently.
270 ///
271 /// # Arguments
272 ///
273 /// - `child_id`: Target child identifier.
274 /// - `requested_by`: Caller that requested the command.
275 /// - `reason`: Human-readable command reason.
276 ///
277 /// # Returns
278 ///
279 /// Returns the current child state after the command.
280 pub async fn resume_child(
281 &self,
282 child_id: ChildId,
283 requested_by: impl Into<String>,
284 reason: impl Into<String>,
285 ) -> Result<CommandResult, SupervisorError> {
286 self.child_command(child_id, requested_by, reason, |meta, child_id| {
287 ControlCommand::ResumeChild { meta, child_id }
288 })
289 .await
290 }
291
292 /// Quarantines a child idempotently.
293 ///
294 /// # Arguments
295 ///
296 /// - `child_id`: Target child identifier.
297 /// - `requested_by`: Caller that requested the command.
298 /// - `reason`: Human-readable command reason.
299 ///
300 /// # Returns
301 ///
302 /// Returns the current child state after the command.
303 pub async fn quarantine_child(
304 &self,
305 child_id: ChildId,
306 requested_by: impl Into<String>,
307 reason: impl Into<String>,
308 ) -> Result<CommandResult, SupervisorError> {
309 self.child_command(child_id, requested_by, reason, |meta, child_id| {
310 ControlCommand::QuarantineChild { meta, child_id }
311 })
312 .await
313 }
314
315 /// Shuts down the supervisor tree idempotently.
316 ///
317 /// # Arguments
318 ///
319 /// - `requested_by`: Caller that requested shutdown.
320 /// - `reason`: Human-readable shutdown reason.
321 ///
322 /// # Returns
323 ///
324 /// Returns the current shutdown result.
325 pub async fn shutdown_tree(
326 &self,
327 requested_by: impl Into<String>,
328 reason: impl Into<String>,
329 ) -> Result<CommandResult, SupervisorError> {
330 self.send(ControlCommand::ShutdownTree {
331 meta: CommandMeta::new(requested_by, reason),
332 })
333 .await
334 }
335
336 /// Queries the current runtime state.
337 ///
338 /// # Arguments
339 ///
340 /// This function has no arguments.
341 ///
342 /// # Returns
343 ///
344 /// Returns a [`CommandResult::CurrentState`] value.
345 pub async fn current_state(&self) -> Result<CommandResult, SupervisorError> {
346 self.send(ControlCommand::CurrentState {
347 meta: CommandMeta::new("system", "current_state"),
348 })
349 .await
350 }
351
352 /// Reports whether the runtime control loop is alive.
353 ///
354 /// # Arguments
355 ///
356 /// This function has no arguments.
357 ///
358 /// # Returns
359 ///
360 /// Returns `true` when ordinary control commands may still be accepted.
361 pub fn is_alive(&self) -> bool {
362 self.control_plane.is_alive()
363 }
364
365 /// Returns a runtime control plane health report.
366 ///
367 /// # Arguments
368 ///
369 /// This function has no arguments.
370 ///
371 /// # Returns
372 ///
373 /// Returns a [`RuntimeHealthReport`] value for the current observation.
374 pub fn health(&self) -> RuntimeHealthReport {
375 self.control_plane.health()
376 }
377
378 /// Waits until the runtime control plane reaches a final state.
379 ///
380 /// # Arguments
381 ///
382 /// This function has no arguments.
383 ///
384 /// # Returns
385 ///
386 /// Returns the cached [`RuntimeExitReport`].
387 pub async fn join(&self) -> Result<RuntimeExitReport, SupervisorError> {
388 let report = self.control_plane.join().await;
389 let _ignored = self.event_sender.send(format!(
390 "runtime_control_loop_join_completed:{}:{}:{}",
391 report.state.as_str(),
392 report.phase,
393 report.reason
394 ));
395 Ok(report)
396 }
397
398 /// Requests shutdown for the runtime control plane itself.
399 ///
400 /// # Arguments
401 ///
402 /// - `requested_by`: Caller that requested shutdown.
403 /// - `reason`: Human-readable shutdown reason.
404 ///
405 /// # Returns
406 ///
407 /// Returns the final [`RuntimeExitReport`].
408 pub async fn shutdown(
409 &self,
410 requested_by: impl Into<String>,
411 reason: impl Into<String>,
412 ) -> Result<RuntimeExitReport, SupervisorError> {
413 let meta = CommandMeta::new(requested_by, reason);
414 if let Some(report) = self
415 .control_plane
416 .mark_shutdown_requested(meta.requested_by.clone(), meta.reason.clone())?
417 {
418 return Ok(report);
419 }
420
421 let (reply_sender, reply_receiver) = oneshot::channel();
422 if self
423 .command_sender
424 .send(RuntimeLoopMessage::ControlPlane(
425 ControlPlaneMessage::Shutdown { meta, reply_sender },
426 ))
427 .await
428 .is_err()
429 {
430 return Err(self
431 .closed_runtime_error_after_join("runtime control loop is closed")
432 .await);
433 }
434 match reply_receiver.await {
435 Ok(result) => {
436 result?;
437 }
438 Err(_) => {
439 return Err(self
440 .closed_runtime_error_after_join("runtime control loop dropped shutdown reply")
441 .await);
442 }
443 }
444 self.join().await
445 }
446
447 /// Subscribes to runtime event text emitted by the control loop.
448 ///
449 /// # Arguments
450 ///
451 /// This function has no arguments.
452 ///
453 /// # Returns
454 ///
455 /// Returns a broadcast receiver for event text.
456 pub fn subscribe_events(&self) -> broadcast::Receiver<String> {
457 let receiver = self.event_sender.subscribe();
458 if self.control_plane.is_alive() {
459 let _ignored = self
460 .event_sender
461 .send("runtime_control_loop_started:startup".to_owned());
462 }
463 receiver
464 }
465
466 /// Returns a copy of the typed observability test recorder.
467 ///
468 /// # Arguments
469 ///
470 /// This function has no arguments.
471 ///
472 /// # Returns
473 ///
474 /// Returns the currently retained [`TestRecorder`] contents.
475 pub fn observability_recorder(&self) -> TestRecorder {
476 self.observability
477 .lock()
478 .map(|pipeline| pipeline.test_recorder.clone())
479 .unwrap_or_default()
480 }
481
482 /// Hidden integration-test hook that feeds a synthetic [`ChildRunReport`] through the mailbox.
483 ///
484 /// Production callers must not rely on this hook.
485 #[doc(hidden)]
486 pub async fn generation_fencing_replay_child_exit_for_test(
487 &self,
488 report: ChildRunReport,
489 ) -> Result<(), SupervisorError> {
490 if let Some(report_final) = self.control_plane.final_report() {
491 return Err(runtime_exit_error(&report_final));
492 }
493 if !self.control_plane.is_alive() {
494 return Err(SupervisorError::InvalidTransition {
495 message: format!(
496 "runtime control loop is not alive: state={}",
497 self.control_plane.health().state.as_str()
498 ),
499 });
500 }
501 self.command_sender
502 .send(RuntimeLoopMessage::ControlPlane(
503 ControlPlaneMessage::ReplayChildExitForTest {
504 report: Box::new(report),
505 },
506 ))
507 .await
508 .map_err(|_| SupervisorError::InvalidTransition {
509 message: "runtime control loop is closed".to_owned(),
510 })
511 }
512
513 /// Sends one control command and waits for the result.
514 ///
515 /// # Arguments
516 ///
517 /// - `command`: Command that should be executed by the runtime loop.
518 ///
519 /// # Returns
520 ///
521 /// Returns a command result or a supervisor error when the runtime is gone.
522 async fn send(&self, command: ControlCommand) -> Result<CommandResult, SupervisorError> {
523 if let Some(report) = self.control_plane.final_report() {
524 return Err(runtime_exit_error(&report));
525 }
526 if !self.control_plane.is_alive() {
527 return Err(SupervisorError::InvalidTransition {
528 message: format!(
529 "runtime control loop is not alive: state={}",
530 self.control_plane.health().state.as_str()
531 ),
532 });
533 }
534 command.validate_audit_metadata()?;
535 let (reply_sender, reply_receiver) = oneshot::channel();
536 if self
537 .command_sender
538 .send(RuntimeLoopMessage::Control {
539 command,
540 reply_sender,
541 })
542 .await
543 .is_err()
544 {
545 return Err(self
546 .closed_runtime_error_after_join("runtime control loop is closed")
547 .await);
548 }
549 match reply_receiver.await {
550 Ok(result) => result,
551 Err(_) => Err(self
552 .closed_runtime_error_after_join("runtime control loop dropped command reply")
553 .await),
554 }
555 }
556
557 /// Builds and sends a child-targeted command.
558 ///
559 /// # Arguments
560 ///
561 /// - `child_id`: Target child identifier.
562 /// - `requested_by`: Caller that requested the command.
563 /// - `reason`: Human-readable command reason.
564 /// - `builder`: Command builder for the child operation.
565 ///
566 /// # Returns
567 ///
568 /// Returns a command result from the runtime loop.
569 async fn child_command<F>(
570 &self,
571 child_id: ChildId,
572 requested_by: impl Into<String>,
573 reason: impl Into<String>,
574 builder: F,
575 ) -> Result<CommandResult, SupervisorError>
576 where
577 F: FnOnce(CommandMeta, ChildId) -> ControlCommand,
578 {
579 let meta = CommandMeta::new(requested_by, reason);
580 self.send(builder(meta, child_id)).await
581 }
582
583 /// Builds an error after waiting for the runtime exit report when possible.
584 async fn closed_runtime_error_after_join(&self, fallback: &str) -> SupervisorError {
585 if let Some(report) = self.control_plane.final_report() {
586 return runtime_exit_error(&report);
587 }
588 if self.command_sender.is_closed() {
589 let report = self.control_plane.join().await;
590 return runtime_exit_error(&report);
591 }
592 SupervisorError::InvalidTransition {
593 message: fallback.to_owned(),
594 }
595 }
596}
597
598/// Builds a control command error from a runtime exit report.
599fn runtime_exit_error(report: &RuntimeExitReport) -> SupervisorError {
600 SupervisorError::InvalidTransition {
601 message: format!(
602 "runtime control loop already exited: state={}, phase={}, reason={}",
603 report.state.as_str(),
604 report.phase,
605 report.reason
606 ),
607 }
608}